Ressenger/ressenger_common.py

165 lines
6.9 KiB
Python
Executable File

#!/usr/bin/python3
import ressenger_cryptography, ressenger_exceptions, ressenger_client
import pickle, pathlib, time, os, tempfile, contextlib, hashlib
try:
    import fcntl
    _has_fcntl = True
except ImportError:
    # fcntl is not importable on every platform; _file_lock then uses its
    # lock-file fallback instead of flock.
    _has_fcntl = False


@contextlib.contextmanager
def _file_lock(lock_path: pathlib.Path, timeout: float = 10.0):
    """
    Hold an exclusive cross-process lock for the duration of the ``with`` body.

    - When ``fcntl`` could be imported, an advisory ``fcntl.flock`` lock is
      taken on ``lock_path``. The lock file itself is left on disk afterwards.
    - Otherwise the lock is the existence of ``lock_path``: it is created
      atomically with ``O_CREAT | O_EXCL``, filled with the current PID, and
      removed on release. A crashed holder can leave a stale lock file behind.

    Args:
        lock_path: Path of the lock file; missing parent directories are created.
        timeout: Maximum number of seconds to wait for the lock on either
            code path. ``None`` waits indefinitely.

    Raises:
        TimeoutError: The lock could not be acquired within ``timeout`` seconds.
    """
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    deadline = None if timeout is None else time.monotonic() + timeout

    def _pause_or_give_up():
        """Sleep briefly before the next attempt, or raise once the deadline passed."""
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"timed out acquiring lock {lock_path}")
        time.sleep(0.05)

    if _has_fcntl:
        with open(lock_path, 'w') as lf:
            if deadline is None:
                # No deadline requested: a plain blocking flock is enough.
                fcntl.flock(lf, fcntl.LOCK_EX)
            else:
                # Poll with LOCK_NB so that the timeout is honoured here too,
                # instead of blocking forever behind a stuck holder.
                while True:
                    try:
                        fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB)
                        break
                    except (BlockingIOError, PermissionError):
                        # EAGAIN/EWOULDBLOCK (or EACCES on some systems):
                        # the lock is currently held by someone else.
                        _pause_or_give_up()
            try:
                yield
            finally:
                fcntl.flock(lf, fcntl.LOCK_UN)
    else:
        # Fallback: atomic lockfile creation (O_EXCL).
        # Note: stale locks are possible if the process crashes.
        while True:
            try:
                fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
                break
            except FileExistsError:
                _pause_or_give_up()
        try:
            # From here on the lock file exists and belongs to us, so the outer
            # ``finally`` removes it even if recording the PID fails.
            try:
                os.write(fd, str(os.getpid()).encode())
            finally:
                os.close(fd)
            yield
        finally:
            with contextlib.suppress(FileNotFoundError):
                os.unlink(str(lock_path))
def load_user(password, username='default'):
    """
    Read, decrypt and unpickle the stored record of a user.

    The record is read from ``~/.ressenger/<username>``, passed through
    ``ressenger_cryptography.decrypt_bytes`` together with ``password`` and
    the result is unpickled and returned.

    Raises:
        ressenger_exceptions.UserDoesNotExistError: the user file does not exist.
    """
    user_path = pathlib.Path(f'~/.ressenger/{username}').expanduser()
    if not user_path.exists():
        raise ressenger_exceptions.UserDoesNotExistError(f'File ~/.ressenger/{username} does not exist.')
    encrypted = user_path.read_bytes()
    plaintext = ressenger_cryptography.decrypt_bytes(encrypted, password)
    # NOTE(review): unpickling runs arbitrary code if the decrypted payload can
    # be forged; presumably decrypt_bytes authenticates the data - confirm.
    return pickle.loads(plaintext)
def dump_user(password, user, username='default'):
    """
    Safely write (dump) a user file.

    ``user`` is pickled, encrypted with ``password`` via
    ``ressenger_cryptography.encrypt_bytes`` and written to
    ``~/.ressenger/<username>``:

    - Writers are serialised through ``_file_lock`` on ``<username>.lock``.
    - The payload goes to a temporary file in the same directory which then
      atomically replaces the user file, so readers never see a partial write.
    - If writing or replacing fails, the temporary file is removed again
      before the error propagates.

    Raises:
        ressenger_exceptions.UserDoesNotExistError: the user file does not
            exist yet (this function only overwrites existing users).
    """
    user_path = pathlib.Path(f'~/.ressenger/{username}').expanduser()
    if not user_path.exists():
        # Keep original behaviour: raise if target user file does not exist.
        raise ressenger_exceptions.UserDoesNotExistError(f'File ~/.ressenger/{username} does not exist.')

    dirpath = user_path.parent
    lock_path = dirpath / (user_path.name + '.lock')
    # Acquire cross-process lock to serialise writers
    with _file_lock(lock_path):
        # Serialise (pickle) and encrypt in memory first, so nothing touches
        # the disk if either step fails.
        payload = pickle.dumps(user, protocol=pickle.HIGHEST_PROTOCOL)
        encrypted = ressenger_cryptography.encrypt_bytes(payload, password)

        tmpname = None
        try:
            # The temporary file must live in the same directory so that
            # os.replace stays on one filesystem.
            with tempfile.NamedTemporaryFile(dir=str(dirpath), delete=False) as tf:
                tmpname = tf.name
                tf.write(encrypted)
                tf.flush()
                os.fsync(tf.fileno())  # ensure data is flushed to disk
            # Atomically replace the old file with the new file
            os.replace(tmpname, str(user_path))
        except BaseException:
            # Do not leave orphaned temporary files next to the user file.
            if tmpname is not None:
                with contextlib.suppress(OSError):
                    os.unlink(tmpname)
            raise

        # Try to fsync the directory for extra durability (best-effort)
        try:
            dirfd = os.open(str(dirpath), os.O_RDONLY)
            try:
                os.fsync(dirfd)
            finally:
                os.close(dirfd)
        except OSError:
            # Some platforms / filesystems may not support this; ignore failures
            pass
def contact_hash(enc_pub, sig_pub):
    """
    Return the raw SHA-256 digest identifying a contact's key pair.

    The digest is computed over ``enc_pub`` immediately followed by
    ``sig_pub``; no separator or length prefix is inserted between them.
    """
    hasher = hashlib.sha256()
    for key_material in (enc_pub, sig_pub):
        hasher.update(key_material)
    return hasher.digest()
def send_message(text, destination_addr, destination_port, reply_addr, reply_port, enc_pub_key, sig_pri_key, sig_pub_key):
    """
    Encrypt, sign and send a text message, returning a 'sent' event record.

    The payload dict (type ``'text'``, send time in nanoseconds, the text and
    the reply address/port) is pickled and handed to
    ``ressenger_cryptography.encrypt`` together with ``enc_pub_key`` and
    ``sig_pri_key``. The resulting ciphertext and signature are packed with
    ``enc_pub_key`` and ``sig_pub_key`` and sent via
    ``ressenger_client.send_data_i2p`` to ``destination_addr``:``destination_port``.

    Returns:
        dict with keys ``'type'`` (``'sent'``), ``'data'`` (the unencrypted
        payload dict), ``'contact'`` (``contact_hash(enc_pub_key, sig_pub_key)``)
        and ``'errorno'`` (whatever ``send_data_i2p`` returned).
    """
    data = {
        'type': 'text',
        'sent_time': time.time_ns(),
        'data': {'content': text},
        'reply': {'addr': reply_addr, 'port': reply_port},
    }
    data_encrypted, data_signature = ressenger_cryptography.encrypt(
        pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL), enc_pub_key, sig_pri_key)
    packet = {'data': data_encrypted, 'sig': data_signature, 'enc_pub': enc_pub_key, 'sig_pub': sig_pub_key}
    errorno = ressenger_client.send_data_i2p(
        pickle.dumps(packet, protocol=pickle.HIGHEST_PROTOCOL), destination_addr, destination_port)
    event = {'type': 'sent', 'data': data, 'contact': contact_hash(enc_pub_key, sig_pub_key), 'errorno': errorno}
    return event
def send_file(filename, filebytes, destination_addr, destination_port, reply_addr, reply_port, enc_pub_key, sig_pri_key, sig_pub_key):
    """
    Encrypt, sign and send a file, returning a 'sent' event record.

    The payload dict (type ``'file'``, send time in nanoseconds, the file
    content and name, and the reply address/port) is pickled and handed to
    ``ressenger_cryptography.encrypt`` together with ``enc_pub_key`` and
    ``sig_pri_key``. The resulting ciphertext and signature are packed with
    ``enc_pub_key`` and ``sig_pub_key`` and sent via
    ``ressenger_client.send_data_i2p`` to ``destination_addr``:``destination_port``.

    Returns:
        dict with keys ``'type'`` (``'sent'``), ``'data'`` (the unencrypted
        payload dict, including ``filebytes``), ``'contact'``
        (``contact_hash(enc_pub_key, sig_pub_key)``) and ``'errorno'``
        (whatever ``send_data_i2p`` returned).
    """
    data = {
        'type': 'file',
        'sent_time': time.time_ns(),
        'data': {'content': filebytes, 'filename': filename},
        'reply': {'addr': reply_addr, 'port': reply_port},
    }
    data_encrypted, data_signature = ressenger_cryptography.encrypt(
        pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL), enc_pub_key, sig_pri_key)
    packet = {'data': data_encrypted, 'sig': data_signature, 'enc_pub': enc_pub_key, 'sig_pub': sig_pub_key}
    errorno = ressenger_client.send_data_i2p(
        pickle.dumps(packet, protocol=pickle.HIGHEST_PROTOCOL), destination_addr, destination_port)
    event = {'type': 'sent', 'data': data, 'contact': contact_hash(enc_pub_key, sig_pub_key), 'errorno': errorno}
    return event
def write_event(username: str, event):
    """
    Safely append an event to the user's event cache.

    The cache at ``~/.ressenger/cache_<username>`` is a pickled mapping from a
    nanosecond timestamp to an event:

    - The whole read-modify-write sequence runs under ``_file_lock`` so
      concurrent writers are serialised.
    - A cache file that does not exist yet is treated as an empty cache, so
      the first event for a user can be recorded.
    - The updated mapping is written to a temporary file in the same
      directory and atomically moved over the cache file; the temporary file
      is removed again if that fails.

    Args:
        username: Name used to derive the cache file name.
        event: Picklable event record to store.
    """
    event_cache = pathlib.Path(f"~/.ressenger/cache_{username}").expanduser()
    dirpath = event_cache.parent
    dirpath.mkdir(parents=True, exist_ok=True)
    lock_path = dirpath / (event_cache.name + '.lock')
    # Lock protects read-modify-write sequence
    with _file_lock(lock_path):
        # Read existing events; no cache file yet simply means no events yet.
        # NOTE(review): the cache is unpickled as-is, so anyone able to write
        # this file can run code in this process - confirm that is acceptable.
        try:
            with open(event_cache, 'rb') as f:
                events = pickle.loads(f.read())
        except FileNotFoundError:
            events = {}

        # Never overwrite an earlier event if the clock returns the same
        # value twice; nudge the key forward until it is unused.
        key = time.time_ns()
        while key in events:
            key += 1
        events[key] = event

        tmpname = None
        try:
            # Write to a temporary file in the same directory so that
            # os.replace stays on one filesystem.
            with tempfile.NamedTemporaryFile(dir=str(dirpath), delete=False) as tf:
                tmpname = tf.name
                tf.write(pickle.dumps(events, protocol=pickle.HIGHEST_PROTOCOL))
                tf.flush()
                os.fsync(tf.fileno())  # ensure data is flushed to disk
            # Atomically replace the old file with the new one
            os.replace(tmpname, str(event_cache))
        except BaseException:
            # Do not leave orphaned temporary files next to the cache.
            if tmpname is not None:
                with contextlib.suppress(OSError):
                    os.unlink(tmpname)
            raise

        # Try fsync on the directory for extra durability (Linux/Unix)
        try:
            dirfd = os.open(str(dirpath), os.O_RDONLY)
            try:
                os.fsync(dirfd)
            finally:
                os.close(dirfd)
        except OSError:
            # On some platforms this may fail, ignore silently
            pass