revert delaying of downloads as the semaphore approach is enough. GitHub issue #90

auto-update
Zlatin Balevsky 2021-10-15 18:33:05 +01:00
parent 3d72497d5f
commit 0e0b30c3a6
No known key found for this signature in database
GPG Key ID: A72832072D525E41
3 changed files with 13 additions and 51 deletions

View File

@ -31,11 +31,9 @@ import com.muwire.core.collections.FileCollection
import com.muwire.core.collections.FileCollectionItem import com.muwire.core.collections.FileCollectionItem
import com.muwire.core.collections.UIDownloadCollectionEvent import com.muwire.core.collections.UIDownloadCollectionEvent
import java.util.concurrent.BlockingQueue
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executor import java.util.concurrent.Executor
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.logging.Level import java.util.logging.Level
@Log @Log
@ -54,10 +52,6 @@ public class DownloadManager {
private final Map<InfoHash, Downloader> downloaders = new ConcurrentHashMap<>() private final Map<InfoHash, Downloader> downloaders = new ConcurrentHashMap<>()
private final BlockingQueue<DelayedStart> pendingStart = new LinkedBlockingQueue<>()
private volatile boolean shutdown
private final Thread pendingProcessor
public DownloadManager(EventBus eventBus, TrustService trustService, MeshManager meshManager, MuWireSettings muSettings, public DownloadManager(EventBus eventBus, TrustService trustService, MeshManager meshManager, MuWireSettings muSettings,
I2PConnector connector, File home, Persona me, ChatServer chatServer, FileManager fileManager) { I2PConnector connector, File home, Persona me, ChatServer chatServer, FileManager fileManager) {
this.eventBus = eventBus this.eventBus = eventBus
@ -76,31 +70,6 @@ public class DownloadManager {
rv.setDaemon(true) rv.setDaemon(true)
rv rv
}) })
pendingProcessor = new Thread({processDelayed()} as Runnable)
pendingProcessor.setName("download-delayer")
pendingProcessor.setDaemon(true)
pendingProcessor.start()
}
private void processDelayed() {
while(!shutdown) {
DelayedStart ds = pendingStart.poll()
if (ds != null) {
Downloader downloader = ds.downloader
if (!ds.resume) {
if (!downloader.paused)
executor.execute({ downloader.download() } as Runnable)
eventBus.publish(new DownloadStartedEvent(downloader: downloader))
} else
executor.execute({downloader.doResume() as Runnable})
}
Thread.sleep(100)
}
}
void queueForResume(Downloader downloader) {
pendingStart.offer(new DelayedStart(downloader, true))
} }
@ -174,7 +143,8 @@ public class DownloadManager {
} }
downloaders.put(infoHash, downloader) downloaders.put(infoHash, downloader)
persistDownloaders() persistDownloaders()
pendingStart.offer(new DelayedStart(downloader, false)) executor.execute({downloader.download()} as Runnable)
eventBus.publish(new DownloadStartedEvent(downloader: downloader))
return downloader return downloader
} }
@ -196,10 +166,6 @@ public class DownloadManager {
persistDownloaders() persistDownloaders()
} }
void resume(Downloader downloader) {
executor.execute({downloader.download() as Runnable})
}
void onUILoadedEvent(UILoadedEvent e) { void onUILoadedEvent(UILoadedEvent e) {
File downloadsFile = new File(home, "downloads.json") File downloadsFile = new File(home, "downloads.json")
if (!downloadsFile.exists()) if (!downloadsFile.exists())
@ -263,8 +229,15 @@ public class DownloadManager {
} }
downloaders.put(infoHash, downloader) try {
pendingStart.offer(new DelayedStart(downloader, false)) if (!downloader.paused)
downloader.download()
downloaders.put(infoHash, downloader)
eventBus.publish(new DownloadStartedEvent(downloader : downloader))
} catch (IllegalArgumentException bad) {
log.log(Level.WARNING,"cannot start downloader, skipping", bad)
return
}
} }
} }
@ -342,8 +315,6 @@ public class DownloadManager {
} }
public void shutdown() { public void shutdown() {
shutdown = true
pendingProcessor.interrupt()
downloaders.values().each { it.stop() } downloaders.values().each { it.stop() }
Downloader.executorService.shutdownNow() Downloader.executorService.shutdownNow()
} }
@ -357,13 +328,4 @@ public class DownloadManager {
downloaders.values().each { total += it.speed() } downloaders.values().each { total += it.speed() }
total total
} }
private static class DelayedStart {
final Downloader downloader
final boolean resume
DelayedStart(Downloader downloader, boolean resume) {
this.downloader = downloader
this.resume = resume
}
}
} }

View File

@ -225,7 +225,7 @@ class DownloadSession {
System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32) System.arraycopy(infoHash.getHashList(), piece * 32, expected, 0, 32)
if (hash != expected) { if (hash != expected) {
pieces.markPartial(piece, 0) pieces.markPartial(piece, 0)
throw new BadHashException("bad hash on piece $piece for infoHash " + Base64.encode(infoHash.getRoot())) throw new BadHashException("bad hash on piece $piece")
} }
eventBus.publish(new SourceVerifiedEvent(infoHash : infoHash, source : endpoint.destination)) eventBus.publish(new SourceVerifiedEvent(infoHash : infoHash, source : endpoint.destination))

View File

@ -119,7 +119,7 @@ abstract class Downloader {
public void resume() { public void resume() {
paused = false paused = false
downloadManager.queueForResume(this) doResume()
} }
protected abstract void doResume(); protected abstract void doResume();