Ressenger/ressenger_server.py

260 lines
8.6 KiB
Python
Executable File

#!/usr/bin/python3
import socket
import threading
import struct
import time
import sys
import pickle
import pathlib
import tempfile
import os
import traceback
import signal
import atexit
HOST = "0.0.0.0"  # bind address: all IPv4 interfaces
# One cache file per server instance, named after the first CLI argument (the
# port when run as a script). Fall back to a fixed tag so that importing this
# module without arguments (tests, tooling) does not crash with IndexError.
_instance_tag = sys.argv[1] if len(sys.argv) > 1 else "default"
CACHE_PATH = pathlib.Path(f"~/.ressenger/cache_{_instance_tag}").expanduser()
CACHE = {}  # in-memory cache: {timestamp_ns: bytes}
cache_lock = threading.Lock()  # protects every read/write of CACHE
# Globals used by cleanup / signal handlers
_writer = None  # CacheWriter instance, assigned by run_server
_server_sock = None  # listening socket, assigned by run_server
_shutdown_event = threading.Event()  # set by _signal_handler to end the accept loop
_cleaned = False  # ensures _cleanup's work runs only once
def atomic_dump(obj, path: pathlib.Path):
    """Pickle *obj* into *path* so readers never observe a half-written file.

    The data goes to a scratch file created next to *path* (same directory,
    hence same filesystem). That file is flushed and fsync'd, then moved over
    *path* with ``os.replace``. On any failure the scratch file is removed
    (best effort) and the original exception is re-raised.
    """
    directory = path.parent
    directory.mkdir(parents=True, exist_ok=True)
    handle, scratch = tempfile.mkstemp(
        suffix=".tmp",
        prefix=f"{path.name}.",
        dir=str(directory),
    )
    try:
        with os.fdopen(handle, "wb") as out:
            pickle.dump(obj, out, protocol=pickle.HIGHEST_PROTOCOL)
            out.flush()
            os.fsync(out.fileno())  # data must be on disk before the rename
        os.replace(scratch, str(path))
    except Exception:
        # Do not leave stray *.tmp files behind; the caller needs the
        # original error, so a failed removal is ignored.
        try:
            os.remove(scratch)
        except Exception:
            pass
        raise
def recv_exact(conn: socket.socket, n: int):
    """Receive exactly *n* bytes from *conn*.

    Returns the bytes, or ``None`` if the peer closes the connection before
    *n* bytes have arrived. A non-positive *n* yields ``b""``.
    """
    pieces = []
    remaining = n
    while remaining > 0:
        piece = conn.recv(remaining)
        if not piece:  # orderly shutdown by the peer
            return None
        pieces.append(piece)
        remaining -= len(piece)
    return b"".join(pieces)
class CacheWriter(threading.Thread):
    """Dedicated daemon thread that persists the global CACHE to disk.

    Behaviour:
    - ``notify()`` marks CACHE as changed. The thread then sleeps ``debounce``
      seconds so a burst of updates is coalesced into one write.
    - It copies CACHE under ``cache_lock`` and writes the copy with
      ``atomic_dump``. The write happens outside the lock, so handlers are not
      blocked by disk I/O.
    - ``stop()`` asks the thread to exit. The thread writes CACHE once more
      before finishing, so the latest data is persisted.
    """

    def __init__(self, path: pathlib.Path, debounce: float = 0.05):
        super().__init__(daemon=True)
        self.path = path
        self.debounce = debounce
        # Set when there is unpersisted data (or when stop() wants to wake us).
        self._event = threading.Event()
        # Deliberately NOT named ``_stop``: CPython's threading.Thread has an
        # internal ``_stop()`` method that join()/is_alive() call (at least up to
        # 3.12). Shadowing it with an Event makes join() raise
        # "TypeError: 'Event' object is not callable".
        self._stop_event = threading.Event()

    def notify(self):
        """Notify the writer that there is new data to persist."""
        self._event.set()

    def stop(self):
        """Signal the writer to stop and wait until its final flush is done."""
        self._stop_event.set()
        self._event.set()  # wake the thread if it is blocked in wait()
        self.join()

    def _write_snapshot(self):
        """Copy CACHE under cache_lock and atomically write the copy to self.path.

        Write errors are printed, not raised, so the writer keeps running.
        """
        with cache_lock:
            snapshot = dict(CACHE)
        try:
            atomic_dump(snapshot, self.path)
        except Exception:
            traceback.print_exc()

    def run(self):
        try:
            while not self._stop_event.is_set():
                # Wait until notified or stop is requested
                self._event.wait()
                if self._stop_event.is_set():
                    break
                # Short sleep to coalesce multiple rapid updates
                time.sleep(self.debounce)
                # Clear BEFORE snapshotting. A notify() that races with the
                # snapshot then re-sets the event and triggers another write,
                # instead of being wiped out after the snapshot was taken.
                self._event.clear()
                self._write_snapshot()
            # Final flush just before exit
            self._write_snapshot()
        except Exception:
            traceback.print_exc()
def handle_client(conn: socket.socket, addr, writer: CacheWriter):
    """Serve one client connection until it disconnects.

    Wire format: a 4-byte big-endian unsigned length, followed by that many
    payload bytes. Each non-empty payload is stored in CACHE under an integer
    nanosecond-timestamp key, and *writer* is notified. Zero-length messages
    are only logged. The connection is always closed on exit, and unexpected
    errors are printed rather than raised.
    """
    print(f"Connected: {addr}")
    try:
        while True:
            header = recv_exact(conn, 4)
            if header is None:
                print(f"Client {addr} disconnected")
                break
            (length,) = struct.unpack("!I", header)  # big-endian unsigned int
            if length == 0:
                # optional: zero-length message as heartbeat
                print(f"Received an empty message from {addr}")
                continue
            # NOTE(review): `length` comes straight from the network (up to
            # 4 GiB) and the server listens on all interfaces; consider
            # enforcing a maximum frame size before reading the payload.
            data = recv_exact(conn, length)
            if data is None:
                print(f"Client {addr} disconnected prematurely while reading data")
                break
            with cache_lock:
                # time.time_ns() can return the same value for messages that
                # arrive close together (coarse clock resolution, several
                # client threads). That would silently overwrite an earlier
                # message. Bump the key until it is free; it stays an int
                # within a few ns of the arrival time.
                ts = time.time_ns()
                while ts in CACHE:
                    ts += 1
                CACHE[ts] = data
            # Notify writer to persist latest CACHE (writer snapshots under lock)
            writer.notify()
            print(f"Received {len(data)} bytes from {addr} and queued for persist")
    except Exception:
        traceback.print_exc()
    finally:
        try:
            conn.close()
        except Exception:
            pass
        print(f"Closed connection {addr}")
def _cleanup():
    """Stop the writer, close the listening socket and delete the cache file.

    Only the first call does any work (guarded by ``_cleaned``). Each of the
    three steps is best-effort: a failure is printed and the next step still
    runs.
    """
    global _cleaned, _writer, _server_sock
    if _cleaned:
        return
    _cleaned = True
    print("Cleanup: stopping writer and removing cache file (if present)...")

    def stop_writer():
        # Goes first: CacheWriter.stop() waits for the writer's final flush.
        if _writer is not None:
            _writer.stop()
            print("Writer thread stopped.")

    def close_listener():
        if _server_sock is not None:
            _server_sock.close()
            print("Server socket closed.")

    def delete_cache_file():
        if CACHE_PATH.exists():
            os.remove(CACHE_PATH)
            print(f"Removed cache file {CACHE_PATH}")

    for step in (stop_writer, close_listener, delete_cache_file):
        try:
            step()
        except Exception:
            traceback.print_exc()
    print("Cleanup complete.")
def _signal_handler(signum, frame):
    """Handle a termination signal by requesting a server shutdown.

    Sets ``_shutdown_event``. It also closes the listening socket, if one
    exists, so a pending ``accept()`` in run_server stops waiting instead of
    running until its timeout. Errors while closing are ignored.
    """
    print(f"Signal {signum} received: initiating shutdown...")
    _shutdown_event.set()
    try:
        listener = _server_sock
        if listener is None:
            return
        listener.close()
    except Exception:
        pass
# Register atexit cleanup (runs on normal interpreter exit)
atexit.register(_cleanup)
# Route common termination signals to _signal_handler. Not every signal exists
# on every platform (e.g. SIGHUP and SIGQUIT are absent on Windows), so each is
# looked up by name rather than failing at import time with AttributeError.
for _sig_name in ("SIGINT", "SIGTERM", "SIGHUP", "SIGQUIT"):
    sig = getattr(signal, _sig_name, None)
    if sig is not None:
        signal.signal(sig, _signal_handler)
def run_server(port: int):
    """Serve clients on ``HOST:port`` until a shutdown is requested.

    1. Write a fresh, empty CACHE to CACHE_PATH (an existing file is never loaded).
    2. Start the CacheWriter thread and publish it in ``_writer``.
    3. Open the listening socket (published in ``_server_sock``). Hand each
       accepted connection to ``handle_client`` on a daemon thread until
       ``_shutdown_event`` is set.
    ``_cleanup()`` runs once the accept loop ends, however it ends.
    """
    global _writer, _server_sock

    # Step 1: always start from an empty on-disk cache. A failure is reported
    # but not fatal.
    try:
        CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
        with cache_lock:
            atomic_dump(CACHE, CACHE_PATH)
        print("Initialised fresh empty CACHE on disk.")
    except Exception:
        print("Failed to initialise cache file on disk")
        traceback.print_exc()

    # Step 2: background persistence.
    _writer = CacheWriter(CACHE_PATH, debounce=0.05)
    _writer.start()

    # Step 3: listening socket. The 1-second timeout makes accept() return
    # regularly so the loop can notice _shutdown_event.
    _server_sock = sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((HOST, port))
    sock.listen()
    sock.settimeout(1.0)
    print(f"Server listening on {HOST}:{port}...")

    try:
        while not _shutdown_event.is_set():
            try:
                conn, addr = sock.accept()
            except socket.timeout:
                continue  # periodic wake-up; re-check the shutdown flag
            except OSError:
                # _signal_handler closes the socket to interrupt accept();
                # any OSError without a pending shutdown is a genuine failure.
                if not _shutdown_event.is_set():
                    raise
                break
            worker = threading.Thread(
                target=handle_client,
                args=(conn, addr, _writer),
                daemon=True,
            )
            worker.start()
    except KeyboardInterrupt:
        print("Server shutting down (KeyboardInterrupt)...")
    except Exception:
        traceback.print_exc()
    finally:
        # Stop the writer, close the socket and remove the cache file.
        _cleanup()
if __name__ == "__main__":
    # Usage: ressenger_server.py PORT
    try:
        _port = int(sys.argv[1])
    except (IndexError, ValueError):
        # A missing or non-numeric PORT gets a usage message (exit status 1)
        # instead of a raw traceback.
        sys.exit(f"usage: {sys.argv[0]} PORT")
    run_server(_port)