From ef39ca39437777df3ac14060e18825abd33f3e21 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Sun, 26 May 2019 13:09:46 +0100 Subject: [PATCH] close i2p socket when closing connections --- .../com/muwire/core/connection/Connection.groovy | 2 ++ .../core/connection/ConnectionAcceptor.groovy | 2 +- .../core/connection/ConnectionEstablisher.groovy | 4 ++-- .../com/muwire/core/connection/Endpoint.groovy | 7 ++++++- .../com/muwire/core/connection/I2PAcceptor.groovy | 2 +- .../com/muwire/core/connection/I2PConnector.groovy | 2 +- .../core/connection/ConnectionAcceptorTest.groovy | 14 +++++++------- .../connection/ConnectionEstablisherTest.groovy | 8 ++++---- .../com/muwire/core/hostcache/HostCacheTest.groovy | 6 +++--- 9 files changed, 27 insertions(+), 20 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index 19293a04..cfb6f270 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -69,6 +69,8 @@ abstract class Connection implements Closeable { log.log(Level.WARNING, "$name already closed", new Exception() ) return } + log.info("closing $name") + endpoint.close() reader.interrupt() writer.interrupt() eventBus.publish(new DisconnectionEvent(destination: endpoint.destination)) diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy index 4a33d05c..c8bb8087 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy @@ -142,7 +142,7 @@ class ConnectionAcceptor { log.info("accepting connection, leaf:$leaf") e.outputStream.write("OK".bytes) e.outputStream.flush() - def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true)) + def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true), e.toClose) eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: true, leaf: leaf, status: ConnectionAttemptStatus.SUCCESSFUL)) } else { log.info("rejecting connection, leaf:$leaf") diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy index 2efae001..87d8b0db 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy @@ -110,7 +110,7 @@ class ConnectionEstablisher { } } catch (Exception e) { log.log(Level.WARNING, "Couldn't connect to ${toTry.toBase32()}", e) - def endpoint = new Endpoint(toTry, null, null) + def endpoint = new Endpoint(toTry, null, null, null) fail(endpoint) } finally { inProgress.remove(toTry) @@ -133,7 +133,7 @@ class ConnectionEstablisher { log.info("connection to ${e.destination.toBase32()} established") // wrap into deflater / inflater streams and publish - def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true)) + def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true), e.toClose) eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: false, leaf: false, status: ConnectionAttemptStatus.SUCCESSFUL)) } diff --git a/core/src/main/groovy/com/muwire/core/connection/Endpoint.groovy b/core/src/main/groovy/com/muwire/core/connection/Endpoint.groovy index 5e4db6b2..3bc140d6 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Endpoint.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Endpoint.groovy @@ -10,13 +10,15 @@ class Endpoint implements Closeable { final Destination destination final InputStream inputStream final OutputStream outputStream + final def toClose private final AtomicBoolean closed = new AtomicBoolean() - Endpoint(Destination destination, InputStream inputStream, OutputStream outputStream) { + Endpoint(Destination destination, InputStream inputStream, OutputStream outputStream, def toClose) { this.destination = destination this.inputStream = inputStream this.outputStream = outputStream + this.toClose = toClose } @Override @@ -31,6 +33,9 @@ class Endpoint implements Closeable { if (outputStream != null) { try {outputStream.close()} catch (Exception ignore) {} } + if (toClose != null) { + try {toClose.close()} catch (Exception ignore) {} + } } @Override diff --git a/core/src/main/groovy/com/muwire/core/connection/I2PAcceptor.groovy b/core/src/main/groovy/com/muwire/core/connection/I2PAcceptor.groovy index 6e4bc21d..f6335c48 100644 --- a/core/src/main/groovy/com/muwire/core/connection/I2PAcceptor.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/I2PAcceptor.groovy @@ -17,6 +17,6 @@ class I2PAcceptor { Endpoint accept() { def socket = serverSocket.accept() - new Endpoint(socket.getPeerDestination(), socket.getInputStream(), socket.getOutputStream()) + new Endpoint(socket.getPeerDestination(), socket.getInputStream(), socket.getOutputStream(), socket) } } diff --git a/core/src/main/groovy/com/muwire/core/connection/I2PConnector.groovy b/core/src/main/groovy/com/muwire/core/connection/I2PConnector.groovy index 0b5c6004..aecf547f 100644 --- a/core/src/main/groovy/com/muwire/core/connection/I2PConnector.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/I2PConnector.groovy @@ -15,7 +15,7 @@ class I2PConnector { Endpoint connect(Destination dest) { def socket = socketManager.connect(dest) - new Endpoint(dest, socket.getInputStream(), socket.getOutputStream()) + new Endpoint(dest, socket.getInputStream(), socket.getOutputStream(), socket) } } diff --git a/core/src/test/groovy/com/muwire/core/connection/ConnectionAcceptorTest.groovy b/core/src/test/groovy/com/muwire/core/connection/ConnectionAcceptorTest.groovy index bfefd5c0..4d4da4f5 100644 --- a/core/src/test/groovy/com/muwire/core/connection/ConnectionAcceptorTest.groovy +++ b/core/src/test/groovy/com/muwire/core/connection/ConnectionAcceptorTest.groovy @@ -98,7 +98,7 @@ class ConnectionAcceptorTest { outputStream = new PipedOutputStream(is) def os = new PipedOutputStream() inputStream = new PipedInputStream(os) - new Endpoint(destinations.dest1, is, os) + new Endpoint(destinations.dest1, is, os, null) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } connectionManagerMock.demand.hasLeafSlots() { true } @@ -136,7 +136,7 @@ class ConnectionAcceptorTest { outputStream = new PipedOutputStream(is) def os = new PipedOutputStream() inputStream = new PipedInputStream(os) - new Endpoint(destinations.dest1, is, os) + new Endpoint(destinations.dest1, is, os, null) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } connectionManagerMock.demand.hasPeerSlots() { true } @@ -174,7 +174,7 @@ class ConnectionAcceptorTest { outputStream = new PipedOutputStream(is) def os = new PipedOutputStream() inputStream = new PipedInputStream(os) - new Endpoint(destinations.dest1, is, os) + new Endpoint(destinations.dest1, is, os, null) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } trustServiceMock.demand.getLevel { dest -> @@ -210,7 +210,7 @@ class ConnectionAcceptorTest { outputStream = new PipedOutputStream(is) def os = new PipedOutputStream() inputStream = new PipedInputStream(os) - new Endpoint(destinations.dest1, is, os) + new Endpoint(destinations.dest1, is, os, null) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } trustServiceMock.demand.getLevel { dest -> @@ -246,7 +246,7 @@ class ConnectionAcceptorTest { outputStream = new PipedOutputStream(is) def os = new PipedOutputStream() inputStream = new PipedInputStream(os) - new Endpoint(destinations.dest1, is, os) + new Endpoint(destinations.dest1, is, os, null) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } connectionManagerMock.demand.hasPeerSlots() { false } @@ -288,7 +288,7 @@ class ConnectionAcceptorTest { outputStream = new PipedOutputStream(is) def os = new PipedOutputStream() inputStream = new PipedInputStream(os) - new Endpoint(destinations.dest1, is, os) + new Endpoint(destinations.dest1, is, os, null) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } connectionManagerMock.demand.hasLeafSlots() { false } @@ -330,7 +330,7 @@ class ConnectionAcceptorTest { outputStream = new PipedOutputStream(is) def os = new PipedOutputStream() inputStream = new PipedInputStream(os) - new Endpoint(destinations.dest1, is, os) + new Endpoint(destinations.dest1, is, os, null) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } connectionManagerMock.demand.hasPeerSlots() { false } diff --git a/core/src/test/groovy/com/muwire/core/connection/ConnectionEstablisherTest.groovy b/core/src/test/groovy/com/muwire/core/connection/ConnectionEstablisherTest.groovy index 9403ac40..1122ed66 100644 --- a/core/src/test/groovy/com/muwire/core/connection/ConnectionEstablisherTest.groovy +++ b/core/src/test/groovy/com/muwire/core/connection/ConnectionEstablisherTest.groovy @@ -126,7 +126,7 @@ class ConnectionEstablisherTest { inputStream = new DataInputStream(new PipedInputStream(os)) PipedInputStream is = new PipedInputStream() outputStream = new DataOutputStream(new PipedOutputStream(is)) - new Endpoint(dest, is, os) + new Endpoint(dest, is, os, null) } initMocks() @@ -169,7 +169,7 @@ class ConnectionEstablisherTest { inputStream = new DataInputStream(new PipedInputStream(os)) PipedInputStream is = new PipedInputStream() outputStream = new DataOutputStream(new PipedOutputStream(is)) - new Endpoint(dest, is, os) + new Endpoint(dest, is, os, null) } initMocks() @@ -211,7 +211,7 @@ class ConnectionEstablisherTest { inputStream = new DataInputStream(new PipedInputStream(os)) PipedInputStream is = new PipedInputStream() outputStream = new DataOutputStream(new PipedOutputStream(is)) - new Endpoint(dest, is, os) + new Endpoint(dest, is, os, null) } initMocks() @@ -254,7 +254,7 @@ class ConnectionEstablisherTest { inputStream = new DataInputStream(new PipedInputStream(os)) PipedInputStream is = new PipedInputStream() outputStream = new DataOutputStream(new PipedOutputStream(is)) - new Endpoint(dest, is, os) + new Endpoint(dest, is, os, null) } initMocks() diff --git a/core/src/test/groovy/com/muwire/core/hostcache/HostCacheTest.groovy b/core/src/test/groovy/com/muwire/core/hostcache/HostCacheTest.groovy index 52305524..59b3e097 100644 --- a/core/src/test/groovy/com/muwire/core/hostcache/HostCacheTest.groovy +++ b/core/src/test/groovy/com/muwire/core/hostcache/HostCacheTest.groovy @@ -143,7 +143,7 @@ class HostCacheTest { initMocks() cache.onHostDiscoveredEvent(new HostDiscoveredEvent(destination: destinations.dest1)) - def endpoint = new Endpoint(destinations.dest1, null, null) + def endpoint = new Endpoint(destinations.dest1, null, null, null) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) @@ -161,7 +161,7 @@ class HostCacheTest { initMocks() cache.onHostDiscoveredEvent(new HostDiscoveredEvent(destination: destinations.dest1)) - def endpoint = new Endpoint(destinations.dest1, null, null) + def endpoint = new Endpoint(destinations.dest1, null, null, null) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) @@ -186,7 +186,7 @@ class HostCacheTest { initMocks() cache.onHostDiscoveredEvent(new HostDiscoveredEvent(destination: destinations.dest1)) - def endpoint = new Endpoint(destinations.dest1, null, null) + def endpoint = new Endpoint(destinations.dest1, null, null, null) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.SUCCESSFUL)) def rv = cache.getHosts(5)