throttle new downloads to one every 100ms. Helps with GitHub issue #90

auto-update
Zlatin Balevsky 2021-10-15 16:03:24 +01:00
parent 33b5ca184c
commit 73d3fb6ad8
No known key found for this signature in database
GPG Key ID: A72832072D525E41
3 changed files with 47 additions and 13 deletions

View File

@ -31,9 +31,11 @@ import com.muwire.core.collections.FileCollection
import com.muwire.core.collections.FileCollectionItem import com.muwire.core.collections.FileCollectionItem
import com.muwire.core.collections.UIDownloadCollectionEvent import com.muwire.core.collections.UIDownloadCollectionEvent
import java.util.concurrent.BlockingQueue
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executor import java.util.concurrent.Executor
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.logging.Level import java.util.logging.Level
@Log @Log
@ -52,6 +54,10 @@ public class DownloadManager {
private final Map<InfoHash, Downloader> downloaders = new ConcurrentHashMap<>() private final Map<InfoHash, Downloader> downloaders = new ConcurrentHashMap<>()
private final BlockingQueue<DelayedStart> pendingStart = new LinkedBlockingQueue<>()
private volatile boolean shutdown
private final Thread pendingProcessor
public DownloadManager(EventBus eventBus, TrustService trustService, MeshManager meshManager, MuWireSettings muSettings, public DownloadManager(EventBus eventBus, TrustService trustService, MeshManager meshManager, MuWireSettings muSettings,
I2PConnector connector, File home, Persona me, ChatServer chatServer, FileManager fileManager) { I2PConnector connector, File home, Persona me, ChatServer chatServer, FileManager fileManager) {
this.eventBus = eventBus this.eventBus = eventBus
@ -70,6 +76,31 @@ public class DownloadManager {
rv.setDaemon(true) rv.setDaemon(true)
rv rv
}) })
pendingProcessor = new Thread({processDelayed()} as Runnable)
pendingProcessor.setName("download-delayer")
pendingProcessor.setDaemon(true)
pendingProcessor.start()
}
private void processDelayed() {
while(!shutdown) {
DelayedStart ds = pendingStart.poll()
if (ds != null) {
Downloader downloader = ds.downloader
if (!ds.resume) {
if (!downloader.paused)
executor.execute({ downloader.download() } as Runnable)
eventBus.publish(new DownloadStartedEvent(downloader: downloader))
} else
executor.execute({downloader.doResume() as Runnable})
}
Thread.sleep(100)
}
}
void queueForResume(Downloader downloader) {
pendingStart.offer(new DelayedStart(downloader, true))
} }
@ -143,8 +174,7 @@ public class DownloadManager {
} }
downloaders.put(infoHash, downloader) downloaders.put(infoHash, downloader)
persistDownloaders() persistDownloaders()
executor.execute({downloader.download()} as Runnable) pendingStart.offer(new DelayedStart(downloader, false))
eventBus.publish(new DownloadStartedEvent(downloader : downloader))
} }
public void onUIDownloadCancelledEvent(UIDownloadCancelledEvent e) { public void onUIDownloadCancelledEvent(UIDownloadCancelledEvent e) {
@ -232,15 +262,8 @@ public class DownloadManager {
} }
try { downloaders.put(infoHash, downloader)
if (!downloader.paused) pendingStart.offer(new DelayedStart(downloader, false))
downloader.download()
downloaders.put(infoHash, downloader)
eventBus.publish(new DownloadStartedEvent(downloader : downloader))
} catch (IllegalArgumentException bad) {
log.log(Level.WARNING,"cannot start downloader, skipping", bad)
return
}
} }
} }
@ -318,6 +341,8 @@ public class DownloadManager {
} }
public void shutdown() { public void shutdown() {
shutdown = true
pendingProcessor.interrupt()
downloaders.values().each { it.stop() } downloaders.values().each { it.stop() }
Downloader.executorService.shutdownNow() Downloader.executorService.shutdownNow()
} }
@ -331,4 +356,13 @@ public class DownloadManager {
downloaders.values().each { total += it.speed() } downloaders.values().each { total += it.speed() }
total total
} }
private static class DelayedStart {
final Downloader downloader
final boolean resume
DelayedStart(Downloader downloader, boolean resume) {
this.downloader = downloader
this.resume = resume
}
}
} }

View File

@ -225,7 +225,7 @@ class DownloadSession {
System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32) System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32)
if (hash != expected) { if (hash != expected) {
pieces.markPartial(piece, 0) pieces.markPartial(piece, 0)
throw new BadHashException("bad hash on piece $piece") throw new BadHashException("bad hash on piece $piece for infoHash " + Base64.encode(infoHash.getRoot()))
} }
eventBus.publish(new SourceVerifiedEvent(infoHash : infoHash, source : endpoint.destination)) eventBus.publish(new SourceVerifiedEvent(infoHash : infoHash, source : endpoint.destination))

View File

@ -119,7 +119,7 @@ abstract class Downloader {
public void resume() { public void resume() {
paused = false paused = false
doResume() downloadManager.queueForResume(this)
} }
protected abstract void doResume(); protected abstract void doResume();