260 lines
8.6 KiB
Python
Executable File
260 lines
8.6 KiB
Python
Executable File
#!/usr/bin/python3
|
|
import socket
|
|
import threading
|
|
import struct
|
|
import time
|
|
import sys
|
|
import pickle
|
|
import pathlib
|
|
import tempfile
|
|
import os
|
|
import traceback
|
|
import signal
|
|
import atexit
|
|
|
|
HOST = "0.0.0.0"
|
|
CACHE_PATH = pathlib.Path(f"~/.ressenger/cache_{sys.argv[1]}").expanduser()
|
|
CACHE = {} # in-memory cache: {timestamp_ns: bytes}
|
|
cache_lock = threading.Lock() # protect access to CACHE
|
|
|
|
# Globals used by cleanup / signal handlers
|
|
_writer = None # will hold CacheWriter instance
|
|
_server_sock = None # will hold listening socket
|
|
_shutdown_event = threading.Event()
|
|
_cleaned = False # ensure cleanup runs only once
|
|
|
|
|
|
def atomic_dump(obj, path: pathlib.Path):
|
|
"""Atomically write `obj` (pickled) to `path`.
|
|
|
|
Procedure:
|
|
* create a temporary file in the same directory,
|
|
* pickle into it, flush and fsync,
|
|
* os.replace to atomically replace the target file.
|
|
"""
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), prefix=path.name + ".", suffix=".tmp")
|
|
try:
|
|
with os.fdopen(fd, "wb") as f:
|
|
pickle.dump(obj, f, protocol=pickle.HIGHEST_PROTOCOL)
|
|
f.flush()
|
|
os.fsync(f.fileno())
|
|
os.replace(tmp_path, str(path))
|
|
except Exception:
|
|
try:
|
|
os.remove(tmp_path)
|
|
except Exception:
|
|
pass
|
|
raise
|
|
|
|
|
|
def recv_exact(conn: socket.socket, n: int):
|
|
"""Read exactly n bytes from conn or return None on EOF."""
|
|
buf = bytearray()
|
|
while len(buf) < n:
|
|
chunk = conn.recv(n - len(buf))
|
|
if not chunk:
|
|
return None
|
|
buf.extend(chunk)
|
|
return bytes(buf)
|
|
|
|
|
|
class CacheWriter(threading.Thread):
|
|
"""Dedicated writer thread that writes the latest CACHE to disk.
|
|
|
|
Behaviour:
|
|
- When notified, waits `debounce` seconds to coalesce rapid updates,
|
|
- Then grabs a snapshot of CACHE (under cache_lock) and writes it atomically.
|
|
- On stop, writes once more to ensure latest data is persisted.
|
|
"""
|
|
|
|
def __init__(self, path: pathlib.Path, debounce: float = 0.05):
|
|
super().__init__(daemon=True)
|
|
self.path = path
|
|
self.debounce = debounce
|
|
self._event = threading.Event()
|
|
self._stop = threading.Event()
|
|
|
|
def notify(self):
|
|
"""Notify the writer that there is new data to persist."""
|
|
self._event.set()
|
|
|
|
def stop(self):
|
|
"""Signal the writer to stop and wait for it to finish."""
|
|
self._stop.set()
|
|
self._event.set()
|
|
self.join()
|
|
|
|
def run(self):
|
|
try:
|
|
while not self._stop.is_set():
|
|
# Wait until notified or stop is requested
|
|
self._event.wait()
|
|
if self._stop.is_set():
|
|
break
|
|
# Short sleep to coalesce multiple rapid updates
|
|
time.sleep(self.debounce)
|
|
# Take a snapshot of CACHE under lock
|
|
with cache_lock:
|
|
snapshot = dict(CACHE)
|
|
# Clear the event before writing so new notifications set it again
|
|
self._event.clear()
|
|
# Atomically write snapshot
|
|
try:
|
|
atomic_dump(snapshot, self.path)
|
|
except Exception:
|
|
traceback.print_exc()
|
|
# Final flush just before exit
|
|
with cache_lock:
|
|
final_snapshot = dict(CACHE)
|
|
try:
|
|
atomic_dump(final_snapshot, self.path)
|
|
except Exception:
|
|
traceback.print_exc()
|
|
except Exception:
|
|
traceback.print_exc()
|
|
|
|
|
|
def handle_client(conn: socket.socket, addr, writer: CacheWriter):
|
|
"""Per-connection handler: receive framed messages and update CACHE."""
|
|
print(f"Connected: {addr}")
|
|
try:
|
|
while True:
|
|
header = recv_exact(conn, 4)
|
|
if header is None:
|
|
print(f"Client {addr} disconnected")
|
|
break
|
|
length = struct.unpack("!I", header)[0] # big-endian unsigned int
|
|
if length == 0:
|
|
# optional: zero-length message as heartbeat
|
|
print(f"Received an empty message from {addr}")
|
|
continue
|
|
data = recv_exact(conn, length)
|
|
if data is None:
|
|
print(f"Client {addr} disconnected prematurely while reading data")
|
|
break
|
|
# Update in-memory CACHE under lock and notify writer
|
|
ts = time.time_ns()
|
|
with cache_lock:
|
|
# Using time.time_ns() as key is usually fine; for extreme throughput
|
|
# consider uuid.uuid4() or a sequence number to avoid collisions.
|
|
CACHE[ts] = data
|
|
# Notify writer to persist latest CACHE (writer snapshots under lock)
|
|
writer.notify()
|
|
print(f"Received {len(data)} bytes from {addr} and queued for persist")
|
|
except Exception:
|
|
traceback.print_exc()
|
|
finally:
|
|
try:
|
|
conn.close()
|
|
except Exception:
|
|
pass
|
|
print(f"Closed connection {addr}")
|
|
|
|
|
|
def _cleanup():
|
|
"""Stop writer, close server socket and remove cache file. Safe to call multiple times."""
|
|
global _cleaned, _writer, _server_sock
|
|
if _cleaned:
|
|
return
|
|
_cleaned = True
|
|
print("Cleanup: stopping writer and removing cache file (if present)...")
|
|
# Stop writer thread first (this will perform a final flush)
|
|
try:
|
|
if _writer is not None:
|
|
_writer.stop()
|
|
print("Writer thread stopped.")
|
|
except Exception:
|
|
traceback.print_exc()
|
|
# Close server socket if it exists
|
|
try:
|
|
if _server_sock is not None:
|
|
_server_sock.close()
|
|
print("Server socket closed.")
|
|
except Exception:
|
|
traceback.print_exc()
|
|
# Remove cache file (best-effort)
|
|
try:
|
|
if CACHE_PATH.exists():
|
|
os.remove(CACHE_PATH)
|
|
print(f"Removed cache file {CACHE_PATH}")
|
|
except Exception:
|
|
traceback.print_exc()
|
|
print("Cleanup complete.")
|
|
|
|
|
|
def _signal_handler(signum, frame):
|
|
"""Signal handler: set shutdown event and attempt to interrupt accept by closing socket."""
|
|
print(f"Signal {signum} received: initiating shutdown...")
|
|
_shutdown_event.set()
|
|
# attempt to close server socket to break accept() quickly
|
|
try:
|
|
if _server_sock is not None:
|
|
_server_sock.close()
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# Register atexit cleanup (runs on normal interpreter exit)
|
|
atexit.register(_cleanup)
|
|
|
|
# Register signal handlers for common termination signals
|
|
for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP, getattr(signal, "SIGQUIT", None)):
|
|
if sig is not None:
|
|
signal.signal(sig, _signal_handler)
|
|
|
|
|
|
def run_server(port: int):
|
|
"""Main server routine: start writer, accept connections, and honour shutdown requests."""
|
|
global _writer, _server_sock
|
|
|
|
# DO NOT load existing cache file on startup — always start fresh.
|
|
try:
|
|
CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
with cache_lock:
|
|
atomic_dump(CACHE, CACHE_PATH)
|
|
print("Initialised fresh empty CACHE on disk.")
|
|
except Exception:
|
|
print("Failed to initialise cache file on disk")
|
|
traceback.print_exc()
|
|
|
|
# Start writer thread
|
|
_writer = CacheWriter(CACHE_PATH, debounce=0.05)
|
|
_writer.start()
|
|
|
|
# Create listening socket with short timeout so we can check shutdown_event regularly
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
_server_sock = sock
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
sock.bind((HOST, port))
|
|
sock.listen()
|
|
sock.settimeout(1.0) # 1 second timeout to check for shutdown_event
|
|
print(f"Server listening on {HOST}:{port}...")
|
|
|
|
try:
|
|
while not _shutdown_event.is_set():
|
|
try:
|
|
conn, addr = sock.accept()
|
|
except socket.timeout:
|
|
continue
|
|
except OSError:
|
|
# socket may be closed from signal handler; check shutdown flag
|
|
if _shutdown_event.is_set():
|
|
break
|
|
else:
|
|
raise
|
|
# start handler thread for the accepted connection
|
|
t = threading.Thread(target=handle_client, args=(conn, addr, _writer), daemon=True)
|
|
t.start()
|
|
except KeyboardInterrupt:
|
|
print("Server shutting down (KeyboardInterrupt)...")
|
|
except Exception:
|
|
traceback.print_exc()
|
|
finally:
|
|
# Ensure cleanup runs here as well (stop writer and remove cache file)
|
|
_cleanup()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
run_server(int(sys.argv[1]))
|