Implement variable length request queue, size based on piece size

pull/53/head
Zlatin Balevsky 2020-10-06 22:49:46 +01:00
parent ae193873c9
commit 3e188f5d4a
No known key found for this signature in database
GPG Key ID: A72832072D525E41
1 changed files with 27 additions and 12 deletions

View File

@ -62,6 +62,8 @@ public class Downloader {
/** LOCKING: itself */
private final Map<Destination, Integer> failingDestinations = new HashMap<>()
private final int maxFailures
private final int queueSize
private volatile boolean cancelled, paused
@ -95,6 +97,14 @@ public class Downloader {
this.pieces = pieces
this.nPieces = pieces.nPieces
this.maxFailures = maxFailures
// base queue size on download piece size
int queueSize = 1
if (pieceSizePow2 < 19)
queueSize++
if (pieceSizePow2 < 18)
queueSize++
this.queueSize = queueSize
}
public synchronized InfoHash getInfoHash() {
@ -386,7 +396,7 @@ public class Downloader {
private volatile WorkerState currentState
private volatile Thread downloadThread
private Endpoint endpoint
private volatile DownloadSession currentSession, nextSession
private final LinkedList<DownloadSession> sessionQueue = new LinkedList<>()
private final Set<Integer> available = new HashSet<>()
DownloadWorker(Destination destination) {
@ -413,25 +423,30 @@ public class Downloader {
boolean requestPerformed
while(!pieces.isComplete()) {
if (currentSession == null) {
currentSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(),
endpoint, incompleteFile, pieceSize, length, available, dataSinceLastRead,
browse, feed, chat)
if (!currentSession.sendRequest())
if (sessionQueue.isEmpty()) {
boolean sentAnyRequests = false
queueSize.times {
def currentSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(),
endpoint, incompleteFile, pieceSize, length, available, dataSinceLastRead,
browse, feed, chat)
if (currentSession.sendRequest()) {
sessionQueue.addLast(currentSession)
sentAnyRequests = true
}
}
if (!sentAnyRequests)
break;
endpoint.getOutputStream().flush()
}
nextSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(),
def nextSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(),
endpoint, incompleteFile, pieceSize, length, available, dataSinceLastRead,
browse, feed, chat)
if (!nextSession.sendRequest())
nextSession = null
if (nextSession.sendRequest())
sessionQueue.addLast(nextSession)
requestPerformed = currentSession.consumeResponse()
requestPerformed = sessionQueue.removeFirst().consumeResponse()
if (!requestPerformed)
break
if (nextSession != null)
currentSession = nextSession
successfulDestinations.add(endpoint.destination)
writePieces()
}