move the connections closing to a separate threadpool and limit the time we wait for reset() to complete

pull/24/head
Zlatin Balevsky 2019-10-29 09:01:41 +00:00
parent 6c806c4441
commit d7c7afe2c0
5 changed files with 31 additions and 10 deletions

View File

@ -353,6 +353,7 @@ public class Core {
log.info("shutting down embedded router") log.info("shutting down embedded router")
router.shutdown(0) router.shutdown(0)
} }
log.info("shutdown complete")
} }
static main(args) { static main(args) {

View File

@ -1,7 +1,12 @@
package com.muwire.core.connection package com.muwire.core.connection
import java.util.concurrent.BlockingQueue import java.util.concurrent.BlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Level import java.util.logging.Level
@ -22,6 +27,17 @@ import net.i2p.data.Destination
@Log @Log
abstract class Connection implements Closeable { abstract class Connection implements Closeable {
/**
* This exists because the reset() method blocks for a long time
* even though the javadocs say it's non-blocking. When that gets
* fixed on the I2P side, this can be removed.
*/
private static final ExecutorService CLOSER = Executors.newCachedThreadPool( {r ->
def rv = new Thread(r,"connection closer")
rv.setDaemon(true)
rv
} as ThreadFactory)
private static final int SEARCHES = 10 private static final int SEARCHES = 10
private static final long INTERVAL = 1000 private static final long INTERVAL = 1000
@ -82,7 +98,14 @@ abstract class Connection implements Closeable {
log.info("closing $name") log.info("closing $name")
reader.interrupt() reader.interrupt()
writer.interrupt() writer.interrupt()
CountDownLatch latch = new CountDownLatch(1)
CLOSER.submit({
endpoint.close() endpoint.close()
latch.countDown()
log.info("closed $name")
})
latch.await(3000, TimeUnit.MILLISECONDS)
eventBus.publish(new DisconnectionEvent(destination: endpoint.destination)) eventBus.publish(new DisconnectionEvent(destination: endpoint.destination))
} }
@ -91,6 +114,7 @@ abstract class Connection implements Closeable {
while(running.get()) { while(running.get()) {
read() read()
} }
} catch (InterruptedException ok) {
} catch (SocketTimeoutException e) { } catch (SocketTimeoutException e) {
} catch (Exception e) { } catch (Exception e) {
log.log(Level.WARNING,"unhandled exception in reader",e) log.log(Level.WARNING,"unhandled exception in reader",e)
@ -107,6 +131,7 @@ abstract class Connection implements Closeable {
def message = messages.take() def message = messages.take()
write(message) write(message)
} }
} catch (InterruptedException ok) {
} catch (Exception e) { } catch (Exception e) {
log.log(Level.WARNING, "unhandled exception in writer",e) log.log(Level.WARNING, "unhandled exception in writer",e)
} finally { } finally {

View File

@ -36,9 +36,8 @@ abstract class ConnectionManager {
timer.schedule({sendPings()} as TimerTask, 1000,1000) timer.schedule({sendPings()} as TimerTask, 1000,1000)
} }
void stop() { void shutdown() {
timer.cancel() timer.cancel()
getConnections().each { it.close() }
} }
void onTrustEvent(TrustEvent e) { void onTrustEvent(TrustEvent e) {
@ -62,8 +61,6 @@ abstract class ConnectionManager {
abstract void onDisconnectionEvent(DisconnectionEvent e) abstract void onDisconnectionEvent(DisconnectionEvent e)
abstract void shutdown()
protected void sendPings() { protected void sendPings() {
final long now = System.currentTimeMillis() final long now = System.currentTimeMillis()
getConnections().each { getConnections().each {

View File

@ -104,6 +104,7 @@ class UltrapeerConnectionManager extends ConnectionManager {
@Override @Override
void shutdown() { void shutdown() {
super.shutdown()
peerConnections.values().stream().parallel().forEach({v -> v.close()}) peerConnections.values().stream().parallel().forEach({v -> v.close()})
leafConnections.values().stream().parallel().forEach({v -> v.close()}) leafConnections.values().stream().parallel().forEach({v -> v.close()})
peerConnections.clear() peerConnections.clear()

View File

@ -18,11 +18,8 @@ class Shutdown extends AbstractLifecycleHandler {
@Override @Override
void execute() { void execute() {
log.info("shutting down") log.info("shutting down from lifecycle")
Core core = application.context.get("core") Core core = application.context.get("core")
if (core != null) { core?.shutdown()
Thread t = new Thread({ core.shutdown() } as Runnable)
t.start()
}
} }
} }