From ce221e30c252187a6bf0d096d34f68cd404e664d Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Tue, 3 Nov 2020 14:52:59 +0000 Subject: [PATCH] core side of messenger --- .../main/groovy/com/muwire/core/Core.groovy | 14 +++ .../com/muwire/core/MuWireSettings.groovy | 8 ++ .../core/connection/ConnectionAcceptor.groovy | 32 +++++ .../muwire/core/messenger/MWMessage.groovy | 30 +++-- .../messenger/MessageReceivedEvent.groovy | 7 ++ .../core/messenger/MessageSentEvent.groovy | 6 + .../muwire/core/messenger/Messenger.groovy | 114 +++++++++++++++++- .../core/messenger/UIMessageEvent.groovy | 5 + 8 files changed, 204 insertions(+), 12 deletions(-) create mode 100644 core/src/main/groovy/com/muwire/core/messenger/MessageReceivedEvent.groovy create mode 100644 core/src/main/groovy/com/muwire/core/messenger/MessageSentEvent.groovy create mode 100644 core/src/main/groovy/com/muwire/core/messenger/UIMessageEvent.groovy diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 153e95e2..8644d82f 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -79,6 +79,9 @@ import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostDiscoveredEvent import com.muwire.core.hostcache.SimpleHostCache import com.muwire.core.mesh.MeshManager +import com.muwire.core.messenger.MessageReceivedEvent +import com.muwire.core.messenger.Messenger +import com.muwire.core.messenger.UIMessageEvent import com.muwire.core.search.BrowseManager import com.muwire.core.search.QueryEvent import com.muwire.core.search.ResponderCache @@ -147,6 +150,7 @@ public class Core { final CertificateManager certificateManager final ChatServer chatServer final ChatManager chatManager + final Messenger messenger final FeedManager feedManager private final FeedClient feedClient private final WatchedDirectoryConverter watchedDirectoryConverter @@ -480,6 +484,13 @@ public class Core { register(DirectoryUnsharedEvent.class, directoryWatcher) register(WatchedDirectoryConfigurationEvent.class, directoryWatcher) } + + log.info("initializing messenger") + messenger = new Messenger(eventBus, home, i2pConnector, props) + eventBus.with { + register(MessageReceivedEvent.class, messenger) + register(UIMessageEvent.class, messenger) + } } public void startServices() { @@ -497,6 +508,7 @@ public class Core { feedManager.start() feedClient.start() trackerResponder.start() + messenger.start() } public void shutdown() { @@ -546,6 +558,8 @@ public class Core { log.info("shutting down update client") updateClient.stop() } + log.info("shutting down messenger") + messenger.stop() log.info("killing socket manager") i2pSocketManager.destroySocketManager() if (router != null) { diff --git a/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy b/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy index 3fdb6aec..b918fe87 100644 --- a/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy +++ b/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy @@ -42,6 +42,8 @@ class MuWireSettings { int defaultFeedItemsToKeep boolean defaultFeedSequential + int messageSendInterval + int peerConnections int leafConnections int connectionHistory @@ -118,6 +120,9 @@ class MuWireSettings { defaultFeedSequential = Boolean.valueOf(props.getProperty("defaultFeedSequential", "false")) defaultFeedUpdateInterval = Long.valueOf(props.getProperty("defaultFeedUpdateInterval", "3600000")) + // messenger settings + messageSendInterval = Integer.valueOf(props.getProperty("messageSendInterval","1")) + // ultrapeer connection settings leafConnections = Integer.valueOf(props.getProperty("leafConnections","512")) peerConnections = Integer.valueOf(props.getProperty("peerConnections","128")) @@ -200,6 +205,9 @@ class MuWireSettings { props.setProperty("defaultFeedSequential", String.valueOf(defaultFeedSequential)) props.setProperty("defaultFeedUpdateInterval", String.valueOf(defaultFeedUpdateInterval)) + // messenger settings + props.setProperty("messageSendInterval", String.valueOf(messageSendInterval)) + // ultrapeer connection settings props.setProperty("peerConnections", String.valueOf(peerConnections)) props.setProperty("leafConnections", String.valueOf(leafConnections)) diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy index a746e63b..297bc2fb 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy @@ -24,6 +24,8 @@ import com.muwire.core.filecert.CertificateManager import com.muwire.core.filefeeds.FeedItems import com.muwire.core.files.FileManager import com.muwire.core.hostcache.HostCache +import com.muwire.core.messenger.MWMessage +import com.muwire.core.messenger.MessageReceivedEvent import com.muwire.core.trust.TrustLevel import com.muwire.core.trust.TrustService import com.muwire.core.upload.UploadManager @@ -173,6 +175,9 @@ class ConnectionAcceptor { case (byte)'O': processOLLECTION(e) break + case (byte)'L': + processETTER(e) + break default: throw new Exception("Invalid read $read") } @@ -675,5 +680,32 @@ class ConnectionAcceptor { e.close() } } + + private void processETTER(Endpoint e) { + byte [] ETTER = "ETTER\r\n".getBytes(StandardCharsets.US_ASCII) + byte [] read = new byte[ETTER.length] + + DataInputStream dis = new DataInputStream(e.getInputStream()) + try { + dis.readFully(read) + if (ETTER != read) + throw new Exception("invalid ETTER") + + Map headers = DataUtil.readAllHeaders(dis) + if (headers['Version'] != 1) + throw new Exception("unrecognized version") + int count = Integer.parseInt(headers['Count']) + + dis = new DataInputStream(new GZIPInputStream(dis)) + count.times { + MWMessage m = new MWMessage(dis) + eventBus.publish(new MessageReceivedEvent(message : m)) + } + } catch (Exception bad) { + log.log(Level.WARNING, "failed to process LETTER", bad) + } finally { + e.close() + } + } } diff --git a/core/src/main/groovy/com/muwire/core/messenger/MWMessage.groovy b/core/src/main/groovy/com/muwire/core/messenger/MWMessage.groovy index 9a6ba881..74a23908 100644 --- a/core/src/main/groovy/com/muwire/core/messenger/MWMessage.groovy +++ b/core/src/main/groovy/com/muwire/core/messenger/MWMessage.groovy @@ -15,6 +15,7 @@ import net.i2p.data.SigningPrivateKey class MWMessage { final Persona sender + final Set recipients = new LinkedHashSet<>() final String subject final long timestamp final String body @@ -25,18 +26,19 @@ class MWMessage { private volatile byte[] payload private volatile InfoHash infoHash - MWMessage(Persona sender, String subject, long timestamp, String body, + MWMessage(Persona sender, Set recipients, String subject, long timestamp, String body, Set attachments, SigningPrivateKey spk) { this.sender = sender this.subject = subject this.timestamp = timestamp this.attachments = attachments + this.recipients = recipients byte [] signablePayload = signablePayload() Signature signature = DSAEngine.getInstance().sign(signablePayload, spk) this.sig = signature.getData() - this.hashCode = Objects.hash(sender, subject, timestamp, body, attachments) + this.hashCode = Objects.hash(sender, recipients, subject, timestamp, body, attachments) } MWMessage(InputStream is) { @@ -46,6 +48,12 @@ class MWMessage { throw new InvalidMessageException("unknown version $version") sender = new Persona(dis) + + int nRecipients = dis.readByte() + nRecipients.times { + recipients.add(new Persona(dis)) + } + timestamp = dis.readLong() def n = new Name(dis) subject = n.name @@ -63,7 +71,7 @@ class MWMessage { if (!verify()) throw new InvalidMessageException("verification failed") - this.hashCode = Objects.hash(sender, subject, timestamp, body, attachments) + this.hashCode = Objects.hash(sender, recipients, subject, timestamp, body, attachments) } private byte[] signablePayload() { @@ -72,6 +80,13 @@ class MWMessage { daos.writeByte(Constants.MESSENGER_MESSAGE_VERSION) sender.write(daos) + + daos.writeByte(recipients.size()) + recipients.each { + it.write(daos) + } + + daos.writeLong(timestamp) def n = new Name(subject) @@ -125,9 +140,10 @@ class MWMessage { public boolean equals(Object o) { MWMessage other = (MWMessage) o return Objects.equals(sender, other.sender) && - Objects.equals(subject, other.subject) && - Objects.equals(body, other.body) && - timestamp == other.timestamp && - Objects.equals(attachments, other.attachments) + timestamp == other.timestamp && + Objects.equals(subject, other.subject) && + Objects.equals(body, other.body) && + Objects.equals(attachments, other.attachments) && + Objects.equals(recipients, other.recipients) } } diff --git a/core/src/main/groovy/com/muwire/core/messenger/MessageReceivedEvent.groovy b/core/src/main/groovy/com/muwire/core/messenger/MessageReceivedEvent.groovy new file mode 100644 index 00000000..89561af7 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/messenger/MessageReceivedEvent.groovy @@ -0,0 +1,7 @@ +package com.muwire.core.messenger + +import com.muwire.core.Event + +class MessageReceivedEvent extends Event { + MWMessage message +} diff --git a/core/src/main/groovy/com/muwire/core/messenger/MessageSentEvent.groovy b/core/src/main/groovy/com/muwire/core/messenger/MessageSentEvent.groovy new file mode 100644 index 00000000..9705cdea --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/messenger/MessageSentEvent.groovy @@ -0,0 +1,6 @@ +package com.muwire.core.messenger + +import com.muwire.core.Event + +class MessageSentEvent extends Event { +} diff --git a/core/src/main/groovy/com/muwire/core/messenger/Messenger.groovy b/core/src/main/groovy/com/muwire/core/messenger/Messenger.groovy index 52dcc303..ae898672 100644 --- a/core/src/main/groovy/com/muwire/core/messenger/Messenger.groovy +++ b/core/src/main/groovy/com/muwire/core/messenger/Messenger.groovy @@ -1,31 +1,55 @@ package com.muwire.core.messenger +import java.nio.charset.StandardCharsets import java.nio.file.Files import java.nio.file.Path +import java.nio.file.StandardCopyOption import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.ThreadFactory +import java.util.logging.Level import com.muwire.core.EventBus +import com.muwire.core.MuWireSettings +import com.muwire.core.Persona +import com.muwire.core.connection.Endpoint +import com.muwire.core.connection.I2PConnector import groovy.util.logging.Log +import net.i2p.data.Base64 @Log class Messenger { + + private static final int MAX_IN_PROCESS = 4 + private final EventBus eventBus private final File inbox, outbox, sent + private final I2PConnector connector + private final MuWireSettings settings private final Set inboxMessages = new LinkedHashSet<>() - private final Set outbodMessages = new LinkedHashSet<>() + private final Set outboxMessages = new LinkedHashSet<>() private final Set sentMessages = new LinkedHashSet<>() + private final Set inProcess = new HashSet<>() + private final ExecutorService diskIO = Executors.newSingleThreadExecutor({ Runnable r -> - new Thread(r, "messenger-io") + new Thread(r, "messenger-disk-io") } as ThreadFactory) - public Messenger(EventBus eventBus, File home) { + private final ExecutorService netIO = Executors.newCachedThreadPool({ Runnable r -> + new Thread(r, "messenger-net-io") + } as ThreadFactory) + + private final Timer timer = new Timer() + + + public Messenger(EventBus eventBus, File home, I2PConnector connector, MuWireSettings settings) { this.eventBus = eventBus + this.connector = connector + this.settings = settings File messages = new File(home, "messages") inbox = new File(messages, "inbox") @@ -43,13 +67,18 @@ class Messenger { public void stop() { diskIO.shutdown() + netIO.shutdown() + timer.cancel() } private void load() { log.info("loading messages") loadFolder(inbox, inboxMessages, "inbox") - loadFolder(outbox, inboxMessages, "outbox") - loadFolder(sent, inboxMessages, "sent") + loadFolder(outbox, outboxMessages, "outbox") + loadFolder(sent, sentMessages, "sent") + log.info("loaded messages") + long interval = settings.messageSendInterval * 60 * 1000L + timer.schedule({send()} as TimerTask, interval, interval) } private void loadFolder(File file, Set dest, String folderName) { @@ -69,4 +98,79 @@ class Messenger { private synchronized void addMessage(MWMessage message, Set dest) { dest.add(message) } + + synchronized void onUIMessageEvent(UIMessageEvent e) { + outboxMessages.add(e.message) + diskIO.execute({persist(e.message, outbox)}) + } + + private void persist(MWMessage message, File folder) { + File f = new File(folder, deriveName(message)) + f.withOutputStream { + message.write(it) + } + } + + private void moveToSent(MWMessage message) { + String name = deriveName(message) + File f = new File(outbox, name) + File target = new File(sent, name) + Files.move(f.toPath(), target.toPath(), StandardCopyOption.ATOMIC_MOVE) + eventBus.publish(new MessageSentEvent()) + } + + private static String deriveName(MWMessage message) { + String ih = Base64.encode(message.getInfoHash().getRoot()) + "${ih}_${message.sender.getHumanReadableName()}_${message.timestamp}" + } + + private synchronized void send() { + Iterator iter = outboxMessages.iterator() + while(inProcess.size() < MAX_IN_PROCESS && iter.hasNext()) { + MWMessage candidate = iter.next() + if (inProcess.contains(candidate)) + continue + inProcess.add(candidate) + netIO.execute(deliver(candidate)) + } + } + + private void deliver(MWMessage message) { + Set successful = new HashSet<>() + for (Persona recipient : message.recipients) { + if (deliverTo(message, recipient)) + successful.add(message) + } + if (successful.containsAll(message.recipients)) { + synchronized(this) { + inProcess.remove(message) + outboxMessages.remove(message) + sentMessages.add(message) + } + diskIO.execute({moveToSent(message)}) + } + } + + public synchronized void onMessageReceivedEvent(MessageReceivedEvent e) { + inboxMessages.add(e.message) + diskIO.execute({persist(e.message, inbox)}) + } + + private boolean deliverTo(MWMessage message, Persona recipient) { + try { + Endpoint e = connector.connect(recipient.destination) + OutputStream os = e.getOutputStream() + os.write("ETTER\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("Version:1\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("Count:1\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) + message.write(os) + os.flush() + os.close() + return true + } catch (Exception e) { + log.log(Level.WARNING, "failed to send message to ${recipient.getHumanReadableName()}", e) + return false + } + } } \ No newline at end of file diff --git a/core/src/main/groovy/com/muwire/core/messenger/UIMessageEvent.groovy b/core/src/main/groovy/com/muwire/core/messenger/UIMessageEvent.groovy new file mode 100644 index 00000000..c844bb6b --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/messenger/UIMessageEvent.groovy @@ -0,0 +1,5 @@ +package com.muwire.core.messenger + +class UIMessageEvent extends MWMessage { + MWMessage message +}