* Use MessageThrottle to throttle incoming queries

* Drop queries without "originator" Persona field
* Enforce that first hop queries originator == connection endpoint
auto-update
Zlatin Balevsky 2022-02-10 17:00:41 +00:00
parent 92f30aeab7
commit e2bd1dc613
No known key found for this signature in database
GPG Key ID: A72832072D525E41
1 changed files with 15 additions and 25 deletions

View File

@ -1,13 +1,10 @@
package com.muwire.core.connection
import com.muwire.core.util.MessageThrottle
import java.nio.charset.StandardCharsets
import java.util.concurrent.BlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.logging.Level
@ -47,7 +44,7 @@ abstract class Connection implements Closeable {
private final AtomicBoolean running = new AtomicBoolean()
private final BlockingQueue messages = new LinkedBlockingQueue()
private final Thread reader, writer
private final LinkedList<Long> searchTimestamps = new LinkedList<>()
private final MessageThrottle queryThrottle = new MessageThrottle(INTERVAL, SEARCHES)
protected final String name
@ -226,22 +223,8 @@ abstract class Connection implements Closeable {
}
}
private boolean throttleSearch() {
final long now = System.currentTimeMillis()
if (searchTimestamps.size() < SEARCHES) {
searchTimestamps.addLast(now)
return false
}
Long oldest = searchTimestamps.getFirst()
if (now - oldest.longValue() < INTERVAL)
return true
searchTimestamps.addLast(now)
searchTimestamps.removeFirst()
false
}
protected void handleSearch(def search) {
if (throttleSearch()) {
if (!queryThrottle.allow(System.currentTimeMillis())) {
log.info("dropping excessive search")
return
}
@ -263,13 +246,20 @@ abstract class Connection implements Closeable {
return
}
Persona originator = null
Persona originator
if (search.originator != null) {
originator = new Persona(new ByteArrayInputStream(Base64.decode(search.originator)))
if (originator.destination != replyTo) {
log.info("originator doesn't match destination")
return
}
if (search.firstHop && originator.destination != endpoint.getDestination()) {
log.info("first hop query originator does not match endpoint address")
return
}
} else {
log.info("dropping search without originator")
return
}
boolean oob = false
@ -291,7 +281,7 @@ abstract class Connection implements Closeable {
if (search.regex != null)
regex = search.regex
byte[] sig = null
byte[] sig
if (search.sig != null) {
sig = Base64.decode(search.sig)
byte [] payload
@ -311,8 +301,8 @@ abstract class Connection implements Closeable {
return
}
byte[] sig2 = null
long queryTime = 0
byte[] sig2
long queryTime
if (search.sig2 != null) {
if (search.queryTime == null) {
log.info("extended signature but no timestamp")