diff --git a/UserUnlock.ui b/UserUnlock.ui new file mode 100644 index 0000000..a86dd13 --- /dev/null +++ b/UserUnlock.ui @@ -0,0 +1,83 @@ + + + Dialog + + + + 0 + 0 + 480 + 180 + + + + + 480 + 180 + + + + + 480 + 180 + + + + Unlock the User + + + + + 20 + 40 + 441 + 18 + + + + Password: + + + + + + 20 + 70 + 441 + 32 + + + + QLineEdit::EchoMode::Password + + + + + + 370 + 130 + 88 + 34 + + + + OK + + + + + + 260 + 130 + 88 + 34 + + + + Cancel + + + + + + diff --git a/ressenger.py b/ressenger.py index 4502abf..2a7c781 100755 --- a/ressenger.py +++ b/ressenger.py @@ -1,12 +1,45 @@ #!/usr/bin/python3 -import sys, pathlib, re, subprocess -import ressenger_initialisation, ressenger_client, ressenger_common, ressenger_cryptography, ressenger_exceptions, ressenger_server +import sys, pathlib, re, subprocess, signal, time, os +import ressenger_initialisation, ressenger_client, ressenger_common, ressenger_cryptography, ressenger_exceptions from PySide6.QtUiTools import QUiLoader from PySide6.QtWidgets import QApplication, QListWidget, QTextBrowser, QTextEdit, QPushButton, QFileDialog, QMessageBox, QDialog, QVBoxLayout, QLineEdit, QLabel from PySide6.QtCore import QFile, QIODevice, Qt _B32_RE = re.compile(r'^[a-z2-7]{52}\.b32\.i2p\.?$', re.IGNORECASE) +def run_in_background(pyfile, args=None): + args = args or [] + cmd = [sys.executable, pyfile] + args + # start_new_session=True makes the child its own process group leader + p = subprocess.Popen(cmd, start_new_session=True) + return p + +def graceful_stop(proc, timeout=10.0): + if proc is None: + return + try: + # On POSIX: send to process group + os.killpg(os.getpgid(proc.pid), signal.SIGTERM) + except Exception: + try: + proc.terminate() + except Exception: + pass + try: + proc.wait(timeout=timeout) + except Exception: + try: + os.killpg(os.getpgid(proc.pid), signal.SIGKILL) + except Exception: + try: + proc.kill() + except Exception: + pass + try: + proc.wait(timeout=5.0) + except Exception: + pass + def is_i2p_b32_address(s: str) -> bool: if not isinstance(s, str): return False @@ -147,6 +180,42 @@ def initialise_user(profile): if not succeed: exit() +def user_unlock(username): + ui_path = "UserUnlock.ui" + in_ui_file = QFile(ui_path) + if not in_ui_file.open(QIODevice.ReadOnly): + print(f"Cannot open {ui_path}: {in_ui_file.errorString()}") + return None + + loader = QUiLoader() + dlg = loader.load(in_ui_file) + in_ui_file.close() + + if dlg is None: + return None + + line_edit = dlg.findChild(QLineEdit, "lineEdit") # QLineEdit (line edit) + ok_btn = dlg.findChild(QPushButton, "pushButton") # QPushButton (push button) + cancel_btn = dlg.findChild(QPushButton, "pushButton_2") # QPushButton (push button) + + if ok_btn is not None: + ok_btn.clicked.connect(dlg.accept) + if cancel_btn is not None: + cancel_btn.clicked.connect(dlg.reject) + + result = dlg.exec() # QDialog.Accepted or QDialog.Rejected + password = line_edit.text() if line_edit is not None else "" + + if result==QDialog.Accepted: + try: + ressenger_common.load_user(password, username) + except: + message_box_1('Error', 'Invalid Password!') + return None + return password + else: + return None + if __name__ == "__main__": args=parseArguments() @@ -183,6 +252,14 @@ List of options: else: exit() + user_cache=pathlib.Path(f"~/.ressenger/cache_{profile}").expanduser() + if not user_cache.exists(): + credentials=user_unlock(profile) + if credentials==None: + exit() + user_daemon = run_in_background('ressenger_user.py', args=[profile, credentials]) + credentials=None + ui_file_name = "MainWindow.ui" ui_file = QFile(ui_file_name) if not ui_file.open(QIODevice.ReadOnly): diff --git a/ressenger_common.py b/ressenger_common.py index e267a9a..7739f1a 100755 --- a/ressenger_common.py +++ b/ressenger_common.py @@ -107,15 +107,15 @@ def contact_hash(enc_pub, sig_pub): return h.digest() def send_message(text, destination_addr, destination_port, reply_addr, reply_port, enc_pub_key, sig_pri_key, sig_pub_key): - data={'type':'text', 'sent_time'=time.time_ns(), 'data':{'content':text}, 'reply':{'addr':reply_addr, 'port':reply_port}} + data={'type':'text', 'sent_time':time.time_ns(), 'data':{'content':text}, 'reply':{'addr':reply_addr, 'port':reply_port}} data_encrypted, data_signature=ressenger_cryptography.encrypt(pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL), enc_pub_key, sig_pri_key) packet={'data':data_encrypted, 'sig':data_signature, 'enc_pub':enc_pub_key, 'sig_pub':sig_pub_key} errorno=ressenger_client.send_data_i2p(pickle.dumps(packet, protocol=pickle.HIGHEST_PROTOCOL), destination_addr, destination_port) event={'type':'sent', 'data':data, 'contact':contact_hash(enc_pub_key, sig_pub_key), 'errorno':errorno} return event -def send_file(filename, filebytes, destination_addr, destination_port, reply_addr, reply_port, enc_pub_key, sig_pri_key, sig_pub_key) - data={'type':'file', 'sent_time'=time.time_ns(), 'data':{'content':filebytes, 'filename':filename}, 'reply':{'addr':reply_addr, 'port':reply_port}} +def send_file(filename, filebytes, destination_addr, destination_port, reply_addr, reply_port, enc_pub_key, sig_pri_key, sig_pub_key): + data={'type':'file', 'sent_time':time.time_ns(), 'data':{'content':filebytes, 'filename':filename}, 'reply':{'addr':reply_addr, 'port':reply_port}} data_encrypted, data_signature=ressenger_cryptography.encrypt(pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL), enc_pub_key, sig_pri_key) packet={'data':data_encrypted, 'sig':data_signature, 'enc_pub':enc_pub_key, 'sig_pub':sig_pub_key} errorno=ressenger_client.send_data_i2p(pickle.dumps(packet, protocol=pickle.HIGHEST_PROTOCOL), destination_addr, destination_port) diff --git a/ressenger_server.py b/ressenger_server.py index 16cfc1e..c27890e 100755 --- a/ressenger_server.py +++ b/ressenger_server.py @@ -12,10 +12,11 @@ import traceback import signal import atexit -HOST = "0.0.0.0" -CACHE_PATH = pathlib.Path(f"~/.ressenger/cache_{sys.argv[1]}").expanduser() -CACHE = {} # in-memory cache: {timestamp_ns: bytes} -cache_lock = threading.Lock() # protect access to CACHE +if __name__ == "__main__": + HOST = "0.0.0.0" + CACHE_PATH = pathlib.Path(f"~/.ressenger/cache_{sys.argv[1]}").expanduser() + CACHE = {} # in-memory cache: {timestamp_ns: bytes} + cache_lock = threading.Lock() # protect access to CACHE # Globals used by cleanup / signal handlers _writer = None # will hold CacheWriter instance diff --git a/ressenger_user.py b/ressenger_user.py index cf55e42..03d1d56 100755 --- a/ressenger_user.py +++ b/ressenger_user.py @@ -1,51 +1,279 @@ #!/usr/bin/python3 -import sys, pathlib, subprocess, os -import ressenger_common, ressenger_exceptions, ressenger_cryptography +# -*- coding: utf-8 -*- -# TODO: Add interruption handling to this user daemon +import sys +import pathlib +import subprocess +import os +import signal +import time +import pickle +import traceback +import ressenger_common +import ressenger_exceptions +import ressenger_cryptography + +# --- helper: start process in background (returns subprocess.Popen) --- def run_in_background(pyfile, args=None, stdout=None, stderr=None): args = args or [] cmd = [sys.executable, pyfile] + args - p = subprocess.Popen( cmd, stdout=stdout or subprocess.DEVNULL, stderr=stderr or subprocess.DEVNULL, - start_new_session=True, + start_new_session=True, # new session -> we can signal the whole process group via os.killpg close_fds=True ) return p +# --- helper: terminate process group (POSIX), with fallback --- +def terminate_process_group(proc, sig=signal.SIGTERM, wait_timeout=5.0): + """Send sig to the process group of proc (POSIX), wait for wait_timeout seconds. + If the process does not exit, send SIGKILL. Falls back to signalling the single + process if process-group signalling is unavailable. + """ + if proc is None: + return + try: + os.killpg(os.getpgid(proc.pid), sig) + except Exception: + # Fallback: attempt to signal the process itself + try: + proc.send_signal(sig) + except Exception: + try: + proc.terminate() + except Exception: + pass + # Wait for process to exit + try: + proc.wait(timeout=wait_timeout) + except Exception: + # Force kill if still alive + try: + os.killpg(os.getpgid(proc.pid), signal.SIGKILL) + except Exception: + try: + proc.kill() + except Exception: + pass + try: + proc.wait(timeout=2.0) + except Exception: + pass + +# --- main daemon --- def user_daemon(username, password): - user=ressenger_common.load_user(password, username) - run_in_background('ressenger_server.py', args=[user['port']]) + # shutdown flag (set by signal handler) + shutdown_requested = False + + def _sig_handler(signum, frame): + nonlocal shutdown_requested + print(f"[user_daemon] Signal {signum} received: requesting shutdown...") + # Only set the flag; main loop will check and do graceful shutdown + shutdown_requested = True + + # Register signal handlers + for s in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP, getattr(signal, "SIGQUIT", None)): + if s is not None: + signal.signal(s, _sig_handler) + + # Load user data + user = ressenger_common.load_user(password, username) + + # Start server (Code A) + server_proc = run_in_background('ressenger_server.py', args=[str(user['port'])]) + print(f"[user_daemon] Started server pid={server_proc.pid}") + cache1_path = pathlib.Path(f"~/.ressenger/cache_{user['port']}").expanduser() cache2_path = pathlib.Path(f"~/.ressenger/cache_{username}").expanduser() - with open(cache2_path, 'wb') as file: - file.write(pickle.dumps({}, protocol=pickle.HIGHEST_PROTOCOL)) - cache1_processed=[] - cache2_processed=[] - while True: - with open(cache1_path, 'rb') as file: - cache1=pickle.loads(file.read()) - with open(cache2_path, 'rb') as file: - cache2=pickle.loads(file.read()) - for i1 in cache1.keys(): - if not (i1 in cache1_processed): - packet=pickle.loads(cache1[i1]) - decrypted_data_bytes, validity = ressenger_cryptography.decrypt(packet['data'], user['enc_pri'], packet['sig_pub']) - if validity: - data=pickle.loads(decrypted_data_bytes) - event={'type':'recv', 'data':data, 'contact':contact_hash(packet['enc_pub'], packet['sig_pub'])} - user['events'][time.time_ns()]=event - cache1_processed.append(i1) - for i1 in cache2.keys(): - if not (i1 in cache2_processed): - event=cache2[i1] - user['events'][time.time_ns()]=event - cache2_processed.append(i1) - ressenger_common.dump_user(password, user, username) -if __name__=='__main__': + # Wait for cache1 to appear (the server should create it). Timeout is a safety net. + cache1_wait_timeout = 10.0 # seconds + start_time = time.time() + while not cache1_path.exists(): + if time.time() - start_time > cache1_wait_timeout: + # Server did not create cache1 in time -> terminate server and abort. + print(f"[user_daemon] Timeout waiting for cache1 at {cache1_path}. Terminating server.") + try: + terminate_process_group(server_proc, sig=signal.SIGTERM, wait_timeout=2.0) + except Exception: + pass + raise RuntimeError(f"cache1 not created by server within {cache1_wait_timeout} seconds") + time.sleep(0.1) + + # Now create cache2 atomically; fail if it already exists + try: + # If cache2 already exists, this is an error condition + if cache2_path.exists(): + # Clean up server and exit with error + print(f"[user_daemon] ERROR: cache2 at {cache2_path} already exists. Aborting.") + try: + terminate_process_group(server_proc, sig=signal.SIGTERM, wait_timeout=2.0) + except Exception: + pass + raise FileExistsError(f"cache2 {cache2_path} already exists") + # Atomically create and write empty dict (pickle) using O_EXCL so creation is atomic. + flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL + mode = 0o600 + fd = os.open(str(cache2_path), flags, mode) + try: + with os.fdopen(fd, 'wb') as f: + f.write(pickle.dumps({}, protocol=pickle.HIGHEST_PROTOCOL)) + except Exception: + # Ensure file removed on write failure + try: + os.remove(cache2_path) + except Exception: + pass + raise + print(f"[user_daemon] Created cache2 at {cache2_path}") + except Exception: + # propagate after cleanup already attempted above + raise + + cache1_processed = [] + cache2_processed = [] + + try: + # Main loop: poll both cache files + while not shutdown_requested: + try: + # Read cache1 (from server) + try: + with open(cache1_path, 'rb') as f: + cache1 = pickle.loads(f.read()) + except FileNotFoundError: + cache1 = {} + except Exception: + # File may be in the middle of being written; ignore and retry + traceback.print_exc() + cache1 = {} + + # Read cache2 (local) + try: + with open(cache2_path, 'rb') as f: + cache2 = pickle.loads(f.read()) + except FileNotFoundError: + cache2 = {} + except Exception: + traceback.print_exc() + cache2 = {} + + # Process new entries in cache1 + for i1 in list(cache1.keys()): + if i1 not in cache1_processed: + try: + packet = pickle.loads(cache1[i1]) + decrypted_data_bytes, validity = ressenger_cryptography.decrypt( + packet['data'], user['enc_pri'], packet['sig_pub'] + ) + if validity: + data = pickle.loads(decrypted_data_bytes) + # contact_hash is assumed available in this scope (import if necessary) + event = {'type': 'recv', 'data': data, 'contact': contact_hash(packet['enc_pub'], packet['sig_pub'])} + user['events'][time.time_ns()] = event + except Exception: + traceback.print_exc() + cache1_processed.append(i1) + + # Process new entries in cache2 + for i2 in list(cache2.keys()): + if i2 not in cache2_processed: + try: + event = cache2[i2] + user['events'][time.time_ns()] = event + except Exception: + traceback.print_exc() + cache2_processed.append(i2) + + # Persist user state + try: + ressenger_common.dump_user(password, user, username) + except Exception: + traceback.print_exc() + + # Sleep briefly to avoid busy loop + time.sleep(0.1) + + except Exception: + # Protect main loop from unexpected exceptions + traceback.print_exc() + time.sleep(0.5) + + finally: + # Graceful shutdown: final processing and dump_user, then stop server, then remove cache2. + print("[user_daemon] Shutdown requested: performing final processing...") + + # Best-effort: read cache1/cache2 once more + try: + try: + with open(cache1_path, 'rb') as f: + cache1 = pickle.loads(f.read()) + except Exception: + cache1 = {} + try: + with open(cache2_path, 'rb') as f: + cache2 = pickle.loads(f.read()) + except Exception: + cache2 = {} + + for i1 in list(cache1.keys()): + if i1 not in cache1_processed: + try: + packet = pickle.loads(cache1[i1]) + decrypted_data_bytes, validity = ressenger_cryptography.decrypt( + packet['data'], user['enc_pri'], packet['sig_pub'] + ) + if validity: + data = pickle.loads(decrypted_data_bytes) + event = {'type': 'recv', 'data': data, 'contact': contact_hash(packet['enc_pub'], packet['sig_pub'])} + user['events'][time.time_ns()] = event + except Exception: + traceback.print_exc() + cache1_processed.append(i1) + + for i2 in list(cache2.keys()): + if i2 not in cache2_processed: + try: + event = cache2[i2] + user['events'][time.time_ns()] = event + except Exception: + traceback.print_exc() + cache2_processed.append(i2) + + except Exception: + traceback.print_exc() + + # Final dump_user + try: + print("[user_daemon] Dumping user state to disk...") + ressenger_common.dump_user(password, user, username) + print("[user_daemon] dump_user finished.") + except Exception: + traceback.print_exc() + + # Terminate server process gracefully (trigger Code A interruption handling) + try: + print(f"[user_daemon] Requesting server (pid={server_proc.pid}) to terminate...") + terminate_process_group(server_proc, sig=signal.SIGTERM, wait_timeout=5.0) + print("[user_daemon] Server termination requested / waited.") + except Exception: + traceback.print_exc() + + # Remove cache2 (must be removed) + try: + if cache2_path.exists(): + os.remove(cache2_path) + print(f"[user_daemon] Removed cache2 at {cache2_path}") + except Exception: + traceback.print_exc() + + print("[user_daemon] Exiting.") + +if __name__ == '__main__': + if len(sys.argv) < 3: + print("Usage: ressenger_user_daemon.py ") + sys.exit(2) user_daemon(sys.argv[1], sys.argv[2])