wip on switching to binary pongs

binary-pongs
Zlatin Balevsky 2021-05-26 07:46:25 +01:00
parent 8f1b663bbd
commit cfe1fe98b0
No known key found for this signature in database
GPG Key ID: A72832072D525E41
3 changed files with 111 additions and 19 deletions

View File

@ -33,6 +33,9 @@ abstract class Connection implements Closeable {
private static final int SEARCHES = 10
private static final long INTERVAL = 1000
private static final MAX_PONGS_V1 = 2
private static final MAX_PONGS_V2 = 3
final EventBus eventBus
final Endpoint endpoint
@ -133,7 +136,7 @@ abstract class Connection implements Closeable {
void sendPing() {
def ping = [:]
ping.type = "Ping"
ping.version = 1
ping.version = 2
lastPingUUID = UUID.randomUUID()
ping.uuid = lastPingUUID.toString()
messages.put(ping)
@ -166,18 +169,33 @@ abstract class Connection implements Closeable {
}
protected void handlePing(def ping) {
log.fine("$name received ping")
log.fine("$name received ping version ${ping.version}")
if (ping.version < 2)
handlePingV1(ping)
else
handlePingV2(ping)
}
private void handlePingV1(def ping) {
def pong = [:]
pong.type = "Pong"
pong.version = 1
if (ping.uuid != null)
pong.uuid = ping.uuid
pong.pongs = hostCache.getGoodHosts(2).collect { d -> d.toBase64() }
pong.pongs = hostCache.getGoodHosts(MAX_PONGS_V1).collect { d -> d.toBase64() }
messages.put(pong)
}
private void handlePingV2(def ping) {
if (ping.uuid == null)
throw new Exception("Ping V2 without an UUID")
UUID uuid = UUID.fromString(ping.uuid)
byte [] pongPayload = MessageUtil.createPongV2(uuid, hostCache.getGoodHosts(MAX_PONGS_V2))
messages.put(pongPayload)
}
protected void handlePong(def pong) {
log.fine("$name received pong")
log.fine("$name received pong version ${pong.version}")
lastPongReceivedTime = System.currentTimeMillis()
if (pong.pongs == null)
throw new Exception("Pong doesn't have pongs")
@ -197,7 +215,8 @@ abstract class Connection implements Closeable {
}
lastPingUUID = null
pong.pongs.stream().limit(2).forEach {
int limit = pong.version == 1 ? MAX_PONGS_V1 : MAX_PONGS_V2
pong.pongs.stream().limit(limit).forEach {
def dest = new Destination(it)
eventBus.publish(new HostDiscoveredEvent(destination: dest))
}

View File

@ -0,0 +1,63 @@
package com.muwire.core.connection
import net.i2p.data.Destination
class MessageUtil {
private static final byte PONG = (byte)1
static byte [] createPongV2(UUID uuid, List<Destination> destinations) {
def baos = new ByteArrayOutputStream()
def daos = new DataOutputStream(baos)
daos.writeByte(PONG)
daos.writeByte((byte)2)
daos.writeLong(uuid.mostSignificantBits)
daos.writeLong(uuid.leastSignificantBits)
daos.writeByte((byte) destinations.size())
destinations.each {
it.writeBytes(daos)
}
daos.close()
baos.toByteArray()
}
static def parseBinaryMessage(byte [] payload) {
def bais = new ByteArrayInputStream(payload)
byte type = (byte)(bais.read() & 0xFF)
switch(type) {
case PONG:
return parsePong(bais)
default:
throw new Exception("unknown binary message type ${type}")
}
}
private static def parsePong(InputStream is) {
byte version = (byte)(is.read() & 0xFF)
if (version == (byte)2)
return parsePongV2(is)
throw new Exception("Unknown pong version ${version}")
}
private static def parsePongV2(InputStream is) {
def rv = [:]
def dis = new DataInputStream(is)
long msb = dis.readLong()
long lsb = dis.readLong()
UUID uuid = new UUID(msb, lsb)
rv.uuid = uuid.toString()
byte count = dis.readByte()
List<Destination> destinations = new ArrayList<>(count)
count.times {
destinations << Destination.create(dis)
}
rv.pongs = destinations
rv.type = "Pong"
rv.version = 2
rv
}
}

View File

@ -20,6 +20,8 @@ import net.i2p.data.Destination
*/
@Log
class PeerConnection extends Connection {
private static final int MAX_PAYLOAD_SIZE = 0x1 << 20
private final DataInputStream dis
private final DataOutputStream dos
@ -42,23 +44,27 @@ class PeerConnection extends Connection {
dis.readFully(readHeader)
int length = DataUtil.readLength(readHeader)
log.fine("$name read length $length")
if (length > MAX_PAYLOAD_SIZE)
throw new Exception("Rejecting large message $length")
byte[] payload = new byte[length]
dis.readFully(payload)
def json
if ((readHeader[0] & (byte)0x80) == 0x80) {
// TODO process binary
json = MessageUtil.parseBinaryMessage(payload)
} else {
def json = slurper.parse(payload)
if (json.type == null)
throw new Exception("missing json type")
switch(json.type) {
case "Ping" : handlePing(json); break;
case "Pong" : handlePong(json); break;
case "Search": handleSearch(json); break
default :
throw new Exception("unknown json type ${json.type}")
}
json = slurper.parse(payload)
}
if (json.type == null)
throw new Exception("missing json type")
switch(json.type) {
case "Ping" : handlePing(json); break;
case "Pong" : handlePong(json); break;
case "Search": handleSearch(json); break
default :
throw new Exception("unknown json type ${json.type}")
}
}
@ -70,9 +76,13 @@ class PeerConnection extends Connection {
DataUtil.packHeader(payload.length, writeHeader)
log.fine "$name writing message type ${message.type} length $payload.length"
writeHeader[0] &= (byte)0x7F
} else {
// TODO: write binary
}
} else if (message instanceof byte[]) {
payload = (byte[]) message
DataUtil.packHeader(payload.length, writeHeader)
log.fine "$name writing binary message length ${payload.length}"
writeHeader[0] |= (byte)80
} else
throw new IllegalArgumentException()
dos.write(writeHeader)
dos.write(payload)