mirror of https://github.com/zlatinb/muwire
delay sending of results to prevent duplicates
parent
c932e5675c
commit
ee004e8e8c
|
@ -1,7 +1,10 @@
|
||||||
package com.muwire.core.search
|
package com.muwire.core.search
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
|
||||||
import com.muwire.core.EventBus
|
import com.muwire.core.EventBus
|
||||||
import com.muwire.core.Persona
|
import com.muwire.core.Persona
|
||||||
|
import com.muwire.core.SharedFile
|
||||||
|
|
||||||
import groovy.util.logging.Log
|
import groovy.util.logging.Log
|
||||||
import net.i2p.data.Destination
|
import net.i2p.data.Destination
|
||||||
|
@ -11,12 +14,15 @@ public class SearchManager {
|
||||||
|
|
||||||
private static final int EXPIRE_TIME = 60 * 1000 * 1000
|
private static final int EXPIRE_TIME = 60 * 1000 * 1000
|
||||||
private static final int CHECK_INTERVAL = 60 * 1000
|
private static final int CHECK_INTERVAL = 60 * 1000
|
||||||
|
private static final int RESULT_DELAY = 100
|
||||||
|
|
||||||
private final EventBus eventBus
|
private final EventBus eventBus
|
||||||
private final Persona me
|
private final Persona me
|
||||||
private final ResultsSender resultsSender
|
private final ResultsSender resultsSender
|
||||||
private final Map<UUID, QueryEvent> responderAddress = Collections.synchronizedMap(new HashMap<>())
|
private final Map<UUID, QueryEvent> responderAddress = Collections.synchronizedMap(new HashMap<>())
|
||||||
|
|
||||||
|
private final Map<UUID, ResultBatch> pendingResults = new HashMap<>()
|
||||||
|
|
||||||
SearchManager(){}
|
SearchManager(){}
|
||||||
|
|
||||||
SearchManager(EventBus eventBus, Persona me, ResultsSender resultsSender) {
|
SearchManager(EventBus eventBus, Persona me, ResultsSender resultsSender) {
|
||||||
|
@ -25,6 +31,7 @@ public class SearchManager {
|
||||||
this.resultsSender = resultsSender
|
this.resultsSender = resultsSender
|
||||||
Timer timer = new Timer("query-expirer", true)
|
Timer timer = new Timer("query-expirer", true)
|
||||||
timer.schedule({cleanup()} as TimerTask, CHECK_INTERVAL, CHECK_INTERVAL)
|
timer.schedule({cleanup()} as TimerTask, CHECK_INTERVAL, CHECK_INTERVAL)
|
||||||
|
timer.schedule({sendBatched()}, RESULT_DELAY, RESULT_DELAY)
|
||||||
}
|
}
|
||||||
|
|
||||||
void onQueryEvent(QueryEvent event) {
|
void onQueryEvent(QueryEvent event) {
|
||||||
|
@ -36,7 +43,7 @@ public class SearchManager {
|
||||||
eventBus.publish(event.searchEvent)
|
eventBus.publish(event.searchEvent)
|
||||||
}
|
}
|
||||||
|
|
||||||
void onResultsEvent(ResultsEvent event) {
|
synchronized void onResultsEvent(ResultsEvent event) {
|
||||||
Destination target = responderAddress.get(event.uuid)?.replyTo
|
Destination target = responderAddress.get(event.uuid)?.replyTo
|
||||||
if (target == null)
|
if (target == null)
|
||||||
throw new IllegalStateException("UUID unknown $event.uuid")
|
throw new IllegalStateException("UUID unknown $event.uuid")
|
||||||
|
@ -44,7 +51,10 @@ public class SearchManager {
|
||||||
log.info("No results for search uuid $event.uuid")
|
log.info("No results for search uuid $event.uuid")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resultsSender.sendResults(event.uuid, event.results, target, event.searchEvent.oobInfohash, event.searchEvent.compressedResults)
|
|
||||||
|
ResultBatch batch = pendingResults.putIfAbsent(event.uuid, new ResultBatch(event, target))
|
||||||
|
if (batch != null)
|
||||||
|
batch.results.addAll(event.results)
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean hasLocalSearch(UUID uuid) {
|
boolean hasLocalSearch(UUID uuid) {
|
||||||
|
@ -62,4 +72,30 @@ public class SearchManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized void sendBatched() {
|
||||||
|
if (pendingResults.isEmpty())
|
||||||
|
return
|
||||||
|
|
||||||
|
Set<ResultBatch> copy = new HashSet<>(pendingResults.values())
|
||||||
|
pendingResults.clear()
|
||||||
|
|
||||||
|
copy.each {
|
||||||
|
SharedFile[] results = it.results.toArray()
|
||||||
|
resultsSender.sendResults(it.resultsEvent.uuid, results, it.target,
|
||||||
|
it.resultsEvent.searchEvent.oobInfohash,
|
||||||
|
it.resultsEvent.searchEvent.compressedResults)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ResultBatch {
|
||||||
|
final ResultsEvent resultsEvent
|
||||||
|
final Destination target
|
||||||
|
final Set<SharedFile> results = new HashSet<>()
|
||||||
|
ResultBatch(ResultsEvent resultsEvent, Destination target) {
|
||||||
|
this.resultsEvent = resultsEvent
|
||||||
|
this.target = target
|
||||||
|
results.addAll(resultsEvent.results)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue