From eec9bab081fb1437433ee976c48e93e5cd600b77 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Wed, 29 Apr 2020 05:21:18 +0100 Subject: [PATCH] Start work on timer-based swarm tracking --- .../groovy/com/muwire/tracker/Swarm.groovy | 19 ++++-- .../com/muwire/tracker/SwarmManager.groovy | 68 +++++++++++++++---- .../muwire/tracker/TrackerProperties.groovy | 13 ++++ 3 files changed, 82 insertions(+), 18 deletions(-) diff --git a/tracker/src/main/groovy/com/muwire/tracker/Swarm.groovy b/tracker/src/main/groovy/com/muwire/tracker/Swarm.groovy index 030e4925..d25bac7a 100644 --- a/tracker/src/main/groovy/com/muwire/tracker/Swarm.groovy +++ b/tracker/src/main/groovy/com/muwire/tracker/Swarm.groovy @@ -12,7 +12,7 @@ import groovy.util.logging.Log */ @Log class Swarm { - private final InfoHash infoHash + final InfoHash infoHash /** * Invariant: these four collections are mutually exclusive. @@ -29,6 +29,11 @@ class Swarm { */ private final Map inFlight = new HashMap<>() + /** + * Last time a query was made to the MW network for this hash + */ + private long lastQueryTime + Swarm(InfoHash infoHash) { this.infoHash = infoHash } @@ -51,11 +56,17 @@ class Swarm { } } - synchronized boolean needsQuery() { - seeds.isEmpty() && + synchronized boolean shouldQuery(long queryCutoff, long now) { + if (!(seeds.isEmpty() && leeches.isEmpty() && inFlight.isEmpty() && - unknown.isEmpty() + unknown.isEmpty())) + return false + if (lastQueryTime <= queryCutoff) { + lastQueryTime = now + return true + } + false } synchronized boolean isHealthy() { diff --git a/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy b/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy index 24679ebf..cb311038 100644 --- a/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy +++ b/tracker/src/main/groovy/com/muwire/tracker/SwarmManager.groovy @@ -28,21 +28,35 @@ class SwarmManager { @Autowired private Pinger pinger + @Autowired + private TrackerProperties trackerProperties + private final Map swarms = new ConcurrentHashMap<>() + private final Map queries = new ConcurrentHashMap<>() + private final Timer swarmTimer = new Timer("swarm-timer",true) @PostConstruct public void postConstruct() { core.eventBus.register(UIResultBatchEvent.class, this) + swarmTimer.schedule({trackSwarms()} as TimerTask, 10 * 1000, 10 * 1000) } void onUIResultBatchEvent(UIResultBatchEvent e) { + InfoHash stored = queries.get(e.uuid) InfoHash ih = e.results[0].infohash + + if (ih != stored) { + log.warning("infohash mismatch in result $ih vs $stored") + return + } + Swarm swarm = swarms.get(ih) if (swarm == null) { log.warning("no swarm found for result with infoHash $ih") return } + log.info("got a result with uuid ${e.uuid} for infoHash $ih") swarm.add(e.results[0].sender) } @@ -50,25 +64,51 @@ class SwarmManager { swarms.size() } - void track(InfoHash infoHash) { - Swarm swarm = swarms.computeIfAbsent(infoHash, {new Swarm(it)} as Function) - if (swarm.needsQuery()) { - UUID uuid = UUID.randomUUID() - def searchEvent = new SearchEvent(searchHash : infoHash.getRoot(), uuid: uuid, oobInfohash: true, compressedResults : true, persona : core.me) - byte [] payload = infoHash.getRoot() - boolean firstHop = core.muOptions.allowUntrusted || core.muOptions.searchExtraHop - - Signature sig = DSAEngine.getInstance().sign(payload, core.spk) - long timestamp = System.currentTimeMillis() - core.eventBus.publish(new QueryEvent(searchEvent : searchEvent, firstHop : firstHop, - replyTo: core.me.destination, receivedOn: core.me.destination, - originator : core.me, sig : sig.data, queryTime : timestamp, sig2 : DataUtil.signUUID(uuid, timestamp, core.spk))) + private void trackSwarms() { + final long now = System.currentTimeMillis() + final long expiryCutoff = now - trackerProperties.getSwarmParameters().getExpiry() * 60 * 1000L + swarms.values().each { it.expire(expiryCutoff) } + final long queryCutoff = now - trackerProperties.getSwarmParameters().getQueryInterval() * 60 * 60 * 1000L + swarms.values().each { + if (it.shouldQuery(queryCutoff, now)) + query(it) } } + private void query(Swarm swarm) { + InfoHash infoHash = swarm.getInfoHash() + cleanQueryMap(infoHash) + UUID uuid = UUID.randomUUID() + queries.put(uuid, infoHash) + + log.info("will query MW network for $infoHash with uuid $uuid") + + def searchEvent = new SearchEvent(searchHash : infoHash.getRoot(), uuid: uuid, oobInfohash: true, compressedResults : true, persona : core.me) + byte [] payload = infoHash.getRoot() + boolean firstHop = core.muOptions.allowUntrusted || core.muOptions.searchExtraHop + + Signature sig = DSAEngine.getInstance().sign(payload, core.spk) + long timestamp = System.currentTimeMillis() + core.eventBus.publish(new QueryEvent(searchEvent : searchEvent, firstHop : firstHop, + replyTo: core.me.destination, receivedOn: core.me.destination, + originator : core.me, sig : sig.data, queryTime : timestamp, sig2 : DataUtil.signUUID(uuid, timestamp, core.spk))) + } + + void track(InfoHash infoHash) { + swarms.computeIfAbsent(infoHash, {new Swarm(it)} as Function) + } + boolean forget(InfoHash infoHash) { Swarm swarm = swarms.remove(infoHash) - swarm != null + if (swarm != null) { + cleanQueryMap(infoHash) + return true + } else + return false + } + + private void cleanQueryMap(InfoHash infoHash) { + queries.values().removeAll {it == infoHash} } Swarm.Info info(InfoHash infoHash) { diff --git a/tracker/src/main/groovy/com/muwire/tracker/TrackerProperties.groovy b/tracker/src/main/groovy/com/muwire/tracker/TrackerProperties.groovy index 76b1547f..994754df 100644 --- a/tracker/src/main/groovy/com/muwire/tracker/TrackerProperties.groovy +++ b/tracker/src/main/groovy/com/muwire/tracker/TrackerProperties.groovy @@ -13,4 +13,17 @@ class TrackerProperties { InetAddress iface int port } + + final SwarmParameters swarmParameters = new SwarmParameters() + + public static class SwarmParameters { + /** how often to kick of queries on the MW net, in hours */ + int queryInterval = 1 + /** how many hosts to ping in parallel */ + int pingParallel = 5 + /** interval of time between pinging the same host, in minutes */ + int pingInterval = 15 + /** how long to wait before declaring a host is dead, in minutes */ + int expiry = 60 + } }