mirror of https://github.com/zlatinb/muwire
backend for serving collection requests
parent
2c38bc307c
commit
475a50c9c1
|
@ -429,7 +429,7 @@ public class Core {
|
|||
I2PAcceptor i2pAcceptor = new I2PAcceptor(i2pSocketManager)
|
||||
connectionAcceptor = new ConnectionAcceptor(eventBus, connectionManager, props,
|
||||
i2pAcceptor, hostCache, trustService, searchManager, uploadManager, fileManager, connectionEstablisher,
|
||||
certificateManager, chatServer)
|
||||
certificateManager, chatServer, collectionManager)
|
||||
|
||||
|
||||
log.info("initializing hasher service")
|
||||
|
|
|
@ -51,6 +51,10 @@ class CollectionManager {
|
|||
synchronized List<FileCollection> getCollections() {
|
||||
new ArrayList<>(rootToCollection.values())
|
||||
}
|
||||
|
||||
synchronized FileCollection getByInfoHash(InfoHash ih) {
|
||||
rootToCollection.get(ih)
|
||||
}
|
||||
|
||||
void onAllFilesLoadedEvent(AllFilesLoadedEvent e) {
|
||||
diskIO.execute({load()} as Runnable)
|
||||
|
|
|
@ -24,7 +24,9 @@ class FileCollection {
|
|||
final Set<FileCollectionItem> files = new LinkedHashSet<>()
|
||||
|
||||
final PathTree tree
|
||||
|
||||
|
||||
final List<SearchHit> hits = new ArrayList<>()
|
||||
|
||||
FileCollection(long timestamp, Persona author, String comment, Set<FileCollectionItem> files,
|
||||
SigningPrivateKey spk) {
|
||||
this.timestamp = timestamp;
|
||||
|
@ -144,6 +146,10 @@ class FileCollection {
|
|||
files.size()
|
||||
}
|
||||
|
||||
public void hit(Persona searcher) {
|
||||
hits.add(new SearchHit(searcher))
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
hashCode
|
||||
|
@ -157,4 +163,13 @@ class FileCollection {
|
|||
Objects.equals(comment, other.comment) &&
|
||||
Objects.equals(files, other.files)
|
||||
}
|
||||
|
||||
public static class SearchHit {
|
||||
final Persona searcher
|
||||
final long timestamp
|
||||
SearchHit(Persona searcher) {
|
||||
this.searcher = searcher
|
||||
this.timestamp = System.currentTimeMillis()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@ import com.muwire.core.MuWireSettings
|
|||
import com.muwire.core.Persona
|
||||
import com.muwire.core.SharedFile
|
||||
import com.muwire.core.chat.ChatServer
|
||||
import com.muwire.core.collections.CollectionManager
|
||||
import com.muwire.core.collections.FileCollection
|
||||
import com.muwire.core.filecert.Certificate
|
||||
import com.muwire.core.filecert.CertificateManager
|
||||
import com.muwire.core.filefeeds.FeedItems
|
||||
|
@ -54,6 +56,7 @@ class ConnectionAcceptor {
|
|||
final ConnectionEstablisher establisher
|
||||
final CertificateManager certificateManager
|
||||
final ChatServer chatServer
|
||||
final CollectionManager collectionManager
|
||||
|
||||
final ExecutorService acceptorThread
|
||||
final ExecutorService handshakerThreads
|
||||
|
@ -66,7 +69,7 @@ class ConnectionAcceptor {
|
|||
MuWireSettings settings, I2PAcceptor acceptor, HostCache hostCache,
|
||||
TrustService trustService, SearchManager searchManager, UploadManager uploadManager,
|
||||
FileManager fileManager, ConnectionEstablisher establisher, CertificateManager certificateManager,
|
||||
ChatServer chatServer) {
|
||||
ChatServer chatServer, CollectionManager collectionManager) {
|
||||
this.eventBus = eventBus
|
||||
this.manager = manager
|
||||
this.settings = settings
|
||||
|
@ -79,6 +82,7 @@ class ConnectionAcceptor {
|
|||
this.establisher = establisher
|
||||
this.certificateManager = certificateManager
|
||||
this.chatServer = chatServer
|
||||
this.collectionManager = collectionManager
|
||||
|
||||
acceptorThread = Executors.newSingleThreadExecutor { r ->
|
||||
def rv = new Thread(r)
|
||||
|
@ -166,6 +170,9 @@ class ConnectionAcceptor {
|
|||
case (byte)'F':
|
||||
processFEED(e)
|
||||
break
|
||||
case (byte)'M':
|
||||
processMETAFILE(e)
|
||||
break
|
||||
default:
|
||||
throw new Exception("Invalid read $read")
|
||||
}
|
||||
|
@ -601,5 +608,62 @@ class ConnectionAcceptor {
|
|||
e.close()
|
||||
}
|
||||
}
|
||||
|
||||
private void processMETAFILE(Endpoint e) {
|
||||
DataOutputStream dos = null
|
||||
try {
|
||||
byte [] ETAFILE = new byte[8]
|
||||
DataInputStream dis = new DataInputStream(e.getInputStream())
|
||||
dis.readFully(ETAFILE)
|
||||
if (ETAFILE != "ETAFILE ".getBytes(StandardCharsets.US_ASCII))
|
||||
throw new Exception("invalid METAFILE connection")
|
||||
|
||||
String infoHashString = DataUtil.readTillRN(dis)
|
||||
def infoHashes = infoHashString.split(",").toList().collect {new InfoHash(Base64.decode(it))}
|
||||
infoHashes = new HashSet<>(infoHashes)
|
||||
|
||||
Map<String,String> headers = DataUtil.readAllHeaders(dis)
|
||||
if (headers['Version'] != "1")
|
||||
throw new Exception("Unknown version ${headers['Version']}")
|
||||
|
||||
Persona client = null
|
||||
if (headers.containsKey("Persona"))
|
||||
client = new Persona(Base64.decode(headers['Persona']))
|
||||
|
||||
Map<InfoHash, FileCollection> available = new HashMap<>()
|
||||
infoHashes.each {
|
||||
FileCollection col = collectionManager.getByInfoHash(it)
|
||||
if (col != null) {
|
||||
available.put(it, col)
|
||||
col.hit(client)
|
||||
}
|
||||
}
|
||||
|
||||
OutputStream os = e.getOutputStream()
|
||||
|
||||
if (available.isEmpty()) {
|
||||
os.write("404\r\n\r\n".getBytes(StandardCharsets.US_ASCII))
|
||||
return
|
||||
}
|
||||
|
||||
os.write("200\r\n".getBytes(StandardCharsets.US_ASCII))
|
||||
os.write("Version:1\r\n".getBytes(StandardCharsets.US_ASCII))
|
||||
os.write("Count:${available.size()}\r\n".getBytes(StandardCharsets.US_ASCII))
|
||||
os.write("\r\n".getBytes(StandardCharsets.US_ASCII))
|
||||
|
||||
dos = new DataOutputStream(new GZIPOutputStream(os))
|
||||
available.each { hash, collection ->
|
||||
dos.write(hash.getRoot())
|
||||
collection.write(dos)
|
||||
}
|
||||
|
||||
} finally {
|
||||
try {
|
||||
dos?.flush()
|
||||
dos?.close()
|
||||
} catch (Exception ignore) {}
|
||||
e.close()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue