Start work on timer-based swarm tracking

pull/53/head
Zlatin Balevsky 2020-04-29 05:21:18 +01:00
parent 0a66267264
commit eec9bab081
3 changed files with 82 additions and 18 deletions

View File

@ -12,7 +12,7 @@ import groovy.util.logging.Log
*/
@Log
class Swarm {
private final InfoHash infoHash
final InfoHash infoHash
/**
* Invariant: these four collections are mutually exclusive.
@ -29,6 +29,11 @@ class Swarm {
*/
private final Map<Persona, Host> inFlight = new HashMap<>()
/**
* Last time a query was made to the MW network for this hash
*/
private long lastQueryTime
Swarm(InfoHash infoHash) {
this.infoHash = infoHash
}
@ -51,11 +56,17 @@ class Swarm {
}
}
synchronized boolean needsQuery() {
seeds.isEmpty() &&
synchronized boolean shouldQuery(long queryCutoff, long now) {
if (!(seeds.isEmpty() &&
leeches.isEmpty() &&
inFlight.isEmpty() &&
unknown.isEmpty()
unknown.isEmpty()))
return false
if (lastQueryTime <= queryCutoff) {
lastQueryTime = now
return true
}
false
}
synchronized boolean isHealthy() {

View File

@ -28,21 +28,35 @@ class SwarmManager {
@Autowired
private Pinger pinger
@Autowired
private TrackerProperties trackerProperties
private final Map<InfoHash, Swarm> swarms = new ConcurrentHashMap<>()
private final Map<UUID, InfoHash> queries = new ConcurrentHashMap<>()
private final Timer swarmTimer = new Timer("swarm-timer",true)
@PostConstruct
public void postConstruct() {
core.eventBus.register(UIResultBatchEvent.class, this)
swarmTimer.schedule({trackSwarms()} as TimerTask, 10 * 1000, 10 * 1000)
}
void onUIResultBatchEvent(UIResultBatchEvent e) {
InfoHash stored = queries.get(e.uuid)
InfoHash ih = e.results[0].infohash
if (ih != stored) {
log.warning("infohash mismatch in result $ih vs $stored")
return
}
Swarm swarm = swarms.get(ih)
if (swarm == null) {
log.warning("no swarm found for result with infoHash $ih")
return
}
log.info("got a result with uuid ${e.uuid} for infoHash $ih")
swarm.add(e.results[0].sender)
}
@ -50,25 +64,51 @@ class SwarmManager {
swarms.size()
}
void track(InfoHash infoHash) {
Swarm swarm = swarms.computeIfAbsent(infoHash, {new Swarm(it)} as Function)
if (swarm.needsQuery()) {
UUID uuid = UUID.randomUUID()
def searchEvent = new SearchEvent(searchHash : infoHash.getRoot(), uuid: uuid, oobInfohash: true, compressedResults : true, persona : core.me)
byte [] payload = infoHash.getRoot()
boolean firstHop = core.muOptions.allowUntrusted || core.muOptions.searchExtraHop
Signature sig = DSAEngine.getInstance().sign(payload, core.spk)
long timestamp = System.currentTimeMillis()
core.eventBus.publish(new QueryEvent(searchEvent : searchEvent, firstHop : firstHop,
replyTo: core.me.destination, receivedOn: core.me.destination,
originator : core.me, sig : sig.data, queryTime : timestamp, sig2 : DataUtil.signUUID(uuid, timestamp, core.spk)))
private void trackSwarms() {
final long now = System.currentTimeMillis()
final long expiryCutoff = now - trackerProperties.getSwarmParameters().getExpiry() * 60 * 1000L
swarms.values().each { it.expire(expiryCutoff) }
final long queryCutoff = now - trackerProperties.getSwarmParameters().getQueryInterval() * 60 * 60 * 1000L
swarms.values().each {
if (it.shouldQuery(queryCutoff, now))
query(it)
}
}
private void query(Swarm swarm) {
InfoHash infoHash = swarm.getInfoHash()
cleanQueryMap(infoHash)
UUID uuid = UUID.randomUUID()
queries.put(uuid, infoHash)
log.info("will query MW network for $infoHash with uuid $uuid")
def searchEvent = new SearchEvent(searchHash : infoHash.getRoot(), uuid: uuid, oobInfohash: true, compressedResults : true, persona : core.me)
byte [] payload = infoHash.getRoot()
boolean firstHop = core.muOptions.allowUntrusted || core.muOptions.searchExtraHop
Signature sig = DSAEngine.getInstance().sign(payload, core.spk)
long timestamp = System.currentTimeMillis()
core.eventBus.publish(new QueryEvent(searchEvent : searchEvent, firstHop : firstHop,
replyTo: core.me.destination, receivedOn: core.me.destination,
originator : core.me, sig : sig.data, queryTime : timestamp, sig2 : DataUtil.signUUID(uuid, timestamp, core.spk)))
}
void track(InfoHash infoHash) {
swarms.computeIfAbsent(infoHash, {new Swarm(it)} as Function)
}
boolean forget(InfoHash infoHash) {
Swarm swarm = swarms.remove(infoHash)
swarm != null
if (swarm != null) {
cleanQueryMap(infoHash)
return true
} else
return false
}
private void cleanQueryMap(InfoHash infoHash) {
queries.values().removeAll {it == infoHash}
}
Swarm.Info info(InfoHash infoHash) {

View File

@ -13,4 +13,17 @@ class TrackerProperties {
InetAddress iface
int port
}
final SwarmParameters swarmParameters = new SwarmParameters()
public static class SwarmParameters {
/** how often to kick of queries on the MW net, in hours */
int queryInterval = 1
/** how many hosts to ping in parallel */
int pingParallel = 5
/** interval of time between pinging the same host, in minutes */
int pingInterval = 15
/** how long to wait before declaring a host is dead, in minutes */
int expiry = 60
}
}