diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 182e2783..9d79faff 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -146,14 +146,24 @@ class Core { log.info("initializing cache client") CacheClient cacheClient = new CacheClient(eventBus,hostCache, connectionManager, i2pSession, props, 10000) cacheClient.start() + + log.info("initializing connector") + I2PConnector i2pConnector = new I2PConnector(socketManager) + + log.info "initializing results sender" + ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me) + + log.info "initializing search manager" + SearchManager searchManager = new SearchManager(eventBus, resultsSender) + eventBus.register(QueryEvent.class, searchManager) + eventBus.register(ResultsEvent.class, searchManager) log.info("initializing acceptor") I2PAcceptor i2pAcceptor = new I2PAcceptor(socketManager) - ConnectionAcceptor acceptor = new ConnectionAcceptor(eventBus, connectionManager, props, i2pAcceptor, hostCache, trustService) + ConnectionAcceptor acceptor = new ConnectionAcceptor(eventBus, connectionManager, props, i2pAcceptor, hostCache, trustService, searchManager) acceptor.start() - log.info("initializing connector") - I2PConnector i2pConnector = new I2PConnector(socketManager) + ConnectionEstablisher connector = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache) connector.start() @@ -170,13 +180,6 @@ class Core { eventBus.register(FileUnsharedEvent.class, fileManager) eventBus.register(SearchEvent.class, fileManager) - log.info "initializing results sender" - ResultsSender resultsSender = new ResultsSender(eventBus, me) - - log.info "initializing search manager" - SearchManager searchManager = new SearchManager(eventBus, resultsSender) - eventBus.register(QueryEvent.class, searchManager) - eventBus.register(ResultsEvent.class, searchManager) // ... at the end, sleep or execute script if (args.length == 0) { diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy index e34cba9b..8c88e443 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy @@ -1,5 +1,6 @@ package com.muwire.core.connection +import java.nio.charset.StandardCharsets import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.logging.Level @@ -8,11 +9,17 @@ import java.util.zip.InflaterInputStream import com.muwire.core.EventBus import com.muwire.core.MuWireSettings +import com.muwire.core.Persona import com.muwire.core.hostcache.HostCache import com.muwire.core.trust.TrustLevel import com.muwire.core.trust.TrustService +import com.muwire.core.search.InvalidSearchResultException +import com.muwire.core.search.ResultsParser +import com.muwire.core.search.SearchManager +import com.muwire.core.search.UnexpectedResultsException import groovy.json.JsonOutput +import groovy.json.JsonSlurper import groovy.util.logging.Log @Log @@ -24,19 +31,21 @@ class ConnectionAcceptor { final I2PAcceptor acceptor final HostCache hostCache final TrustService trustService + final SearchManager searchManager final ExecutorService acceptorThread final ExecutorService handshakerThreads ConnectionAcceptor(EventBus eventBus, UltrapeerConnectionManager manager, MuWireSettings settings, I2PAcceptor acceptor, HostCache hostCache, - TrustService trustService) { + TrustService trustService, searchManager) { this.eventBus = eventBus this.manager = manager this.settings = settings this.acceptor = acceptor this.hostCache = hostCache this.trustService = trustService + this.searchManager = searchManager acceptorThread = Executors.newSingleThreadExecutor { r -> def rv = new Thread(r) @@ -86,11 +95,16 @@ class ConnectionAcceptor { int read = is.read() switch(read) { case (byte)'M': + if (settings.isLeaf()) + throw new IOException("Incoming connection as leaf") processMuWire(e) break case (byte)'G': processGET(e) break + case (byte)'P': + processPOST(e) + break default: throw new Exception("Invalid read $read") } @@ -109,30 +123,19 @@ class ConnectionAcceptor { throw new IOException("unexpected value $read at position $i") } } - + byte[] type = new byte[4] DataInputStream dis = new DataInputStream(e.inputStream) dis.readFully(type) - - if (settings.isLeaf()) { - if (type != "resu".bytes) { - throw new IOException("Received incoming non-results connection as leaf") - } - byte [] lts = new byte[3] - dis.readFully(lts) - if (lts != "lts".bytes) - throw new IOException("malformed results connection") - // TODO: hand-off results connection - } else { - if (type == "leaf".bytes) - handleIncoming(e, true) - else if (type == "peer".bytes) - handleIncoming(e, false) - else - throw new IOException("unknown connection type $type") - } - } - + + if (type == "leaf".bytes) + handleIncoming(e, true) + else if (type == "peer".bytes) + handleIncoming(e, false) + else + throw new IOException("unknown connection type $type") + } + private void handleIncoming(Endpoint e, boolean leaf) { boolean accept = leaf ? manager.hasLeafSlots() : manager.hasPeerSlots() if (accept) { @@ -164,5 +167,42 @@ class ConnectionAcceptor { private void processGET(Endpoint e) { // TODO: implement } + + private void processPOST(final Endpoint e) throws IOException { + byte [] ost = new byte[4] + final DataInputStream dis = new DataInputStream(e.getInputStream()) + dis.readFully(ost) + if (ost != "OST ".getBytes(StandardCharsets.US_ASCII)) + throw new IOException("Invalid POST connection") + handshakerThreads.execute({ + JsonSlurper slurper = new JsonSlurper() + try { + byte uuid = new byte[36] + dis.readFully(uuid) + UUID resultsUUID = UUID.fromString(new String(uuid, StandardCharsets.US_ASCII)) + if (!searchManager.hasLocalSearch(resultsUUID)) + throw new UnexpectedResultsException(resultsUUID.toString()) + + byte rn = new byte[2] + dis.readFully(rn) + if (rn != "\r\n".getBytes(StandardCharsets.US_ASCII)) + throw new IOException("invalid request header") + + Persona sender = new Persona(dis) + int nResults = dis.readUnsignedShort() + for (int i = 0; i < nResults; i++) { + int jsonSize = dis.readUnsignedShort() + byte [] payload = new byte[jsonSize] + dis.readFully(payload) + def json = slurper.parse(payload) + eventBus.publish(ResultsParser.parse(sender, json)) + } + } catch (IOException | UnexpectedResultsException | InvalidSearchResultException bad) { + log.warning(bad) + } finally { + e.closeQuietly() + } + } as Runnable) + } } diff --git a/core/src/main/groovy/com/muwire/core/search/InvalidSearchResultException.groovy b/core/src/main/groovy/com/muwire/core/search/InvalidSearchResultException.groovy new file mode 100644 index 00000000..f203f43d --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/search/InvalidSearchResultException.groovy @@ -0,0 +1,25 @@ +package com.muwire.core.search + +class InvalidSearchResultException extends Exception { + + public InvalidSearchResultException() { + super(); + // TODO Auto-generated constructor stub + } + + public InvalidSearchResultException(String message, Throwable cause) { + super(message, cause); + // TODO Auto-generated constructor stub + } + + public InvalidSearchResultException(String message) { + super(message); + // TODO Auto-generated constructor stub + } + + public InvalidSearchResultException(Throwable cause) { + super(cause); + // TODO Auto-generated constructor stub + } + +} diff --git a/core/src/main/groovy/com/muwire/core/search/ResultsEvent.groovy b/core/src/main/groovy/com/muwire/core/search/ResultsEvent.groovy index f3f3c3b3..059310eb 100644 --- a/core/src/main/groovy/com/muwire/core/search/ResultsEvent.groovy +++ b/core/src/main/groovy/com/muwire/core/search/ResultsEvent.groovy @@ -7,5 +7,4 @@ class ResultsEvent extends Event { SharedFile[] results UUID uuid - } diff --git a/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy b/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy new file mode 100644 index 00000000..e90d9010 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy @@ -0,0 +1,9 @@ +package com.muwire.core.search + +import com.muwire.core.Persona + +class ResultsParser { + public static UIResultEvent parse(Persona p, def json) throws InvalidSearchResultException { + null + } +} diff --git a/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy b/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy index 72950cf0..5c3d674d 100644 --- a/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy +++ b/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy @@ -49,7 +49,7 @@ class ResultsSender { log.info("Sending $results.length results for uuid $uuid to ${target.toBase32()}") if (target.equals(me.destination)) { def resultEvent = new ResultsEvent( uuid : uuid, results : results ) - def uiResultEvent = new UIResultEvent(resultsEvent : resultEvent) + def uiResultEvent = new UIResultEvent(sender: me, resultsEvent : resultEvent) eventBus.publish(uiResultEvent) } else { executor.execute(new ResultSendJob(uuid : uuid, results : results, target: target)) diff --git a/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy b/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy index e7ff6737..8719305e 100644 --- a/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy +++ b/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy @@ -12,6 +12,8 @@ public class SearchManager { private final ResultsSender resultsSender private final Map responderAddress = new HashMap<>() + SearchManager(){} + SearchManager(EventBus eventBus, ResultsSender resultsSender) { this.eventBus = eventBus this.resultsSender = resultsSender @@ -32,4 +34,8 @@ public class SearchManager { } resultsSender.sendResults(event.uuid, event.results, target) } + + boolean hasLocalSearch(UUID uuid) { + false + } } diff --git a/core/src/main/groovy/com/muwire/core/search/UIResultEvent.groovy b/core/src/main/groovy/com/muwire/core/search/UIResultEvent.groovy index 7378fc1e..be0a1882 100644 --- a/core/src/main/groovy/com/muwire/core/search/UIResultEvent.groovy +++ b/core/src/main/groovy/com/muwire/core/search/UIResultEvent.groovy @@ -1,7 +1,9 @@ package com.muwire.core.search import com.muwire.core.Event +import com.muwire.core.Persona class UIResultEvent extends Event { + Persona sender ResultsEvent resultsEvent } diff --git a/core/src/main/groovy/com/muwire/core/search/UnexpectedResultsException.groovy b/core/src/main/groovy/com/muwire/core/search/UnexpectedResultsException.groovy new file mode 100644 index 00000000..cd8a7f53 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/search/UnexpectedResultsException.groovy @@ -0,0 +1,22 @@ +package com.muwire.core.search + +class UnexpectedResultsException extends Exception { + + public UnexpectedResultsException() { + super(); + } + + public UnexpectedResultsException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public UnexpectedResultsException(String message, Throwable cause) { + super(message, cause); + } + + public UnexpectedResultsException(String message) { + super(message); + } + +} diff --git a/core/src/test/groovy/com/muwire/core/connection/ConnectionAcceptorTest.groovy b/core/src/test/groovy/com/muwire/core/connection/ConnectionAcceptorTest.groovy index df5241e1..bfefd5c0 100644 --- a/core/src/test/groovy/com/muwire/core/connection/ConnectionAcceptorTest.groovy +++ b/core/src/test/groovy/com/muwire/core/connection/ConnectionAcceptorTest.groovy @@ -14,6 +14,7 @@ import com.muwire.core.MuWireSettings import com.muwire.core.hostcache.HostCache import com.muwire.core.trust.TrustLevel import com.muwire.core.trust.TrustService +import com.muwire.core.search.SearchManager import groovy.json.JsonSlurper import groovy.mock.interceptor.MockFor @@ -35,6 +36,9 @@ class ConnectionAcceptorTest { def trustServiceMock TrustService trustService + + def searchManagerMock + SearchManager searchManager ConnectionAcceptor acceptor List connectionEvents @@ -47,6 +51,7 @@ class ConnectionAcceptorTest { i2pAcceptorMock = new MockFor(I2PAcceptor.class) hostCacheMock = new MockFor(HostCache.class) trustServiceMock = new MockFor(TrustService.class) + searchManagerMock = new MockFor(SearchManager.class) } @After @@ -56,6 +61,7 @@ class ConnectionAcceptorTest { i2pAcceptorMock.verify i2pAcceptor hostCacheMock.verify hostCache trustServiceMock.verify trustService + searchManagerMock.verify searchManager Thread.sleep(100) } @@ -73,8 +79,9 @@ class ConnectionAcceptorTest { i2pAcceptor = i2pAcceptorMock.proxyInstance() hostCache = hostCacheMock.proxyInstance() trustService = trustServiceMock.proxyInstance() + searchManager = searchManagerMock.proxyInstance() - acceptor = new ConnectionAcceptor(eventBus, connectionManager, settings, i2pAcceptor, hostCache, trustService) + acceptor = new ConnectionAcceptor(eventBus, connectionManager, settings, i2pAcceptor, hostCache, trustService, searchManager) acceptor.start() Thread.sleep(100) }