From 73d3fb6ad8792b7147a729a2c227d7d82d77be94 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 15 Oct 2021 16:03:24 +0100 Subject: [PATCH] throttle new downloads to one every 100ms. Helps with GitHub issue #90 --- .../core/download/DownloadManager.groovy | 56 +++++++++++++++---- .../core/download/DownloadSession.groovy | 2 +- .../muwire/core/download/Downloader.groovy | 2 +- 3 files changed, 47 insertions(+), 13 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy index 6dbc5f45..9790c15e 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadManager.groovy @@ -31,9 +31,11 @@ import com.muwire.core.collections.FileCollection import com.muwire.core.collections.FileCollectionItem import com.muwire.core.collections.UIDownloadCollectionEvent +import java.util.concurrent.BlockingQueue import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executor import java.util.concurrent.Executors +import java.util.concurrent.LinkedBlockingQueue import java.util.logging.Level @Log @@ -52,6 +54,10 @@ public class DownloadManager { private final Map downloaders = new ConcurrentHashMap<>() + private final BlockingQueue pendingStart = new LinkedBlockingQueue<>() + private volatile boolean shutdown + private final Thread pendingProcessor + public DownloadManager(EventBus eventBus, TrustService trustService, MeshManager meshManager, MuWireSettings muSettings, I2PConnector connector, File home, Persona me, ChatServer chatServer, FileManager fileManager) { this.eventBus = eventBus @@ -70,6 +76,31 @@ public class DownloadManager { rv.setDaemon(true) rv }) + + pendingProcessor = new Thread({processDelayed()} as Runnable) + pendingProcessor.setName("download-delayer") + pendingProcessor.setDaemon(true) + pendingProcessor.start() + } + + private void processDelayed() { + while(!shutdown) { + DelayedStart ds = pendingStart.poll() + if (ds != null) { + Downloader downloader = ds.downloader + if (!ds.resume) { + if (!downloader.paused) + executor.execute({ downloader.download() } as Runnable) + eventBus.publish(new DownloadStartedEvent(downloader: downloader)) + } else + executor.execute({downloader.doResume() as Runnable}) + } + Thread.sleep(100) + } + } + + void queueForResume(Downloader downloader) { + pendingStart.offer(new DelayedStart(downloader, true)) } @@ -143,8 +174,7 @@ public class DownloadManager { } downloaders.put(infoHash, downloader) persistDownloaders() - executor.execute({downloader.download()} as Runnable) - eventBus.publish(new DownloadStartedEvent(downloader : downloader)) + pendingStart.offer(new DelayedStart(downloader, false)) } public void onUIDownloadCancelledEvent(UIDownloadCancelledEvent e) { @@ -232,15 +262,8 @@ public class DownloadManager { } - try { - if (!downloader.paused) - downloader.download() - downloaders.put(infoHash, downloader) - eventBus.publish(new DownloadStartedEvent(downloader : downloader)) - } catch (IllegalArgumentException bad) { - log.log(Level.WARNING,"cannot start downloader, skipping", bad) - return - } + downloaders.put(infoHash, downloader) + pendingStart.offer(new DelayedStart(downloader, false)) } } @@ -318,6 +341,8 @@ public class DownloadManager { } public void shutdown() { + shutdown = true + pendingProcessor.interrupt() downloaders.values().each { it.stop() } Downloader.executorService.shutdownNow() } @@ -331,4 +356,13 @@ public class DownloadManager { downloaders.values().each { total += it.speed() } total } + + private static class DelayedStart { + final Downloader downloader + final boolean resume + DelayedStart(Downloader downloader, boolean resume) { + this.downloader = downloader + this.resume = resume + } + } } diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index 8a0a7cfe..3f667afb 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -225,7 +225,7 @@ class DownloadSession { System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32) if (hash != expected) { pieces.markPartial(piece, 0) - throw new BadHashException("bad hash on piece $piece") + throw new BadHashException("bad hash on piece $piece for infoHash " + Base64.encode(infoHash.getRoot())) } eventBus.publish(new SourceVerifiedEvent(infoHash : infoHash, source : endpoint.destination)) diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index 94505647..9ee1c874 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -119,7 +119,7 @@ abstract class Downloader { public void resume() { paused = false - doResume() + downloadManager.queueForResume(this) } protected abstract void doResume();