# Ressenger/ressenger_server.py
#
# 273 lines
# 9.2 KiB
# Python
# Executable File

#!/usr/bin/env python3
"""
TCP server that receives binary blobs and persists them to a cache file.
This variant ensures the cache file is removed on shutdown (where possible).
Behaviour:
- Always start with a fresh, empty CACHE (do NOT load existing file).
- Use a dedicated CacheWriter thread to debounce and atomically write CACHE.
- On exit (signals, KeyboardInterrupt, normal interpreter exit) stop writer and remove cache file.
Note: SIGKILL (kill -9) and sudden power loss cannot be trapped; cleanup cannot run then.
"""
# English comments throughout (British English spelling).
import socket
import threading
import struct
import time
import sys
import pickle
import pathlib
import tempfile
import os
import traceback
import signal
import atexit
HOST = "0.0.0.0"
# The cache file is named after the first CLI argument (the port), so servers on
# different ports use different files. Fall back to "default" when no argument is
# given so the module can still be imported (e.g. by tests or tooling) instead of
# failing with IndexError at import time.
_CACHE_SUFFIX = sys.argv[1] if len(sys.argv) > 1 else "default"
CACHE_PATH = pathlib.Path(f"~/.ressenger/cache_{_CACHE_SUFFIX}").expanduser()
CACHE = {}  # in-memory cache: {timestamp_ns: bytes}
cache_lock = threading.Lock()  # protects every access to CACHE
# Globals used by cleanup / signal handlers
_writer = None  # CacheWriter instance, assigned by run_server()
_server_sock = None  # listening socket, assigned by run_server()
_shutdown_event = threading.Event()  # set by the signal handler to end the accept loop
_cleaned = False  # ensures _cleanup() runs only once
def atomic_dump(obj, path: pathlib.Path):
    """Atomically write ``obj`` (pickled) to ``path``.

    Procedure:
      * create a temporary file in the same directory (so ``os.replace`` stays
        on one filesystem),
      * pickle into it, flush and fsync,
      * ``os.replace`` it over the target file,
      * fsync the containing directory (best effort) so the rename itself is
        persisted as well.

    Readers see either the previous file or the complete new one, never a
    partially written file.

    Args:
        obj: any picklable object.
        path: destination file (``pathlib.Path`` or ``str``); parent
            directories are created if missing.

    Raises:
        Whatever ``pickle.dump`` or the file-system calls raise. The temporary
        file is removed before the exception propagates.
    """
    path = pathlib.Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=str(path.parent), prefix=path.name + ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as f:
            pickle.dump(obj, f, protocol=pickle.HIGHEST_PROTOCOL)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, str(path))
    except BaseException:
        # BaseException so an interrupt (KeyboardInterrupt/SystemExit) mid-write
        # does not leave a stray *.tmp file behind either.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
    # Persist the directory entry changed by os.replace. Some platforms cannot
    # open a directory (e.g. Windows), so this step is best-effort only.
    dir_flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0)
    try:
        dir_fd = os.open(str(path.parent), dir_flags)
    except OSError:
        return
    try:
        os.fsync(dir_fd)
    except OSError:
        pass
    finally:
        os.close(dir_fd)
def recv_exact(conn: socket.socket, n: int, chunk_size: int = 65536):
    """Read exactly ``n`` bytes from ``conn``, or return None on EOF.

    Args:
        conn: a connected stream socket.
        n: number of bytes to read; ``0`` returns ``b""`` immediately.
        chunk_size: upper bound (positive) on the size requested from a single
            ``recv`` call. ``socket.recv(bufsize)`` can allocate a buffer of
            ``bufsize`` bytes up front, and ``n`` comes from a client-supplied
            32-bit length header (up to 4 GiB), so it is never passed through
            unbounded.

    Returns:
        The ``n`` bytes read, or ``None`` if the peer closed the connection
        before ``n`` bytes arrived (any partial data is discarded).
    """
    buf = bytearray()
    while len(buf) < n:
        chunk = conn.recv(min(n - len(buf), chunk_size))
        if not chunk:
            return None
        buf.extend(chunk)
    return bytes(buf)
class CacheWriter(threading.Thread):
    """Dedicated writer thread that writes the latest CACHE to disk.

    Behaviour:
    - When notified, waits ``debounce`` seconds to coalesce rapid updates,
    - then clears the pending flag, takes a snapshot of CACHE (under
      cache_lock) and writes it with atomic_dump().
    - On stop, writes once more so the latest data is persisted.

    Errors while snapshotting or writing are printed and do not stop the loop.
    """

    def __init__(self, path: pathlib.Path, debounce: float = 0.05):
        super().__init__(daemon=True)
        self.path = path
        self.debounce = debounce
        # Set by notify(): there is data that has not been written yet.
        self._event = threading.Event()
        # Set by stop(). Deliberately NOT named ``_stop``: threading.Thread has
        # an internal ``_stop()`` method (called from join()/is_alive() on
        # CPython, at least up to 3.12), and shadowing it with an Event makes
        # join() fail with "TypeError: 'Event' object is not callable".
        self._stop_event = threading.Event()

    def notify(self):
        """Notify the writer that there is new data to persist."""
        self._event.set()

    def stop(self):
        """Signal the writer to stop, then wait for its final flush to finish."""
        self._stop_event.set()
        self._event.set()  # wake run() if it is blocked in wait()
        self.join()

    def _write_snapshot(self):
        """Copy CACHE under cache_lock and write the copy; errors are printed, not raised."""
        try:
            with cache_lock:
                snapshot = dict(CACHE)
            atomic_dump(snapshot, self.path)
        except Exception:
            traceback.print_exc()

    def run(self):
        try:
            while not self._stop_event.is_set():
                # Wait until notified or stop is requested
                self._event.wait()
                if self._stop_event.is_set():
                    break
                # Short sleep to coalesce multiple rapid updates
                time.sleep(self.debounce)
                # Clear BEFORE snapshotting: an update that lands after this
                # point sets the event again and triggers another write. Clearing
                # after the snapshot could drop such a notification.
                self._event.clear()
                self._write_snapshot()
            # Final flush just before exit
            self._write_snapshot()
        except Exception:
            traceback.print_exc()
def handle_client(conn: socket.socket, addr, writer: CacheWriter):
    """Per-connection handler: receive framed messages and store them in CACHE.

    Wire format: repeated frames, each a 4-byte big-endian unsigned length
    followed by that many payload bytes. A zero length is treated as a
    heartbeat and skipped. Every payload is stored in CACHE under a unique
    nanosecond-timestamp key, and ``writer`` is notified.

    Returns when the client disconnects or an error occurs; the connection is
    always closed.
    """
    print(f"Connected: {addr}")
    try:
        while True:
            header = recv_exact(conn, 4)
            if header is None:
                print(f"Client {addr} disconnected")
                break
            (length,) = struct.unpack("!I", header)  # big-endian unsigned int
            if length == 0:
                # optional: zero-length message as heartbeat
                print(f"Received an empty message from {addr}")
                continue
            data = recv_exact(conn, length)
            if data is None:
                print(f"Client {addr} disconnected prematurely while reading data")
                break
            with cache_lock:
                # time.time_ns() can return the same value twice (coarse clocks,
                # or two handler threads in the same tick). Bump the key until it
                # is free so an earlier blob is never silently overwritten.
                ts = time.time_ns()
                while ts in CACHE:
                    ts += 1
                CACHE[ts] = data
            # Notify writer to persist latest CACHE (writer snapshots under lock)
            writer.notify()
            print(f"Received {len(data)} bytes from {addr} and queued for persist")
    except Exception:
        traceback.print_exc()
    finally:
        try:
            conn.close()
        except OSError:
            pass
        print(f"Closed connection {addr}")
def _cleanup():
    """Stop the writer, close the server socket and remove the cache file.

    Safe to call multiple times: the ``_cleaned`` flag makes every call after
    the first a no-op. Each step is best-effort; a failure is printed and the
    remaining steps still run.
    """
    global _cleaned, _writer, _server_sock
    if _cleaned:
        return
    _cleaned = True
    print("Cleanup: stopping writer and removing cache file (if present)...")
    # Stop the writer first (this performs a final flush), so it cannot
    # recreate the cache file after it is removed below.
    try:
        if _writer is not None:
            _writer.stop()
            print("Writer thread stopped.")
    except Exception:
        traceback.print_exc()
    # Close server socket if it exists
    try:
        if _server_sock is not None:
            _server_sock.close()
            print("Server socket closed.")
    except Exception:
        traceback.print_exc()
    # Remove cache file (best-effort). Try it directly rather than checking
    # exists() first, which would race with anything else touching the file.
    try:
        os.remove(CACHE_PATH)
    except FileNotFoundError:
        pass
    except Exception:
        traceback.print_exc()
    else:
        print(f"Removed cache file {CACHE_PATH}")
    print("Cleanup complete.")
def _signal_handler(signum, frame):
    """React to a termination signal by requesting a graceful shutdown.

    Sets ``_shutdown_event`` so the accept loop stops. If a listening socket
    exists, it is closed so that a pending ``accept()`` fails right away
    instead of waiting for its timeout. Errors from closing are ignored.
    """
    print(f"Signal {signum} received: initiating shutdown...")
    _shutdown_event.set()
    listener = _server_sock
    if listener is None:
        return
    try:
        listener.close()
    except Exception:
        pass
# Register atexit cleanup (runs on normal interpreter exit)
atexit.register(_cleanup)
# Register signal handlers for common termination signals. All but SIGINT are
# looked up with getattr because not every platform defines them (e.g. Windows
# has no SIGHUP or SIGQUIT); a direct attribute access would raise
# AttributeError at import time.
for _sig_name in ("SIGINT", "SIGTERM", "SIGHUP", "SIGQUIT"):
    sig = getattr(signal, _sig_name, None)
    if sig is not None:
        signal.signal(sig, _signal_handler)
def run_server(port: int):
    """Serve blobs on ``port`` until shutdown is requested, then clean up.

    Sequence:
    1. Write the (empty) in-memory CACHE to CACHE_PATH. An existing file is
       deliberately never loaded, so every run starts fresh.
    2. Start the CacheWriter thread and publish it in ``_writer``.
    3. Listen on HOST:port (socket published in ``_server_sock``) and hand
       every accepted connection to a daemon handle_client thread. A 1 s
       accept timeout lets the loop re-check ``_shutdown_event``.
    4. Always finish with _cleanup().
    """
    global _writer, _server_sock

    # Step 1: fresh, empty cache file on disk.
    try:
        CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
        with cache_lock:
            atomic_dump(CACHE, CACHE_PATH)
        print("Initialised fresh empty CACHE on disk (existing file not loaded).")
    except Exception:
        print("Failed to initialise cache file on disk")
        traceback.print_exc()

    # Step 2: background writer.
    writer = CacheWriter(CACHE_PATH, debounce=0.05)
    _writer = writer
    writer.start()

    # Step 3: listening socket.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    _server_sock = listener
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind((HOST, port))
    listener.listen()
    listener.settimeout(1.0)  # wake up every second to re-check _shutdown_event
    print(f"Server listening on {HOST}:{port}...")

    try:
        while not _shutdown_event.is_set():
            try:
                conn, addr = listener.accept()
            except socket.timeout:
                continue
            except OSError:
                # The signal handler closes the listener to unblock accept();
                # outside of a shutdown this is a genuine failure.
                if not _shutdown_event.is_set():
                    raise
                break
            worker = threading.Thread(
                target=handle_client,
                args=(conn, addr, writer),
                daemon=True,
            )
            worker.start()
    except KeyboardInterrupt:
        print("Server shutting down (KeyboardInterrupt)...")
    except Exception:
        traceback.print_exc()
    finally:
        # Step 4: stop writer, close socket, remove cache file.
        _cleanup()
if __name__ == "__main__":
    # The single required argument is the TCP port; it also names the cache file.
    try:
        _port = int(sys.argv[1])
    except (IndexError, ValueError):
        sys.exit(f"usage: {sys.argv[0]} PORT")
    if not 0 <= _port <= 65535:
        sys.exit(f"invalid port {_port}: must be between 0 and 65535")
    run_server(_port)