diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy index 3080091d..e97e2df0 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy @@ -3,6 +3,7 @@ package com.muwire.core.download import com.muwire.core.connection.I2PConnector import net.i2p.data.Base64 +import net.i2p.data.Destination import com.muwire.core.EventBus import com.muwire.core.Persona @@ -16,16 +17,13 @@ public class DownloadManager { private final I2PConnector connector private final Executor executor private final File incompletes - private final String meB64 + private final Persona me public DownloadManager(EventBus eventBus, I2PConnector connector, File incompletes, Persona me) { this.eventBus = eventBus this.connector = connector this.incompletes = incompletes - - def baos = new ByteArrayOutputStream() - me.write(baos) - this.meB64 = Base64.encode(baos.toByteArray()) + this.me = me incompletes.mkdir() @@ -39,8 +37,18 @@ public class DownloadManager { public void onUIDownloadEvent(UIDownloadEvent e) { - def downloader = new Downloader(this, meB64, e.target, e.result.size, - e.result.infohash, e.result.pieceSize, connector, e.result.sender.destination, + + def size = e.result[0].size + def infohash = e.result[0].infohash + def pieceSize = e.result[0].pieceSize + + Set destinations = new HashSet<>() + e.result.each { + destinations.add(it.sender.destination) + } + + def downloader = new Downloader(this, me, e.target, size, + infohash, pieceSize, connector, destinations, incompletes) executor.execute({downloader.download()} as Runnable) eventBus.publish(new DownloadStartedEvent(downloader : downloader)) diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index 43dce26e..1802f872 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -1,8 +1,12 @@ package com.muwire.core.download import com.muwire.core.InfoHash +import com.muwire.core.Persona import com.muwire.core.connection.Endpoint +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors import java.util.logging.Level import com.muwire.core.Constants @@ -14,35 +18,41 @@ import net.i2p.data.Destination @Log public class Downloader { public enum DownloadState { CONNECTING, DOWNLOADING, FAILED, CANCELLED, FINISHED } + private enum WorkerState { CONNECTING, DOWNLOADING, FINISHED} + + private static final ExecutorService executorService = Executors.newCachedThreadPool({r -> + Thread rv = new Thread(r) + rv.setName("download worker") + rv.setDaemon(true) + rv + }) private final DownloadManager downloadManager - private final String meB64 + private final Persona me private final File file private final Pieces downloaded, claimed private final long length private final InfoHash infoHash private final int pieceSize private final I2PConnector connector - private final Destination destination + private final Set destinations private final int nPieces private final File piecesFile + private final Map activeWorkers = new ConcurrentHashMap<>() + - private Endpoint endpoint - private volatile DownloadSession currentSession - private volatile DownloadState currentState private volatile boolean cancelled - private volatile Thread downloadThread - - public Downloader(DownloadManager downloadManager, String meB64, File file, long length, InfoHash infoHash, - int pieceSizePow2, I2PConnector connector, Destination destination, + + public Downloader(DownloadManager downloadManager, Persona me, File file, long length, InfoHash infoHash, + int pieceSizePow2, I2PConnector connector, Set destinations, File incompletes) { - this.meB64 = meB64 + this.me = me this.downloadManager = downloadManager this.file = file this.infoHash = infoHash this.length = length this.connector = connector - this.destination = destination + this.destinations = destinations this.piecesFile = new File(incompletes, file.getName()+".pieces") this.pieceSize = 1 << pieceSizePow2 @@ -55,36 +65,16 @@ public class Downloader { downloaded = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO) claimed = new Pieces(nPieces) - currentState = DownloadState.CONNECTING } void download() { readPieces() - downloadThread = Thread.currentThread() - Endpoint endpoint = null - try { - endpoint = connector.connect(destination) - currentState = DownloadState.DOWNLOADING - boolean requestPerformed - while(!downloaded.isComplete()) { - currentSession = new DownloadSession(meB64, downloaded, claimed, infoHash, endpoint, file, pieceSize, length) - requestPerformed = currentSession.request() - if (!requestPerformed) - break - writePieces() + destinations.each { + if (it != me.destination) { + def worker = new DownloadWorker(it) + activeWorkers.put(it, worker) + executorService.submit(worker) } - if (requestPerformed) { - currentState = DownloadState.FINISHED - piecesFile.delete() - } else log.info("request not performed") - } catch (Exception bad) { - log.log(Level.WARNING,"Exception while downloading",bad) - if (cancelled) - currentState = DownloadState.CANCELLED - else if (currentState != DownloadState.FINISHED) - currentState = DownloadState.FAILED - } finally { - endpoint?.close() } } @@ -109,29 +99,96 @@ public class Downloader { downloaded.donePieces() } - public int positionInPiece() { - if (currentSession == null) - return 0 - currentSession.positionInPiece() - } public int speed() { - if (currentSession == null) - return 0 - currentSession.speed() + int total = 0 + activeWorkers.values().each { + total += it.speed() + } + total } public DownloadState getCurrentState() { - currentState + if (cancelled) + return DownloadState.CANCELLED + boolean allFinished = true + activeWorkers.values().each { + allFinished &= it.currentState == WorkerState.FINISHED + } + if (allFinished) { + if (downloaded.isComplete()) + return DownloadState.FINISHED + return DownloadState.FAILED + } + + // if at least one is downloading... + boolean oneDownloading = false + activeWorkers.values().each { + if (it.currentState == WorkerState.DOWNLOADING) { + oneDownloading = true + return + } + } + + if (oneDownloading) + return DownloadState.DOWNLOADING + + return DownloadState.CONNECTING } public void cancel() { cancelled = true - downloadThread?.interrupt() + activeWorkers.values().each { + it.cancel() + } } public void resume() { - currentState = DownloadState.CONNECTING downloadManager.resume(this) } + + class DownloadWorker implements Runnable { + private final Destination destination + private volatile WorkerState currentState + private volatile Thread downloadThread + private Endpoint endpoint + private volatile DownloadSession currentSession + + DownloadWorker(Destination destination) { + this.destination = destination + } + + public void run() { + downloadThread = Thread.currentThread() + currentState = WorkerState.CONNECTING + Endpoint endpoint = null + try { + endpoint = connector.connect(destination) + currentState = WorkerState.DOWNLOADING + boolean requestPerformed + while(!downloaded.isComplete()) { + currentSession = new DownloadSession(me.toBase64(), downloaded, claimed, infoHash, endpoint, file, pieceSize, length) + requestPerformed = currentSession.request() + if (!requestPerformed) + break + writePieces() + } + } catch (Exception bad) { + log.log(Level.WARNING,"Exception while downloading",bad) + } finally { + currentState = WorkerState.FINISHED + endpoint?.close() + } + } + + int speed() { + if (currentSession == null) + return 0 + currentSession.speed() + } + + void cancel() { + downloadThread?.interrupt() + } + } } diff --git a/core/src/main/groovy/com/muwire/core/download/UIDownloadEvent.groovy b/core/src/main/groovy/com/muwire/core/download/UIDownloadEvent.groovy index 86e8ff29..0dab3d18 100644 --- a/core/src/main/groovy/com/muwire/core/download/UIDownloadEvent.groovy +++ b/core/src/main/groovy/com/muwire/core/download/UIDownloadEvent.groovy @@ -5,6 +5,6 @@ import com.muwire.core.search.UIResultEvent class UIDownloadEvent extends Event { - UIResultEvent result + UIResultEvent[] result File target } diff --git a/gui/griffon-app/controllers/com/muwire/gui/MainFrameController.groovy b/gui/griffon-app/controllers/com/muwire/gui/MainFrameController.groovy index e090cba5..7a12eec9 100644 --- a/gui/griffon-app/controllers/com/muwire/gui/MainFrameController.groovy +++ b/gui/griffon-app/controllers/com/muwire/gui/MainFrameController.groovy @@ -71,8 +71,15 @@ class MainFrameController { def result = selectedResult() if (result == null) return // TODO disable button - def file = new File(application.context.get("muwire-settings").downloadLocation, result.name) - core.eventBus.publish(new UIDownloadEvent(result : result, target : file)) + + def file = new File(application.context.get("muwire-settings").downloadLocation, result.name) + + def selected = builder.getVariable("result-tabs").getSelectedComponent() + def group = selected.getClientProperty("mvc-group") + + def resultsBucket = group.model.hashBucket[result.infohash] + + core.eventBus.publish(new UIDownloadEvent(result : resultsBucket, target : file)) } @ControllerAction diff --git a/gui/griffon-app/models/com/muwire/gui/SearchTabModel.groovy b/gui/griffon-app/models/com/muwire/gui/SearchTabModel.groovy index 2966920d..7c274f06 100644 --- a/gui/griffon-app/models/com/muwire/gui/SearchTabModel.groovy +++ b/gui/griffon-app/models/com/muwire/gui/SearchTabModel.groovy @@ -21,7 +21,7 @@ class SearchTabModel { Core core String uuid def results = [] - def hashCount = [:] + def hashBucket = [:] void mvcGroupInit(Map args) { @@ -35,11 +35,12 @@ class SearchTabModel { void handleResult(UIResultEvent e) { runInsideUIAsync { - Integer count = hashCount.get(e.infohash) - if (count == null) - count = 0 - count++ - hashCount[e.infohash] = count + def bucket = hashBucket.get(e.infohash) + if (bucket == null) { + bucket = [] + hashBucket[e.infohash] = bucket + } + bucket << e results << e JTable table = builder.getVariable("results-table") diff --git a/gui/griffon-app/views/com/muwire/gui/MainFrameView.groovy b/gui/griffon-app/views/com/muwire/gui/MainFrameView.groovy index 45e12012..281b3465 100644 --- a/gui/griffon-app/views/com/muwire/gui/MainFrameView.groovy +++ b/gui/griffon-app/views/com/muwire/gui/MainFrameView.groovy @@ -104,11 +104,7 @@ class MainFrameView { int done = row.downloader.donePieces() "$done/$pieces pieces" }) - closureColumn(header: "Piece", type: String, read: { row -> - int position = row.downloader.positionInPiece() - int pieceSize = row.downloader.pieceSize // TODO: fix for last piece - "$position/$pieceSize bytes" - }) + closureColumn(header: "Sources", type: Integer, read : {row -> row.downloader.activeWorkers.size()}) closureColumn(header: "Speed (bytes/second)", type:Integer, read :{row -> row.downloader.speed()}) } } diff --git a/gui/griffon-app/views/com/muwire/gui/SearchTabView.groovy b/gui/griffon-app/views/com/muwire/gui/SearchTabView.groovy index da29b679..eb6019d8 100644 --- a/gui/griffon-app/views/com/muwire/gui/SearchTabView.groovy +++ b/gui/griffon-app/views/com/muwire/gui/SearchTabView.groovy @@ -31,7 +31,7 @@ class SearchTabView { tableModel(list: model.results) { closureColumn(header: "Name", type: String, read : {row -> row.name}) closureColumn(header: "Size", preferredWidth: 150, type: Long, read : {row -> row.size}) - closureColumn(header: "Sources", type : Integer, read : { row -> model.hashCount[row.infohash]}) + closureColumn(header: "Sources", type : Integer, read : { row -> model.hashBucket[row.infohash].size()}) closureColumn(header: "Sender", type: String, read : {row -> row.sender.getHumanReadableName()}) closureColumn(header: "Trust", type: String, read : {row -> model.core.trustService.getLevel(row.sender.destination)