core side of messenger

pull/53/head
Zlatin Balevsky 2020-11-03 14:52:59 +00:00
parent f486b3d7db
commit ce221e30c2
No known key found for this signature in database
GPG Key ID: A72832072D525E41
8 changed files with 204 additions and 12 deletions

View File

@ -79,6 +79,9 @@ import com.muwire.core.hostcache.HostCache
import com.muwire.core.hostcache.HostDiscoveredEvent
import com.muwire.core.hostcache.SimpleHostCache
import com.muwire.core.mesh.MeshManager
import com.muwire.core.messenger.MessageReceivedEvent
import com.muwire.core.messenger.Messenger
import com.muwire.core.messenger.UIMessageEvent
import com.muwire.core.search.BrowseManager
import com.muwire.core.search.QueryEvent
import com.muwire.core.search.ResponderCache
@ -147,6 +150,7 @@ public class Core {
final CertificateManager certificateManager
final ChatServer chatServer
final ChatManager chatManager
final Messenger messenger
final FeedManager feedManager
private final FeedClient feedClient
private final WatchedDirectoryConverter watchedDirectoryConverter
@ -480,6 +484,13 @@ public class Core {
register(DirectoryUnsharedEvent.class, directoryWatcher)
register(WatchedDirectoryConfigurationEvent.class, directoryWatcher)
}
log.info("initializing messenger")
messenger = new Messenger(eventBus, home, i2pConnector, props)
eventBus.with {
register(MessageReceivedEvent.class, messenger)
register(UIMessageEvent.class, messenger)
}
}
public void startServices() {
@ -497,6 +508,7 @@ public class Core {
feedManager.start()
feedClient.start()
trackerResponder.start()
messenger.start()
}
public void shutdown() {
@ -546,6 +558,8 @@ public class Core {
log.info("shutting down update client")
updateClient.stop()
}
log.info("shutting down messenger")
messenger.stop()
log.info("killing socket manager")
i2pSocketManager.destroySocketManager()
if (router != null) {

View File

@ -42,6 +42,8 @@ class MuWireSettings {
int defaultFeedItemsToKeep
boolean defaultFeedSequential
int messageSendInterval
int peerConnections
int leafConnections
int connectionHistory
@ -118,6 +120,9 @@ class MuWireSettings {
defaultFeedSequential = Boolean.valueOf(props.getProperty("defaultFeedSequential", "false"))
defaultFeedUpdateInterval = Long.valueOf(props.getProperty("defaultFeedUpdateInterval", "3600000"))
// messenger settings
messageSendInterval = Integer.valueOf(props.getProperty("messageSendInterval","1"))
// ultrapeer connection settings
leafConnections = Integer.valueOf(props.getProperty("leafConnections","512"))
peerConnections = Integer.valueOf(props.getProperty("peerConnections","128"))
@ -200,6 +205,9 @@ class MuWireSettings {
props.setProperty("defaultFeedSequential", String.valueOf(defaultFeedSequential))
props.setProperty("defaultFeedUpdateInterval", String.valueOf(defaultFeedUpdateInterval))
// messenger settings
props.setProperty("messageSendInterval", String.valueOf(messageSendInterval))
// ultrapeer connection settings
props.setProperty("peerConnections", String.valueOf(peerConnections))
props.setProperty("leafConnections", String.valueOf(leafConnections))

View File

@ -24,6 +24,8 @@ import com.muwire.core.filecert.CertificateManager
import com.muwire.core.filefeeds.FeedItems
import com.muwire.core.files.FileManager
import com.muwire.core.hostcache.HostCache
import com.muwire.core.messenger.MWMessage
import com.muwire.core.messenger.MessageReceivedEvent
import com.muwire.core.trust.TrustLevel
import com.muwire.core.trust.TrustService
import com.muwire.core.upload.UploadManager
@ -173,6 +175,9 @@ class ConnectionAcceptor {
case (byte)'O':
processOLLECTION(e)
break
case (byte)'L':
processETTER(e)
break
default:
throw new Exception("Invalid read $read")
}
@ -675,5 +680,32 @@ class ConnectionAcceptor {
e.close()
}
}
private void processETTER(Endpoint e) {
byte [] ETTER = "ETTER\r\n".getBytes(StandardCharsets.US_ASCII)
byte [] read = new byte[ETTER.length]
DataInputStream dis = new DataInputStream(e.getInputStream())
try {
dis.readFully(read)
if (ETTER != read)
throw new Exception("invalid ETTER")
Map<String,String> headers = DataUtil.readAllHeaders(dis)
if (headers['Version'] != 1)
throw new Exception("unrecognized version")
int count = Integer.parseInt(headers['Count'])
dis = new DataInputStream(new GZIPInputStream(dis))
count.times {
MWMessage m = new MWMessage(dis)
eventBus.publish(new MessageReceivedEvent(message : m))
}
} catch (Exception bad) {
log.log(Level.WARNING, "failed to process LETTER", bad)
} finally {
e.close()
}
}
}

View File

@ -15,6 +15,7 @@ import net.i2p.data.SigningPrivateKey
class MWMessage {
final Persona sender
final Set<Persona> recipients = new LinkedHashSet<>()
final String subject
final long timestamp
final String body
@ -25,18 +26,19 @@ class MWMessage {
private volatile byte[] payload
private volatile InfoHash infoHash
MWMessage(Persona sender, String subject, long timestamp, String body,
MWMessage(Persona sender, Set<Persona> recipients, String subject, long timestamp, String body,
Set<MWMessageAttachment> attachments, SigningPrivateKey spk) {
this.sender = sender
this.subject = subject
this.timestamp = timestamp
this.attachments = attachments
this.recipients = recipients
byte [] signablePayload = signablePayload()
Signature signature = DSAEngine.getInstance().sign(signablePayload, spk)
this.sig = signature.getData()
this.hashCode = Objects.hash(sender, subject, timestamp, body, attachments)
this.hashCode = Objects.hash(sender, recipients, subject, timestamp, body, attachments)
}
MWMessage(InputStream is) {
@ -46,6 +48,12 @@ class MWMessage {
throw new InvalidMessageException("unknown version $version")
sender = new Persona(dis)
int nRecipients = dis.readByte()
nRecipients.times {
recipients.add(new Persona(dis))
}
timestamp = dis.readLong()
def n = new Name(dis)
subject = n.name
@ -63,7 +71,7 @@ class MWMessage {
if (!verify())
throw new InvalidMessageException("verification failed")
this.hashCode = Objects.hash(sender, subject, timestamp, body, attachments)
this.hashCode = Objects.hash(sender, recipients, subject, timestamp, body, attachments)
}
private byte[] signablePayload() {
@ -72,6 +80,13 @@ class MWMessage {
daos.writeByte(Constants.MESSENGER_MESSAGE_VERSION)
sender.write(daos)
daos.writeByte(recipients.size())
recipients.each {
it.write(daos)
}
daos.writeLong(timestamp)
def n = new Name(subject)
@ -125,9 +140,10 @@ class MWMessage {
public boolean equals(Object o) {
MWMessage other = (MWMessage) o
return Objects.equals(sender, other.sender) &&
Objects.equals(subject, other.subject) &&
Objects.equals(body, other.body) &&
timestamp == other.timestamp &&
Objects.equals(attachments, other.attachments)
timestamp == other.timestamp &&
Objects.equals(subject, other.subject) &&
Objects.equals(body, other.body) &&
Objects.equals(attachments, other.attachments) &&
Objects.equals(recipients, other.recipients)
}
}

View File

@ -0,0 +1,7 @@
package com.muwire.core.messenger
import com.muwire.core.Event
class MessageReceivedEvent extends Event {
MWMessage message
}

View File

@ -0,0 +1,6 @@
package com.muwire.core.messenger
import com.muwire.core.Event
class MessageSentEvent extends Event {
}

View File

@ -1,31 +1,55 @@
package com.muwire.core.messenger
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.logging.Level
import com.muwire.core.EventBus
import com.muwire.core.MuWireSettings
import com.muwire.core.Persona
import com.muwire.core.connection.Endpoint
import com.muwire.core.connection.I2PConnector
import groovy.util.logging.Log
import net.i2p.data.Base64
@Log
class Messenger {
private static final int MAX_IN_PROCESS = 4
private final EventBus eventBus
private final File inbox, outbox, sent
private final I2PConnector connector
private final MuWireSettings settings
private final Set<MWMessage> inboxMessages = new LinkedHashSet<>()
private final Set<MWMessage> outbodMessages = new LinkedHashSet<>()
private final Set<MWMessage> outboxMessages = new LinkedHashSet<>()
private final Set<MWMessage> sentMessages = new LinkedHashSet<>()
private final Set<MWMessage> inProcess = new HashSet<>()
private final ExecutorService diskIO = Executors.newSingleThreadExecutor({ Runnable r ->
new Thread(r, "messenger-io")
new Thread(r, "messenger-disk-io")
} as ThreadFactory)
public Messenger(EventBus eventBus, File home) {
private final ExecutorService netIO = Executors.newCachedThreadPool({ Runnable r ->
new Thread(r, "messenger-net-io")
} as ThreadFactory)
private final Timer timer = new Timer()
public Messenger(EventBus eventBus, File home, I2PConnector connector, MuWireSettings settings) {
this.eventBus = eventBus
this.connector = connector
this.settings = settings
File messages = new File(home, "messages")
inbox = new File(messages, "inbox")
@ -43,13 +67,18 @@ class Messenger {
public void stop() {
diskIO.shutdown()
netIO.shutdown()
timer.cancel()
}
private void load() {
log.info("loading messages")
loadFolder(inbox, inboxMessages, "inbox")
loadFolder(outbox, inboxMessages, "outbox")
loadFolder(sent, inboxMessages, "sent")
loadFolder(outbox, outboxMessages, "outbox")
loadFolder(sent, sentMessages, "sent")
log.info("loaded messages")
long interval = settings.messageSendInterval * 60 * 1000L
timer.schedule({send()} as TimerTask, interval, interval)
}
private void loadFolder(File file, Set<MWMessage> dest, String folderName) {
@ -69,4 +98,79 @@ class Messenger {
private synchronized void addMessage(MWMessage message, Set<MWMessage> dest) {
dest.add(message)
}
synchronized void onUIMessageEvent(UIMessageEvent e) {
outboxMessages.add(e.message)
diskIO.execute({persist(e.message, outbox)})
}
private void persist(MWMessage message, File folder) {
File f = new File(folder, deriveName(message))
f.withOutputStream {
message.write(it)
}
}
private void moveToSent(MWMessage message) {
String name = deriveName(message)
File f = new File(outbox, name)
File target = new File(sent, name)
Files.move(f.toPath(), target.toPath(), StandardCopyOption.ATOMIC_MOVE)
eventBus.publish(new MessageSentEvent())
}
private static String deriveName(MWMessage message) {
String ih = Base64.encode(message.getInfoHash().getRoot())
"${ih}_${message.sender.getHumanReadableName()}_${message.timestamp}"
}
private synchronized void send() {
Iterator<MWMessage> iter = outboxMessages.iterator()
while(inProcess.size() < MAX_IN_PROCESS && iter.hasNext()) {
MWMessage candidate = iter.next()
if (inProcess.contains(candidate))
continue
inProcess.add(candidate)
netIO.execute(deliver(candidate))
}
}
private void deliver(MWMessage message) {
Set<Persona> successful = new HashSet<>()
for (Persona recipient : message.recipients) {
if (deliverTo(message, recipient))
successful.add(message)
}
if (successful.containsAll(message.recipients)) {
synchronized(this) {
inProcess.remove(message)
outboxMessages.remove(message)
sentMessages.add(message)
}
diskIO.execute({moveToSent(message)})
}
}
public synchronized void onMessageReceivedEvent(MessageReceivedEvent e) {
inboxMessages.add(e.message)
diskIO.execute({persist(e.message, inbox)})
}
private boolean deliverTo(MWMessage message, Persona recipient) {
try {
Endpoint e = connector.connect(recipient.destination)
OutputStream os = e.getOutputStream()
os.write("ETTER\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Version:1\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Count:1\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("\r\n".getBytes(StandardCharsets.US_ASCII))
message.write(os)
os.flush()
os.close()
return true
} catch (Exception e) {
log.log(Level.WARNING, "failed to send message to ${recipient.getHumanReadableName()}", e)
return false
}
}
}

View File

@ -0,0 +1,5 @@
package com.muwire.core.messenger
class UIMessageEvent extends MWMessage {
MWMessage message
}