From cfe1fe98b0f7248eef3878ba20a15186bf1704e0 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Wed, 26 May 2021 07:46:25 +0100 Subject: [PATCH] wip on switching to binary pongs --- .../muwire/core/connection/Connection.groovy | 29 +++++++-- .../muwire/core/connection/MessageUtil.groovy | 63 +++++++++++++++++++ .../core/connection/PeerConnection.groovy | 38 ++++++----- 3 files changed, 111 insertions(+), 19 deletions(-) create mode 100644 core/src/main/groovy/com/muwire/core/connection/MessageUtil.groovy diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index 3594c0a0..0c6e6708 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -33,6 +33,9 @@ abstract class Connection implements Closeable { private static final int SEARCHES = 10 private static final long INTERVAL = 1000 + + private static final MAX_PONGS_V1 = 2 + private static final MAX_PONGS_V2 = 3 final EventBus eventBus final Endpoint endpoint @@ -133,7 +136,7 @@ abstract class Connection implements Closeable { void sendPing() { def ping = [:] ping.type = "Ping" - ping.version = 1 + ping.version = 2 lastPingUUID = UUID.randomUUID() ping.uuid = lastPingUUID.toString() messages.put(ping) @@ -166,18 +169,33 @@ abstract class Connection implements Closeable { } protected void handlePing(def ping) { - log.fine("$name received ping") + log.fine("$name received ping version ${ping.version}") + if (ping.version < 2) + handlePingV1(ping) + else + handlePingV2(ping) + } + + private void handlePingV1(def ping) { def pong = [:] pong.type = "Pong" pong.version = 1 if (ping.uuid != null) pong.uuid = ping.uuid - pong.pongs = hostCache.getGoodHosts(2).collect { d -> d.toBase64() } + pong.pongs = hostCache.getGoodHosts(MAX_PONGS_V1).collect { d -> d.toBase64() } messages.put(pong) } + + private void handlePingV2(def ping) { + if (ping.uuid == null) + throw new Exception("Ping V2 without an UUID") + UUID uuid = UUID.fromString(ping.uuid) + byte [] pongPayload = MessageUtil.createPongV2(uuid, hostCache.getGoodHosts(MAX_PONGS_V2)) + messages.put(pongPayload) + } protected void handlePong(def pong) { - log.fine("$name received pong") + log.fine("$name received pong version ${pong.version}") lastPongReceivedTime = System.currentTimeMillis() if (pong.pongs == null) throw new Exception("Pong doesn't have pongs") @@ -197,7 +215,8 @@ abstract class Connection implements Closeable { } lastPingUUID = null - pong.pongs.stream().limit(2).forEach { + int limit = pong.version == 1 ? MAX_PONGS_V1 : MAX_PONGS_V2 + pong.pongs.stream().limit(limit).forEach { def dest = new Destination(it) eventBus.publish(new HostDiscoveredEvent(destination: dest)) } diff --git a/core/src/main/groovy/com/muwire/core/connection/MessageUtil.groovy b/core/src/main/groovy/com/muwire/core/connection/MessageUtil.groovy new file mode 100644 index 00000000..416f03cc --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/connection/MessageUtil.groovy @@ -0,0 +1,63 @@ +package com.muwire.core.connection + +import net.i2p.data.Destination + +class MessageUtil { + + private static final byte PONG = (byte)1 + + static byte [] createPongV2(UUID uuid, List destinations) { + def baos = new ByteArrayOutputStream() + def daos = new DataOutputStream(baos) + daos.writeByte(PONG) + daos.writeByte((byte)2) + daos.writeLong(uuid.mostSignificantBits) + daos.writeLong(uuid.leastSignificantBits) + daos.writeByte((byte) destinations.size()) + destinations.each { + it.writeBytes(daos) + } + daos.close() + baos.toByteArray() + } + + static def parseBinaryMessage(byte [] payload) { + def bais = new ByteArrayInputStream(payload) + byte type = (byte)(bais.read() & 0xFF) + switch(type) { + case PONG: + return parsePong(bais) + default: + throw new Exception("unknown binary message type ${type}") + } + } + + private static def parsePong(InputStream is) { + byte version = (byte)(is.read() & 0xFF) + if (version == (byte)2) + return parsePongV2(is) + throw new Exception("Unknown pong version ${version}") + } + + private static def parsePongV2(InputStream is) { + def rv = [:] + def dis = new DataInputStream(is) + + long msb = dis.readLong() + long lsb = dis.readLong() + UUID uuid = new UUID(msb, lsb) + rv.uuid = uuid.toString() + + byte count = dis.readByte() + List destinations = new ArrayList<>(count) + count.times { + destinations << Destination.create(dis) + } + rv.pongs = destinations + + rv.type = "Pong" + rv.version = 2 + + rv + } +} diff --git a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy index a133a4fd..a4803347 100644 --- a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy @@ -20,6 +20,8 @@ import net.i2p.data.Destination */ @Log class PeerConnection extends Connection { + + private static final int MAX_PAYLOAD_SIZE = 0x1 << 20 private final DataInputStream dis private final DataOutputStream dos @@ -42,23 +44,27 @@ class PeerConnection extends Connection { dis.readFully(readHeader) int length = DataUtil.readLength(readHeader) log.fine("$name read length $length") + + if (length > MAX_PAYLOAD_SIZE) + throw new Exception("Rejecting large message $length") byte[] payload = new byte[length] dis.readFully(payload) + def json if ((readHeader[0] & (byte)0x80) == 0x80) { - // TODO process binary + json = MessageUtil.parseBinaryMessage(payload) } else { - def json = slurper.parse(payload) - if (json.type == null) - throw new Exception("missing json type") - switch(json.type) { - case "Ping" : handlePing(json); break; - case "Pong" : handlePong(json); break; - case "Search": handleSearch(json); break - default : - throw new Exception("unknown json type ${json.type}") - } + json = slurper.parse(payload) + } + if (json.type == null) + throw new Exception("missing json type") + switch(json.type) { + case "Ping" : handlePing(json); break; + case "Pong" : handlePong(json); break; + case "Search": handleSearch(json); break + default : + throw new Exception("unknown json type ${json.type}") } } @@ -70,9 +76,13 @@ class PeerConnection extends Connection { DataUtil.packHeader(payload.length, writeHeader) log.fine "$name writing message type ${message.type} length $payload.length" writeHeader[0] &= (byte)0x7F - } else { - // TODO: write binary - } + } else if (message instanceof byte[]) { + payload = (byte[]) message + DataUtil.packHeader(payload.length, writeHeader) + log.fine "$name writing binary message length ${payload.length}" + writeHeader[0] |= (byte)80 + } else + throw new IllegalArgumentException() dos.write(writeHeader) dos.write(payload)