From ee004e8e8c6665a46c3d841a8f04cd4e416633bd Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Mon, 2 Nov 2020 19:52:47 +0000 Subject: [PATCH] delay sending of results to prevent duplicates --- .../muwire/core/search/SearchManager.groovy | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy b/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy index 55231160..7077014b 100644 --- a/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy +++ b/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy @@ -1,7 +1,10 @@ package com.muwire.core.search +import java.util.concurrent.ConcurrentHashMap + import com.muwire.core.EventBus import com.muwire.core.Persona +import com.muwire.core.SharedFile import groovy.util.logging.Log import net.i2p.data.Destination @@ -11,11 +14,14 @@ public class SearchManager { private static final int EXPIRE_TIME = 60 * 1000 * 1000 private static final int CHECK_INTERVAL = 60 * 1000 + private static final int RESULT_DELAY = 100 private final EventBus eventBus private final Persona me private final ResultsSender resultsSender private final Map responderAddress = Collections.synchronizedMap(new HashMap<>()) + + private final Map pendingResults = new HashMap<>() SearchManager(){} @@ -25,6 +31,7 @@ public class SearchManager { this.resultsSender = resultsSender Timer timer = new Timer("query-expirer", true) timer.schedule({cleanup()} as TimerTask, CHECK_INTERVAL, CHECK_INTERVAL) + timer.schedule({sendBatched()}, RESULT_DELAY, RESULT_DELAY) } void onQueryEvent(QueryEvent event) { @@ -36,7 +43,7 @@ public class SearchManager { eventBus.publish(event.searchEvent) } - void onResultsEvent(ResultsEvent event) { + synchronized void onResultsEvent(ResultsEvent event) { Destination target = responderAddress.get(event.uuid)?.replyTo if (target == null) throw new IllegalStateException("UUID unknown $event.uuid") @@ -44,7 +51,10 @@ public class SearchManager { log.info("No results for search uuid $event.uuid") return } - resultsSender.sendResults(event.uuid, event.results, target, event.searchEvent.oobInfohash, event.searchEvent.compressedResults) + + ResultBatch batch = pendingResults.putIfAbsent(event.uuid, new ResultBatch(event, target)) + if (batch != null) + batch.results.addAll(event.results) } boolean hasLocalSearch(UUID uuid) { @@ -62,4 +72,30 @@ public class SearchManager { } } } + + private synchronized void sendBatched() { + if (pendingResults.isEmpty()) + return + + Set copy = new HashSet<>(pendingResults.values()) + pendingResults.clear() + + copy.each { + SharedFile[] results = it.results.toArray() + resultsSender.sendResults(it.resultsEvent.uuid, results, it.target, + it.resultsEvent.searchEvent.oobInfohash, + it.resultsEvent.searchEvent.compressedResults) + } + } + + private static class ResultBatch { + final ResultsEvent resultsEvent + final Destination target + final Set results = new HashSet<>() + ResultBatch(ResultsEvent resultsEvent, Destination target) { + this.resultsEvent = resultsEvent + this.target = target + results.addAll(resultsEvent.results) + } + } }