diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 79d1cb09..d9ff1bbd 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -87,6 +87,7 @@ class Core { ConnectionManager connectionManager = props.isLeaf() ? new LeafConnectionManager(eventBus,3) : new UltrapeerConnectionManager(eventBus, 512, 512) eventBus.register(TrustEvent.class, connectionManager) + eventBus.register(ConnectionEvent.class, connectionManager) log.info("initializing cache client") CacheClient cacheClient = new CacheClient(eventBus,hostCache, connectionManager, i2pSession, props, 10000) diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy index 6fa5b41d..9c13a79e 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy @@ -78,7 +78,7 @@ class ConnectionEstablisher { } if (toTry == null) return - if (inProgress.add(toTry)) + if (!connectionManager.isConnected(toTry) && inProgress.add(toTry)) executor.execute({connect(toTry)} as Runnable) } diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy index 85be51d2..8113baa3 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy @@ -31,5 +31,7 @@ abstract class ConnectionManager { return getConnections().size() < getDesiredConnections() } - abstract boolean isConnected(Destination d); + abstract boolean isConnected(Destination d) + + abstract void onConnectionEvent(ConnectionEvent e) } diff --git a/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy index 7c1cf8b7..91e26aa1 100644 --- a/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy @@ -14,8 +14,8 @@ import net.i2p.data.Destination */ class LeafConnection extends Connection { - public LeafConnection(EventBus eventBus, InputStream inputStream, OutputStream outputStream, Destination remoteSide) { - super(eventBus, inputStream, outputStream, remoteSide, true); + public LeafConnection(EventBus eventBus, Endpoint endpoint) { + super(eventBus, endpoint, true); } } diff --git a/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy index 95353d3d..ef02f04e 100644 --- a/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy @@ -1,12 +1,18 @@ package com.muwire.core.connection +import java.util.concurrent.ConcurrentHashMap + import com.muwire.core.EventBus +import groovy.util.logging.Log import net.i2p.data.Destination +@Log class LeafConnectionManager extends ConnectionManager { final int maxConnections + + final Map connections = new ConcurrentHashMap() public LeafConnectionManager(EventBus eventBus, int maxConnections) { super(eventBus) @@ -32,8 +38,18 @@ class LeafConnectionManager extends ConnectionManager { @Override public boolean isConnected(Destination d) { - // TODO Auto-generated method stub - return false; + connections.containsKey(d) + } + + @Override + public void onConnectionEvent(ConnectionEvent e) { + if (e.incoming || e.leaf) { + log.severe("Got inconsistent event as a leaf! $e") + return + } + Connection c = new UltrapeerConnection(eventBus, e.endpoint) + // TODO: start and stuff + connections.put(e.endpoint.destination, c) } } diff --git a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy index 42a79bbd..4a10d6b4 100644 --- a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy @@ -13,9 +13,9 @@ import net.i2p.data.Destination */ class PeerConnection extends Connection { - public PeerConnection(EventBus eventBus, InputStream inputStream, OutputStream outputStream, Destination remoteSide, + public PeerConnection(EventBus eventBus, Endpoint endpoint, boolean incoming) { - super(eventBus, inputStream, outputStream, remoteSide, incoming) + super(eventBus, endpoint, incoming) } } diff --git a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy index 05c76bed..567e01d4 100644 --- a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy @@ -15,9 +15,8 @@ import net.i2p.data.Destination */ class UltrapeerConnection extends Connection { - public UltrapeerConnection(EventBus eventBus, InputStream inputStream, OutputStream outputStream, - Destination remoteSide) { - super(eventBus, inputStream, outputStream, remoteSide, false) + public UltrapeerConnection(EventBus eventBus, Endpoint endpoint) { + super(eventBus, endpoint, false) } } diff --git a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy index 3f597040..a8d5d105 100644 --- a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy @@ -1,15 +1,21 @@ package com.muwire.core.connection import java.util.Collection +import java.util.concurrent.ConcurrentHashMap import com.muwire.core.EventBus +import groovy.util.logging.Log import net.i2p.data.Destination +@Log class UltrapeerConnectionManager extends ConnectionManager { final int maxPeers, maxLeafs + final Map peerConnections = new ConcurrentHashMap() + final Map leafConnections = new ConcurrentHashMap() + UltrapeerConnectionManager() {} public UltrapeerConnectionManager(EventBus eventBus, int maxPeers, int maxLeafs) { @@ -44,7 +50,19 @@ class UltrapeerConnectionManager extends ConnectionManager { } @Override public boolean isConnected(Destination d) { - // TODO Auto-generated method stub - return false; + peerConnections.containsKey(d) || leafConnections.containsKey(d) + } + + @Override + public void onConnectionEvent(ConnectionEvent e) { + if (!e.incoming && e.leaf) { + log.severe("Inconsistent event $e") + return + } + + Connection c = e.leaf ? new LeafConnection(eventBus, e.endpoint) : new PeerConnection(eventBus, e.endpoint, e.incoming) + // TODO: start and stuff + def map = e.leaf ? leafConnections : peerConnections + map.put(e.endpoint.destination, c) } }