From 3e188f5d4a22dd51b39f8206aeafe3626d35ed71 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Tue, 6 Oct 2020 22:49:46 +0100 Subject: [PATCH] Implement variable length request queue, size based on piece size --- .../muwire/core/download/Downloader.groovy | 39 +++++++++++++------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index ffa8edae..01a1dfeb 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -62,6 +62,8 @@ public class Downloader { /** LOCKING: itself */ private final Map failingDestinations = new HashMap<>() private final int maxFailures + + private final int queueSize private volatile boolean cancelled, paused @@ -95,6 +97,14 @@ public class Downloader { this.pieces = pieces this.nPieces = pieces.nPieces this.maxFailures = maxFailures + + // base queue size on download piece size + int queueSize = 1 + if (pieceSizePow2 < 19) + queueSize++ + if (pieceSizePow2 < 18) + queueSize++ + this.queueSize = queueSize } public synchronized InfoHash getInfoHash() { @@ -386,7 +396,7 @@ public class Downloader { private volatile WorkerState currentState private volatile Thread downloadThread private Endpoint endpoint - private volatile DownloadSession currentSession, nextSession + private final LinkedList sessionQueue = new LinkedList<>() private final Set available = new HashSet<>() DownloadWorker(Destination destination) { @@ -413,25 +423,30 @@ public class Downloader { boolean requestPerformed while(!pieces.isComplete()) { - if (currentSession == null) { - currentSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(), - endpoint, incompleteFile, pieceSize, length, available, dataSinceLastRead, - browse, feed, chat) - if (!currentSession.sendRequest()) + if (sessionQueue.isEmpty()) { + boolean sentAnyRequests = false + queueSize.times { + def currentSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(), + endpoint, incompleteFile, pieceSize, length, available, dataSinceLastRead, + browse, feed, chat) + if (currentSession.sendRequest()) { + sessionQueue.addLast(currentSession) + sentAnyRequests = true + } + } + if (!sentAnyRequests) break; endpoint.getOutputStream().flush() } - nextSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(), + def nextSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(), endpoint, incompleteFile, pieceSize, length, available, dataSinceLastRead, browse, feed, chat) - if (!nextSession.sendRequest()) - nextSession = null + if (nextSession.sendRequest()) + sessionQueue.addLast(nextSession) - requestPerformed = currentSession.consumeResponse() + requestPerformed = sessionQueue.removeFirst().consumeResponse() if (!requestPerformed) break - if (nextSession != null) - currentSession = nextSession successfulDestinations.add(endpoint.destination) writePieces() }