From 6380af7d121adaaed663a77e698a358902a6cbd8 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Thu, 26 Jul 2018 23:13:23 +0100 Subject: [PATCH] skeleton of connection implementation --- .../muwire/core/connection/Connection.groovy | 77 ++++++++++++++++++- .../core/connection/LeafConnection.groovy | 12 +++ .../core/connection/PeerConnection.groovy | 17 ++++ .../connection/UltrapeerConnection.groovy | 12 +++ 4 files changed, 116 insertions(+), 2 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index 3db9308b..bb927b19 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -1,25 +1,98 @@ package com.muwire.core.connection +import java.util.concurrent.BlockingQueue +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.AtomicBoolean +import java.util.logging.Level + import com.muwire.core.EventBus +import groovy.util.logging.Log import net.i2p.data.Destination -abstract class Connection { +@Log +abstract class Connection implements Closeable { final EventBus eventBus final Endpoint endpoint final boolean incoming + private final AtomicBoolean running = new AtomicBoolean() + private final BlockingQueue messages = new LinkedBlockingQueue() + private final Thread reader, writer + + protected final String name + + long lastPingSentTime, lastPingReceivedTime + Connection(EventBus eventBus, Endpoint endpoint, boolean incoming) { this.eventBus = eventBus this.incoming = incoming this.endpoint = endpoint + + this.name = endpoint.destination.toBase32().substring(0,8) + + this.reader = new Thread({readLoop()} as Runnable) + this.reader.setName("reader-$name") + this.reader.setDaemon(true) + + this.writer = new Thread({writeLoop()} as Runnable) + this.writer.setName("writer-$name") + this.writer.setDaemon(true) } /** * starts the connection threads */ void start() { - + if (!running.compareAndSet(false, true)) { + log.log(Level.WARNING,"$name already running", new Exception()) + return + } + reader.start() + writer.start() } + + @Override + public void close() { + if (!running.compareAndSet(true, false)) { + log.log(Level.WARNING, "$name already closed", new Exception() ) + return + } + reader.interrupt() + writer.interrupt() + reader.join() + writer.join() + } + + private void readLoop() { + try { + while(running.get()) { + read() + } + } catch (Exception e) { + if (running.get()) { + log.log(Level.WARNING,"unhandled exception in reader",e) + close() + } + } + } + + protected abstract void read() + + private void writeLoop() { + try { + while(running.get()) { + def message = messages.take() + write(message) + } + } catch (Exception e) { + if (running.get()) { + log.log(Level.WARNING, "unhandled exception in writer",e) + close() + } + } + } + + protected abstract void write(def message); } diff --git a/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy index 91e26aa1..3f97a5d8 100644 --- a/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/LeafConnection.groovy @@ -18,4 +18,16 @@ class LeafConnection extends Connection { super(eventBus, endpoint, true); } + @Override + protected void read() { + // TODO Auto-generated method stub + + } + + @Override + protected void write(Object message) { + // TODO Auto-generated method stub + + } + } diff --git a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy index 4a10d6b4..71803076 100644 --- a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy @@ -12,10 +12,27 @@ import net.i2p.data.Destination * @author zab */ class PeerConnection extends Connection { + + private final DataInputStream dis + private final DataOutputStream dos public PeerConnection(EventBus eventBus, Endpoint endpoint, boolean incoming) { super(eventBus, endpoint, incoming) + this.dis = new DataInputStream(endpoint.inputStream) + this.dos = new DataOutputStream(endpoint.outputStream) + } + + @Override + protected void read() { + // TODO Auto-generated method stub + + } + + @Override + protected void write(Object message) { + // TODO Auto-generated method stub + } } diff --git a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy index 567e01d4..9c696dc2 100644 --- a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnection.groovy @@ -19,4 +19,16 @@ class UltrapeerConnection extends Connection { super(eventBus, endpoint, false) } + @Override + protected void read() { + // TODO Auto-generated method stub + + } + + @Override + protected void write(Object message) { + // TODO Auto-generated method stub + + } + }