parse X-Alt

pull/5/head
Zlatin Balevsky 2019-06-21 12:25:04 +01:00
parent 56125f6df8
commit 05b02834af
4 changed files with 60 additions and 3 deletions

View File

@ -3,7 +3,9 @@ package com.muwire.core.download;
import net.i2p.data.Base64 import net.i2p.data.Base64
import com.muwire.core.Constants import com.muwire.core.Constants
import com.muwire.core.EventBus
import com.muwire.core.InfoHash import com.muwire.core.InfoHash
import com.muwire.core.Persona
import com.muwire.core.connection.Endpoint import com.muwire.core.connection.Endpoint
import com.muwire.core.util.DataUtil import com.muwire.core.util.DataUtil
@ -25,6 +27,7 @@ class DownloadSession {
private static int SAMPLES = 10 private static int SAMPLES = 10
private final EventBus eventBus
private final String meB64 private final String meB64
private final Pieces pieces private final Pieces pieces
private final InfoHash infoHash private final InfoHash infoHash
@ -40,8 +43,9 @@ class DownloadSession {
private ByteBuffer mapped private ByteBuffer mapped
DownloadSession(String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file, DownloadSession(EventBus eventBus, String meB64, Pieces pieces, InfoHash infoHash, Endpoint endpoint, File file,
int pieceSize, long fileLength, Set<Integer> available) { int pieceSize, long fileLength, Set<Integer> available) {
this.eventBus = eventBus
this.meB64 = meB64 this.meB64 = meB64
this.pieces = pieces this.pieces = pieces
this.endpoint = endpoint this.endpoint = endpoint
@ -121,6 +125,15 @@ class DownloadSession {
headers[key] = value.trim() headers[key] = value.trim()
} }
// prase X-Alt if present
if (headers.containsKey("X-Alt")) {
headers["X-Alt"].split(",").each {
byte [] raw = Base64.decode(it)
Persona source = new Persona(new ByteArrayInputStream(raw))
eventBus.publish(new SourceDiscoveredEvent(infoHash : infoHash, source : source))
}
}
// parse X-Have if present // parse X-Have if present
if (headers.containsKey("X-Have")) { if (headers.containsKey("X-Have")) {
DataUtil.decodeXHave(headers["X-Have"]).each { DataUtil.decodeXHave(headers["X-Have"]).each {

View File

@ -247,7 +247,7 @@ public class Downloader {
currentState = WorkerState.DOWNLOADING currentState = WorkerState.DOWNLOADING
boolean requestPerformed boolean requestPerformed
while(!pieces.isComplete()) { while(!pieces.isComplete()) {
currentSession = new DownloadSession(me.toBase64(), pieces, getInfoHash(), endpoint, incompleteFile, pieceSize, length) currentSession = new DownloadSession(eventBus, me.toBase64(), pieces, getInfoHash(), endpoint, incompleteFile, pieceSize, length)
requestPerformed = currentSession.request() requestPerformed = currentSession.request()
if (!requestPerformed) if (!requestPerformed)
break break

View File

@ -0,0 +1,10 @@
package com.muwire.core.download
import com.muwire.core.Event
import com.muwire.core.InfoHash
import com.muwire.core.Persona
class SourceDiscoveredEvent extends Event {
InfoHash infoHash
Persona source
}

View File

@ -3,9 +3,13 @@ package com.muwire.core.download
import static org.junit.Assert.fail import static org.junit.Assert.fail
import org.junit.After import org.junit.After
import org.junit.Before
import org.junit.Test import org.junit.Test
import com.muwire.core.EventBus
import com.muwire.core.InfoHash import com.muwire.core.InfoHash
import com.muwire.core.Persona
import com.muwire.core.Personas
import com.muwire.core.connection.Endpoint import com.muwire.core.connection.Endpoint
import com.muwire.core.files.FileHasher import com.muwire.core.files.FileHasher
import static com.muwire.core.util.DataUtil.readTillRN import static com.muwire.core.util.DataUtil.readTillRN
@ -16,6 +20,7 @@ import net.i2p.util.ConcurrentHashSet
class DownloadSessionTest { class DownloadSessionTest {
private EventBus eventBus
private File source, target private File source, target
private InfoHash infoHash private InfoHash infoHash
private Endpoint endpoint private Endpoint endpoint
@ -32,6 +37,12 @@ class DownloadSessionTest {
private Set<Integer> available = new ConcurrentHashSet<>() private Set<Integer> available = new ConcurrentHashSet<>()
private volatile IOException thrown private volatile IOException thrown
@Before
public void setUp() {
eventBus = new EventBus()
}
private void initSession(int size, def claimedPieces = []) { private void initSession(int size, def claimedPieces = []) {
Random r = new Random() Random r = new Random()
byte [] content = new byte[size] byte [] content = new byte[size]
@ -64,7 +75,7 @@ class DownloadSessionTest {
toUploader = new PipedOutputStream(fromDownloader) toUploader = new PipedOutputStream(fromDownloader)
endpoint = new Endpoint(null, fromUploader, toUploader, null) endpoint = new Endpoint(null, fromUploader, toUploader, null)
session = new DownloadSession("",pieces, infoHash, endpoint, target, pieceSize, size, available) session = new DownloadSession(eventBus, "",pieces, infoHash, endpoint, target, pieceSize, size, available)
downloadThread = new Thread( { perform() } as Runnable) downloadThread = new Thread( { perform() } as Runnable)
downloadThread.setDaemon(true) downloadThread.setDaemon(true)
downloadThread.start() downloadThread.start()
@ -289,6 +300,29 @@ class DownloadSessionTest {
assert start == 0 assert start == 0
} }
@Test
public void testXAlt() throws Exception {
Personas personas = new Personas()
def sources = []
def listener = new Object() {
public void onSourceDiscoveredEvent(SourceDiscoveredEvent e) {
sources << e.source
}
}
eventBus.register(SourceDiscoveredEvent.class, listener)
initSession(20)
readAllHeaders(fromDownloader)
toDownloader.write("416 don't have it\r\n".bytes)
toDownloader.write("X-Alt: ${personas.persona1.toBase64()},${personas.persona2.toBase64()}\r\n\r\n".bytes)
toDownloader.flush()
Thread.sleep(150)
assert sources.contains(personas.persona1)
assert sources.contains(personas.persona2)
assert 2 == sources.size()
}
private static Set<String> readAllHeaders(InputStream is) { private static Set<String> readAllHeaders(InputStream is) {
Set<String> rv = new HashSet<>() Set<String> rv = new HashSet<>()
String header String header