From cfb04a9811007323fe67a7f06d295d587ed8dd16 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 27 Jul 2018 09:40:58 +0100 Subject: [PATCH] work on handling pings --- .../muwire/core/connection/Connection.groovy | 25 +++++++++- .../core/connection/LeafConnection.groovy | 5 +- .../core/connection/PeerConnection.groovy | 46 +++++++++++++++++-- .../connection/UltrapeerConnection.groovy | 5 +- 4 files changed, 72 insertions(+), 9 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index 95e80fc1..db96054c 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -6,6 +6,8 @@ import java.util.concurrent.atomic.AtomicBoolean import java.util.logging.Level import com.muwire.core.EventBus +import com.muwire.core.hostcache.HostCache +import com.muwire.core.hostcache.HostDiscoveredEvent import groovy.util.logging.Log import net.i2p.data.Destination @@ -16,6 +18,7 @@ abstract class Connection implements Closeable { final EventBus eventBus final Endpoint endpoint final boolean incoming + final HostCache hostCache private final AtomicBoolean running = new AtomicBoolean() private final BlockingQueue messages = new LinkedBlockingQueue() @@ -25,10 +28,11 @@ abstract class Connection implements Closeable { long lastPingSentTime, lastPingReceivedTime - Connection(EventBus eventBus, Endpoint endpoint, boolean incoming) { + Connection(EventBus eventBus, Endpoint endpoint, boolean incoming, HostCache hostCache) { this.eventBus = eventBus this.incoming = incoming this.endpoint = endpoint + this.hostCache = hostCache this.name = endpoint.destination.toBase32().substring(0,8) @@ -102,4 +106,23 @@ abstract class Connection implements Closeable { ping.version = 1 messages.put(ping) } + + protected void handlePing() { + log.fine("$name received ping") + def pong = [:] + pong.type = "Pong" + pong.version = 1 + pong.pongs = hostCache.getGoodHosts(10).collect { d -> d.toBase64() } + messages.put(pong) + } + + protected void handlePong(def pong) { + log.fine("$name received pong") + if (pong.pongs == null) + throw new Exception("Pong doesn't have pongs") + pong.pongs.each { + def dest = new Destination(it) + eventBus.publish(new HostDiscoveredEvent(destination: dest)) + } + } } diff --git a/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy index 3f97a5d8..f349df24 100644 --- a/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy @@ -4,6 +4,7 @@ import java.io.InputStream import java.io.OutputStream import com.muwire.core.EventBus +import com.muwire.core.hostcache.HostCache import net.i2p.data.Destination @@ -14,8 +15,8 @@ import net.i2p.data.Destination */ class LeafConnection extends Connection { - public LeafConnection(EventBus eventBus, Endpoint endpoint) { - super(eventBus, endpoint, true); + public LeafConnection(EventBus eventBus, Endpoint endpoint, HostCache hostCache) { + super(eventBus, endpoint, true, hostCache); } @Override diff --git a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy index 71803076..39094c88 100644 --- a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy @@ -4,35 +4,73 @@ import java.io.InputStream import java.io.OutputStream import com.muwire.core.EventBus +import com.muwire.core.hostcache.HostCache +import com.muwire.core.util.DataUtil +import groovy.json.JsonOutput +import groovy.json.JsonSlurper +import groovy.util.logging.Log import net.i2p.data.Destination /** * This side is an ultrapeer and the remote is an ultrapeer too * @author zab */ +@Log class PeerConnection extends Connection { private final DataInputStream dis private final DataOutputStream dos + + private final byte[] readHeader = new byte[3] + private final byte[] writeHeader = new byte[3] + + private final JsonSlurper slurper = new JsonSlurper() public PeerConnection(EventBus eventBus, Endpoint endpoint, - boolean incoming) { - super(eventBus, endpoint, incoming) + boolean incoming, HostCache hostCache) { + super(eventBus, endpoint, incoming, hostCache) this.dis = new DataInputStream(endpoint.inputStream) this.dos = new DataOutputStream(endpoint.outputStream) } @Override protected void read() { - // TODO Auto-generated method stub + dis.readFully(readHeader) + int length = DataUtil.readLength(readHeader) + log.fine("$name read length $length") + byte[] payload = new byte[length] + dis.readFully(payload) + + if (readHeader[0] & 0x80 == 0x80) { + // TODO process binary + } else { + def json = slurper.parse(payload) + if (json.type == null) + throw new Exception("missing json type") + switch(json.type) { + case "Ping" : handlePing(); break; + case "Pong" : handlePong(json); break; + default : + throw new Exception("unknown json type ${json.type}") + } + } } @Override protected void write(Object message) { - // TODO Auto-generated method stub + byte [] payload + if (message instanceof Map) { + payload = JsonOutput.toJson(message) + DataUtil.packHeader(payload.bytes.length, writeHeader) + writeHeader[0] &= 0x7F + } else { + // TODO: write binary + } + dos.write(writeHeader) + dos.write(payload) } } diff --git a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy index cc193a7a..5dc063f0 100644 --- a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy @@ -4,6 +4,7 @@ import java.io.InputStream import java.io.OutputStream import com.muwire.core.EventBus +import com.muwire.core.hostcache.HostCache import net.i2p.data.Destination @@ -15,8 +16,8 @@ import net.i2p.data.Destination */ class UltrapeerConnection extends Connection { - public UltrapeerConnection(EventBus eventBus, Endpoint endpoint) { - super(eventBus, endpoint, false) + public UltrapeerConnection(EventBus eventBus, Endpoint endpoint, HostCache hostCache) { + super(eventBus, endpoint, false, hostCache) } @Override