From e2bd1dc613dac4eeda5a7ece0919a3622015a6da Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Thu, 10 Feb 2022 17:00:41 +0000 Subject: [PATCH] * Use MessageThrottle to throttle incoming queries * Drop queries without "originator" Persona field * Enforce that first hop queries originator == connection endpoint --- .../muwire/core/connection/Connection.groovy | 40 +++++++------------ 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index ee5e3d8b..703e72dd 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -1,13 +1,10 @@ package com.muwire.core.connection +import com.muwire.core.util.MessageThrottle + import java.nio.charset.StandardCharsets import java.util.concurrent.BlockingQueue -import java.util.concurrent.CountDownLatch -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.ThreadFactory -import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.logging.Level @@ -47,7 +44,7 @@ abstract class Connection implements Closeable { private final AtomicBoolean running = new AtomicBoolean() private final BlockingQueue messages = new LinkedBlockingQueue() private final Thread reader, writer - private final LinkedList searchTimestamps = new LinkedList<>() + private final MessageThrottle queryThrottle = new MessageThrottle(INTERVAL, SEARCHES) protected final String name @@ -226,22 +223,8 @@ abstract class Connection implements Closeable { } } - private boolean throttleSearch() { - final long now = System.currentTimeMillis() - if (searchTimestamps.size() < SEARCHES) { - searchTimestamps.addLast(now) - return false - } - Long oldest = searchTimestamps.getFirst() - if (now - oldest.longValue() < INTERVAL) - return true - searchTimestamps.addLast(now) - searchTimestamps.removeFirst() - false - } - protected void handleSearch(def search) { - if (throttleSearch()) { + if (!queryThrottle.allow(System.currentTimeMillis())) { log.info("dropping excessive search") return } @@ -263,13 +246,20 @@ abstract class Connection implements Closeable { return } - Persona originator = null + Persona originator if (search.originator != null) { originator = new Persona(new ByteArrayInputStream(Base64.decode(search.originator))) if (originator.destination != replyTo) { log.info("originator doesn't match destination") return } + if (search.firstHop && originator.destination != endpoint.getDestination()) { + log.info("first hop query originator does not match endpoint address") + return + } + } else { + log.info("dropping search without originator") + return } boolean oob = false @@ -291,7 +281,7 @@ abstract class Connection implements Closeable { if (search.regex != null) regex = search.regex - byte[] sig = null + byte[] sig if (search.sig != null) { sig = Base64.decode(search.sig) byte [] payload @@ -311,8 +301,8 @@ abstract class Connection implements Closeable { return } - byte[] sig2 = null - long queryTime = 0 + byte[] sig2 + long queryTime if (search.sig2 != null) { if (search.queryTime == null) { log.info("extended signature but no timestamp")