refactoring to enable hashlist uploads

pull/5/head
Zlatin Balevsky 2019-06-12 17:33:43 +01:00
parent 5172e19627
commit e426b3ccbd
8 changed files with 226 additions and 93 deletions

View File

@ -108,6 +108,9 @@ class ConnectionAcceptor {
case (byte)'G':
processGET(e)
break
case (byte)'H':
processHashList(e)
break
case (byte)'P':
processPOST(e)
break
@ -178,9 +181,18 @@ class ConnectionAcceptor {
dis.readFully(et)
if (et != "ET ".getBytes(StandardCharsets.US_ASCII))
throw new IOException("Invalid GET connection")
uploadManager.processEndpoint(e)
uploadManager.processGET(e)
}
private void processHashList(Endpoint e) {
byte[] ashList = new byte[8]
final DataInputStream dis = new DataInputStream(e.getInputStream())
dis.readFully(ashList)
if (ashList != "ASHLIST ".getBytes(StandardCharsets.US_ASCII))
throw new IOException("Invalid HASHLIST connection")
uploadManager.processHashList(e)
}
private void processPOST(final Endpoint e) throws IOException {
byte [] ost = new byte[4]
final DataInputStream dis = new DataInputStream(e.getInputStream())

View File

@ -0,0 +1,5 @@
package com.muwire.core.upload
class ContentRequest extends Request {
Range range
}

View File

@ -0,0 +1,54 @@
package com.muwire.core.upload
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.StandardOpenOption
import com.muwire.core.connection.Endpoint
class ContentUploader extends Uploader {
private final File file
private final ContentRequest request
ContentUploader(File file, ContentRequest request, Endpoint endpoint) {
super(endpoint)
this.file = file
this.request = request
}
@Override
void respond() {
OutputStream os = endpoint.getOutputStream()
Range range = request.getRange()
if (range.start >= file.length() || range.end >= file.length()) {
os.write("416 Range Not Satisfiable\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
os.flush()
return
}
os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Content-Range: $range.start-$range.end\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
FileChannel channel
try {
channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ))
mapped = channel.map(FileChannel.MapMode.READ_ONLY, range.start, range.end - range.start + 1)
byte [] tmp = new byte[0x1 << 13]
while(mapped.hasRemaining()) {
int start = mapped.position()
synchronized(this) {
mapped.get(tmp, 0, Math.min(tmp.length, mapped.remaining()))
}
int read = mapped.position() - start
endpoint.getOutputStream().write(tmp, 0, read)
}
} finally {
try {channel?.close() } catch (IOException ignored) {}
endpoint.getOutputStream().flush()
}
}
}

View File

@ -0,0 +1,4 @@
package com.muwire.core.upload
class HashListRequest extends Request {
}

View File

@ -0,0 +1,35 @@
package com.muwire.core.upload
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import com.muwire.core.InfoHash
import com.muwire.core.connection.Endpoint
class HashListUploader extends Uploader {
private final InfoHash infoHash
private final HashListRequest request
HashListUploader(Endpoint endpoint, InfoHash infoHash, HashListRequest request) {
super(endpoint)
this.infoHash = infoHash
mapped = ByteBuffer.wrap(infoHash.getHashList())
}
void respond() {
OutputStream os = endpoint.getOutputStream()
os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Content-Range: 0-${mapped.remaining()}")
byte[]tmp = new byte[0x1 << 13]
while(mapped.hasRemaining()) {
int start = mapped.position()
synchronized(this) {
mapped.get(tmp, 0, Math.min(tmp.length, mapped.remaining()))
}
int read = mapped.position() - start
endpoint.getOutputStream().write(tmp, 0, read)
}
endpoint.getOutputStream().flush()
}
}

View File

@ -16,57 +16,12 @@ class Request {
private static final byte N = "\n".getBytes(StandardCharsets.US_ASCII)[0]
InfoHash infoHash
Range range
Persona downloader
Map<String, String> headers
static Request parse(InfoHash infoHash, InputStream is) throws IOException {
Map<String,String> headers = new HashMap<>()
byte [] tmp = new byte[Constants.MAX_HEADER_SIZE]
while(headers.size() < Constants.MAX_HEADERS) {
boolean r = false
boolean n = false
int idx = 0
while (true) {
byte read = is.read()
if (read == -1)
throw new IOException("Stream closed")
if (!r && read == N)
throw new IOException("Received N before R")
if (read == R) {
if (r)
throw new IOException("double R")
r = true
continue
}
if (r && !n) {
if (read != N)
throw new IOException("R not followed by N")
n = true
break
}
if (idx == 0x1 << 14)
throw new IOException("Header too long")
tmp[idx++] = read
}
if (idx == 0)
break
String header = new String(tmp, 0, idx, StandardCharsets.US_ASCII)
log.fine("Read header $header")
int keyIdx = header.indexOf(":")
if (keyIdx < 1)
throw new IOException("Header key not found")
if (keyIdx == header.length())
throw new IOException("Header value not found")
String key = header.substring(0, keyIdx)
String value = header.substring(keyIdx + 1)
headers.put(key, value)
}
static Request parseContentRequest(InfoHash infoHash, InputStream is) throws IOException {
Map<String, String> headers = parseHeaders(is)
if (!headers.containsKey("Range"))
throw new IOException("Range header not found")
@ -93,7 +48,69 @@ class Request {
def decoded = Base64.decode(encoded)
downloader = new Persona(new ByteArrayInputStream(decoded))
}
new Request( infoHash : infoHash, range : new Range(start, end), headers : headers, downloader : downloader)
new ContentRequest( infoHash : infoHash, range : new Range(start, end),
headers : headers, downloader : downloader)
}
static Request parseHashListRequest(InfoHash infoHash, InputStream is) throws IOException {
Map<String,String> headers = parseHeaders(is)
Persona downloader = null
if (headers.containsKey("X-Persona")) {
def encoded = headers["X-Persona"].trim()
def decoded = Base64.decode(encoded)
downloader = new Persona(new ByteArrayInputStream(decoded))
}
new HashListRequest(infoHash : infoHash, headers : headers, downloader : downloader)
}
private static Map<String, String> parseHeaders(InputStream is) {
Map<String,String> headers = new HashMap<>()
byte [] tmp = new byte[Constants.MAX_HEADER_SIZE]
while(headers.size() < Constants.MAX_HEADERS) {
boolean r = false
boolean n = false
int idx = 0
while (true) {
byte read = is.read()
if (read == -1)
throw new IOException("Stream closed")
if (!r && read == N)
throw new IOException("Received N before R")
if (read == R) {
if (r)
throw new IOException("double R")
r = true
continue
}
if (r && !n) {
if (read != N)
throw new IOException("R not followed by N")
n = true
break
}
if (idx == 0x1 << 14)
throw new IOException("Header too long")
tmp[idx++] = read
}
if (idx == 0)
break
String header = new String(tmp, 0, idx, StandardCharsets.US_ASCII)
log.fine("Read header $header")
int keyIdx = header.indexOf(":")
if (keyIdx < 1)
throw new IOException("Header key not found")
if (keyIdx == header.length())
throw new IOException("Header value not found")
String key = header.substring(0, keyIdx)
String value = header.substring(keyIdx + 1)
headers.put(key, value)
}
headers
}
}

View File

@ -23,9 +23,9 @@ public class UploadManager {
this.fileManager = fileManager
}
public void processEndpoint(Endpoint e) throws IOException {
public void processGET(Endpoint e) throws IOException {
byte [] infoHashStringBytes = new byte[44]
DataInputStream dis = new DataInputStream(e.getInputStream())
DataInputStream dis = new DataInputStream(e.getInputStream())
boolean first = true
while(true) {
if (first)
@ -61,13 +61,13 @@ public class UploadManager {
return
}
Request request = Request.parse(new InfoHash(infoHashRoot), e.getInputStream())
Request request = Request.parseContentRequest(new InfoHash(infoHashRoot), e.getInputStream())
if (request.downloader != null && request.downloader.destination != e.destination) {
log.info("Downloader persona doesn't match their destination")
e.close()
return
}
Uploader uploader = new Uploader(sharedFiles.iterator().next().file, request, e)
Uploader uploader = new ContentUploader(sharedFiles.iterator().next().file, request, e)
eventBus.publish(new UploadEvent(uploader : uploader))
try {
uploader.respond()
@ -75,7 +75,46 @@ public class UploadManager {
eventBus.publish(new UploadFinishedEvent(uploader : uploader))
}
}
}
public void processHashList(Endpoint e) {
byte [] infoHashStringBytes = new byte[44]
DataInputStream dis = new DataInputStream(e.getInputStream())
dis.readFully(infoHashStringBytes)
String infoHashString = new String(infoHashStringBytes, StandardCharsets.US_ASCII)
log.info("Responding to hashlist request for root $infoHashString")
byte [] infoHashRoot = Base64.decode(infoHashString)
Set<SharedFile> sharedFiles = fileManager.getSharedFiles(infoHashRoot)
if (sharedFiles == null || sharedFiles.isEmpty()) {
log.info "file not found"
e.getOutputStream().write("404 File Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
e.getOutputStream().flush()
e.close()
return
}
byte [] rn = new byte[2]
dis.readFully(rn)
if (rn != "\r\n".getBytes(StandardCharsets.US_ASCII)) {
log.warning("Malformed HASHLIST header")
e.close()
return
}
Request request = Request.parseHashListRequest(new InfoHash(infoHashRoot), e.getInputStream())
if (request.downloader != null && request.downloader.destination != e.destination) {
log.info("Downloader persona doesn't match their destination")
e.close()
return
}
Uploader uploader = new HashListUploader(e, sharedFiles.iterator().next().infoHash, request, request)
eventBus.publish(new UploadEvent(uploader : uploader))
try {
uploader.respond()
} finally {
eventBus.publish(new UploadFinishedEvent(uploader : uploader))
}
}
}

View File

@ -8,49 +8,16 @@ import java.nio.file.StandardOpenOption
import com.muwire.core.connection.Endpoint
class Uploader {
private final File file
private final Request request
private final Endpoint endpoint
private ByteBuffer mapped
abstract class Uploader {
protected final Endpoint endpoint
protected ByteBuffer mapped
Uploader(File file, Request request, Endpoint endpoint) {
this.file = file
this.request = request
Uploader(Endpoint endpoint) {
this.endpoint = endpoint
}
void respond() {
OutputStream os = endpoint.getOutputStream()
Range range = request.getRange()
if (range.start >= file.length() || range.end >= file.length()) {
os.write("416 Range Not Satisfiable\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
os.flush()
return
}
os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII))
os.write("Content-Range: $range.start-$range.end\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
FileChannel channel
try {
channel = Files.newByteChannel(file.toPath(), EnumSet.of(StandardOpenOption.READ))
mapped = channel.map(FileChannel.MapMode.READ_ONLY, range.start, range.end - range.start + 1)
byte [] tmp = new byte[0x1 << 13]
while(mapped.hasRemaining()) {
int start = mapped.position()
synchronized(this) {
mapped.get(tmp, 0, Math.min(tmp.length, mapped.remaining()))
}
int read = mapped.position() - start
endpoint.getOutputStream().write(tmp, 0, read)
}
} finally {
try {channel?.close() } catch (IOException ignored) {}
endpoint.getOutputStream().flush()
}
}
abstract void respond()
public synchronized int getPosition() {
if (mapped == null)
return -1