From 614ecc85fe6fc5e11691ea3cf3ec86b022ae2026 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Mon, 17 Jun 2019 03:21:37 +0100 Subject: [PATCH] new piece selection logic to avoid high cpu bug --- .../core/download/DownloadSession.groovy | 35 +++++++---------- .../muwire/core/download/Downloader.groovy | 19 +++++---- .../com/muwire/core/download/Pieces.groovy | 39 ++++++++++--------- 3 files changed, 42 insertions(+), 51 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy index b8110d81..00874c54 100644 --- a/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy +++ b/core/src/main/groovy/com/muwire/core/download/DownloadSession.groovy @@ -16,6 +16,7 @@ import java.nio.file.Files import java.nio.file.StandardOpenOption import java.security.MessageDigest import java.security.NoSuchAlgorithmException +import java.util.logging.Level @Log class DownloadSession { @@ -23,7 +24,7 @@ class DownloadSession { private static int SAMPLES = 10 private final String meB64 - private final Pieces downloaded, claimed + private final Pieces pieces private final InfoHash infoHash private final Endpoint endpoint private final File file @@ -36,11 +37,10 @@ class DownloadSession { private ByteBuffer mapped - DownloadSession(String meB64, Pieces downloaded, Pieces claimed, InfoHash infoHash, Endpoint endpoint, File file, + DownloadSession(String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file, int pieceSize, long fileLength) { this.meB64 = meB64 - this.downloaded = downloaded - this.claimed = claimed + this.pieces = pieces this.endpoint = endpoint this.infoHash = infoHash this.file = file @@ -63,22 +63,11 @@ class DownloadSession { OutputStream os = endpoint.getOutputStream() InputStream is = endpoint.getInputStream() - int piece - while(true) { - piece = downloaded.getRandomPiece() - if (piece == -1) - return false - if (claimed.isMarked(piece)) { - if (downloaded.donePieces() + claimed.donePieces() == downloaded.nPieces) { - log.info("all pieces claimed") - return false - } - continue - } - break - } - claimed.markDownloaded(piece) - + int piece = pieces.claim() + if (piece == -1) + return false + boolean unclaim = true + log.info("will download piece $piece") long start = piece * pieceSize @@ -171,9 +160,11 @@ class DownloadSession { } finally { try { channel?.close() } catch (IOException ignore) {} } - downloaded.markDownloaded(piece) + pieces.markDownloaded(piece) + unclaim = false } finally { - claimed.clear(piece) + if (unclaim) + pieces.unclaim(piece) } return true } diff --git a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy index a6b25ed3..a575999c 100644 --- a/core/src/main/groovy/com/muwire/core/download/Downloader.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Downloader.groovy @@ -35,7 +35,7 @@ public class Downloader { private final DownloadManager downloadManager private final Persona me private final File file - private final Pieces downloaded, claimed + private final Pieces pieces private final long length private InfoHash infoHash private final int pieceSize @@ -74,8 +74,7 @@ public class Downloader { nPieces = length / pieceSize + 1 this.nPieces = nPieces - downloaded = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO) - claimed = new Pieces(nPieces) + pieces = new Pieces(nPieces, Constants.DOWNLOAD_SEQUENTIAL_RATIO) } public synchronized InfoHash getInfoHash() { @@ -102,7 +101,7 @@ public class Downloader { return piecesFile.eachLine { int piece = Integer.parseInt(it) - downloaded.markDownloaded(piece) + pieces.markDownloaded(piece) } } @@ -111,7 +110,7 @@ public class Downloader { if (piecesFileClosed) return piecesFile.withPrintWriter { writer -> - downloaded.getDownloaded().each { piece -> + pieces.getDownloaded().each { piece -> writer.println(piece) } } @@ -119,7 +118,7 @@ public class Downloader { } public long donePieces() { - downloaded.donePieces() + pieces.donePieces() } @@ -142,7 +141,7 @@ public class Downloader { allFinished &= it.currentState == WorkerState.FINISHED } if (allFinished) { - if (downloaded.isComplete()) + if (pieces.isComplete()) return DownloadState.FINISHED return DownloadState.FAILED } @@ -240,8 +239,8 @@ public class Downloader { } currentState = WorkerState.DOWNLOADING boolean requestPerformed - while(!downloaded.isComplete()) { - currentSession = new DownloadSession(me.toBase64(), downloaded, claimed, getInfoHash(), endpoint, file, pieceSize, length) + while(!pieces.isComplete()) { + currentSession = new DownloadSession(me.toBase64(), pieces, getInfoHash(), endpoint, file, pieceSize, length) requestPerformed = currentSession.request() if (!requestPerformed) break @@ -251,7 +250,7 @@ public class Downloader { log.log(Level.WARNING,"Exception while downloading",bad) } finally { currentState = WorkerState.FINISHED - if (downloaded.isComplete() && eventFired.compareAndSet(false, true)) { + if (pieces.isComplete() && eventFired.compareAndSet(false, true)) { synchronized(piecesFile) { piecesFileClosed = true piecesFile.delete() diff --git a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy index 98dd27f6..4028a6f7 100644 --- a/core/src/main/groovy/com/muwire/core/download/Pieces.groovy +++ b/core/src/main/groovy/com/muwire/core/download/Pieces.groovy @@ -1,7 +1,7 @@ package com.muwire.core.download class Pieces { - private final BitSet bitSet + private final BitSet done, claimed private final int nPieces private final float ratio private final Random random = new Random() @@ -13,52 +13,53 @@ class Pieces { Pieces(int nPieces, float ratio) { this.nPieces = nPieces this.ratio = ratio - bitSet = new BitSet(nPieces) + done = new BitSet(nPieces) + claimed = new BitSet(nPieces) } - synchronized int getRandomPiece() { - int cardinality = bitSet.cardinality() - if (cardinality == nPieces) + synchronized int claim() { + int claimedCardinality = claimed.cardinality() + if (claimedCardinality == nPieces) return -1 // if fuller than ratio just do sequential - if ( (1.0f * cardinality) / nPieces > ratio) { - return bitSet.nextClearBit(0) + if ( (1.0f * claimedCardinality) / nPieces > ratio) { + int rv = claimed.nextClearBit(0) + claimed.set(rv) + return rv } while(true) { int start = random.nextInt(nPieces) - if (bitSet.get(start)) + if (claimed.get(start)) continue + claimed.set(start) return start } } - def getDownloaded() { + synchronized def getDownloaded() { def rv = [] - for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i+1)) { + for (int i = done.nextSetBit(0); i >= 0; i = done.nextSetBit(i+1)) { rv << i } rv } synchronized void markDownloaded(int piece) { - bitSet.set(piece) + done.set(piece) + claimed.set(piece) } - synchronized void clear(int piece) { - bitSet.clear(piece) + synchronized void unclaim(int piece) { + claimed.clear(piece) } synchronized boolean isComplete() { - bitSet.cardinality() == nPieces - } - - synchronized boolean isMarked(int piece) { - bitSet.get(piece) + done.cardinality() == nPieces } synchronized int donePieces() { - bitSet.cardinality() + done.cardinality() } }