diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index db96054c..23ea2001 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -26,7 +26,7 @@ abstract class Connection implements Closeable { protected final String name - long lastPingSentTime, lastPingReceivedTime + long lastPingSentTime, lastPongReceivedTime Connection(EventBus eventBus, Endpoint endpoint, boolean incoming, HostCache hostCache) { this.eventBus = eventBus @@ -105,6 +105,7 @@ abstract class Connection implements Closeable { ping.type = "Ping" ping.version = 1 messages.put(ping) + lastPingSentTime = System.currentTimeMillis() } protected void handlePing() { @@ -118,6 +119,7 @@ abstract class Connection implements Closeable { protected void handlePong(def pong) { log.fine("$name received pong") + lastPongReceivedTime = System.currentTimeMillis() if (pong.pongs == null) throw new Exception("Pong doesn't have pongs") pong.pongs.each { diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy index 8113baa3..a0d9850b 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy @@ -7,13 +7,27 @@ import com.muwire.core.trust.TrustLevel import net.i2p.data.Destination abstract class ConnectionManager { + + private static final int PING_TIME = 20000 final EventBus eventBus + private final Timer timer + ConnectionManager() {} ConnectionManager(EventBus eventBus) { this.eventBus = eventBus + this.timer = new Timer("connections-pinger",true) + } + + void start() { + timer.schedule({sendPings()} as TimerTask, 1000,1000) + } + + void stop() { + timer.cancel() + getConnections().each { it.close() } } void onTrustEvent(TrustEvent e) { @@ -34,4 +48,12 @@ abstract class ConnectionManager { abstract boolean isConnected(Destination d) abstract void onConnectionEvent(ConnectionEvent e) + + private void sendPings() { + final long now = System.currentTimeMillis() + getConnections().each { + if (now - it.lastPingSentTime > PING_TIME) + it.sendPing() + } + } } diff --git a/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy index ef02f04e..ad152142 100644 --- a/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy @@ -27,8 +27,7 @@ class LeafConnectionManager extends ConnectionManager { @Override public Collection getConnections() { - // TODO implement - [] + connections.values() } @Override @@ -47,9 +46,12 @@ class LeafConnectionManager extends ConnectionManager { log.severe("Got inconsistent event as a leaf! $e") return } + if (e.status != ConnectionAttemptStatus.SUCCESSFUL) + return + Connection c = new UltrapeerConnection(eventBus, e.endpoint) - // TODO: start and stuff connections.put(e.endpoint.destination, c) + c.start() } } diff --git a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy index a8d5d105..0065605c 100644 --- a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy @@ -31,19 +31,20 @@ class UltrapeerConnectionManager extends ConnectionManager { @Override public Collection getConnections() { - // TODO implement - [] + def rv = new ArrayList(peerConnections.size() + leafConnections.size()) + rv.addAll(peerConnections.values()) + rv.addAll(leafConnections.values()) + rv } boolean hasLeafSlots() { - // TODO implement - true + leafConnections.size() < maxLeafs } boolean hasPeerSlots() { - // TODO implement - true + peerConnections.size() < maxPeers } + @Override protected int getDesiredConnections() { return maxPeers / 2; @@ -60,9 +61,12 @@ class UltrapeerConnectionManager extends ConnectionManager { return } + if (e.status != ConnectionAttemptStatus.SUCCESSFUL) + return + Connection c = e.leaf ? new LeafConnection(eventBus, e.endpoint) : new PeerConnection(eventBus, e.endpoint, e.incoming) - // TODO: start and stuff def map = e.leaf ? leafConnections : peerConnections map.put(e.endpoint.destination, c) + c.start() } }