enable stealing of pieces from other download workers

pull/9/head
Zlatin Balevsky 2019-07-06 11:26:18 +01:00
parent db36367b11
commit 0a8016dea7
2 changed files with 23 additions and 8 deletions

View File

@ -79,9 +79,10 @@ class DownloadSession {
return false return false
int piece = pieceAndPosition[0] int piece = pieceAndPosition[0]
int position = pieceAndPosition[1] int position = pieceAndPosition[1]
boolean steal = pieceAndPosition[2] == 1
boolean unclaim = true boolean unclaim = true
log.info("will download piece $piece from position $position") log.info("will download piece $piece from position $position steal $steal")
long pieceStart = piece * pieceSize long pieceStart = piece * pieceSize
long end = Math.min(fileLength, pieceStart + pieceSize) - 1 long end = Math.min(fileLength, pieceStart + pieceSize) - 1
@ -208,7 +209,7 @@ class DownloadSession {
pieces.markDownloaded(piece) pieces.markDownloaded(piece)
unclaim = false unclaim = false
} finally { } finally {
if (unclaim) if (unclaim && !steal)
pieces.unclaim(piece) pieces.unclaim(piece)
} }
return true return true

View File

@ -20,14 +20,20 @@ class Pieces {
synchronized int[] claim() { synchronized int[] claim() {
int claimedCardinality = claimed.cardinality() int claimedCardinality = claimed.cardinality()
if (claimedCardinality == nPieces) if (claimedCardinality == nPieces) {
return null // steal
int downloadedCardinality = done.cardinality()
if (downloadedCardinality == nPieces)
return null
int rv = done.nextClearBit(0)
return [rv, partials.getOrDefault(rv, 0), 1]
}
// if fuller than ratio just do sequential // if fuller than ratio just do sequential
if ( (1.0f * claimedCardinality) / nPieces > ratio) { if ( (1.0f * claimedCardinality) / nPieces > ratio) {
int rv = claimed.nextClearBit(0) int rv = claimed.nextClearBit(0)
claimed.set(rv) claimed.set(rv)
return [rv, partials.getOrDefault(rv, 0)] return [rv, partials.getOrDefault(rv, 0), 0]
} }
while(true) { while(true) {
@ -35,20 +41,28 @@ class Pieces {
if (claimed.get(start)) if (claimed.get(start))
continue continue
claimed.set(start) claimed.set(start)
return [start, partials.getOrDefault(start,0)] return [start, partials.getOrDefault(start,0), 0]
} }
} }
synchronized int[] claim(Set<Integer> available) { synchronized int[] claim(Set<Integer> available) {
for (int i = claimed.nextSetBit(0); i >= 0; i = claimed.nextSetBit(i+1)) for (int i = done.nextSetBit(0); i >= 0; i = done.nextSetBit(i+1))
available.remove(i) available.remove(i)
if (available.isEmpty()) if (available.isEmpty())
return null return null
Set<Integer> availableCopy = new HashSet<>(available)
for (int i = claimed.nextSetBit(0); i >= 0; i = claimed.nextSetBit(i+1))
availableCopy.remove(i)
if (availableCopy.isEmpty()) {
// steal
int rv = available.first()
return [rv, partials.getOrDefault(rv, 0), 1]
}
List<Integer> toList = available.toList() List<Integer> toList = available.toList()
Collections.shuffle(toList) Collections.shuffle(toList)
int rv = toList[0] int rv = toList[0]
claimed.set(rv) claimed.set(rv)
[rv, partials.getOrDefault(rv, 0)] [rv, partials.getOrDefault(rv, 0), 0]
} }
synchronized def getDownloaded() { synchronized def getDownloaded() {