Configure deflater output streams for sync flush, flush after each message

pull/4/head
Zlatin Balevsky 2018-07-27 15:48:23 +01:00
parent a004bd430a
commit 736a4a7424
3 changed files with 8 additions and 5 deletions

View File

@ -139,7 +139,7 @@ class ConnectionAcceptor {
log.info("accepting connection, leaf:$leaf") log.info("accepting connection, leaf:$leaf")
e.outputStream.write("OK".bytes) e.outputStream.write("OK".bytes)
e.outputStream.flush() e.outputStream.flush()
def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream)) def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true))
eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: true, leaf: leaf, status: ConnectionAttemptStatus.SUCCESSFUL)) eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: true, leaf: leaf, status: ConnectionAttemptStatus.SUCCESSFUL))
} else { } else {
log.info("rejecting connection, leaf:$leaf") log.info("rejecting connection, leaf:$leaf")

View File

@ -4,6 +4,7 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory import java.util.concurrent.ThreadFactory
import java.util.logging.Level import java.util.logging.Level
import java.util.zip.Deflater
import java.util.zip.DeflaterInputStream import java.util.zip.DeflaterInputStream
import java.util.zip.DeflaterOutputStream import java.util.zip.DeflaterOutputStream
import java.util.zip.InflaterInputStream import java.util.zip.InflaterInputStream
@ -132,7 +133,7 @@ class ConnectionEstablisher {
log.info("connection to ${e.destination.toBase32()} established") log.info("connection to ${e.destination.toBase32()} established")
// wrap into deflater / inflater streams and publish // wrap into deflater / inflater streams and publish
def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream)) def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true))
eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: false, leaf: false, status: ConnectionAttemptStatus.SUCCESSFUL)) eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: false, leaf: false, status: ConnectionAttemptStatus.SUCCESSFUL))
} }

View File

@ -43,7 +43,7 @@ class PeerConnection extends Connection {
byte[] payload = new byte[length] byte[] payload = new byte[length]
dis.readFully(payload) dis.readFully(payload)
if (readHeader[0] & 0x80 == 0x80) { if ((readHeader[0] & (byte)0x80) == 0x80) {
// TODO process binary // TODO process binary
} else { } else {
def json = slurper.parse(payload) def json = slurper.parse(payload)
@ -60,9 +60,10 @@ class PeerConnection extends Connection {
@Override @Override
protected void write(Object message) { protected void write(Object message) {
byte [] payload byte[] payload
if (message instanceof Map) { if (message instanceof Map) {
payload = JsonOutput.toJson(message) log.fine "$name writing message type ${message.type}"
payload = JsonOutput.toJson(message).bytes
DataUtil.packHeader(payload.length, writeHeader) DataUtil.packHeader(payload.length, writeHeader)
writeHeader[0] &= 0x7F writeHeader[0] &= 0x7F
} else { } else {
@ -71,6 +72,7 @@ class PeerConnection extends Connection {
dos.write(writeHeader) dos.write(writeHeader)
dos.write(payload) dos.write(payload)
dos.flush()
} }
} }