165 lines
6.9 KiB
Python
Executable File
165 lines
6.9 KiB
Python
Executable File
#!/usr/bin/python3
|
|
import ressenger_cryptography, ressenger_exceptions, ressenger_client
|
|
import pickle, pathlib, time, os, tempfile, contextlib, hashlib
|
|
|
|
# fcntl only exists on POSIX platforms; record whether it is importable so
# _file_lock can choose between flock() and the lock-file fallback.
try:
    import fcntl
    _has_fcntl = True
except ImportError:
    # Only a missing module means "no fcntl"; any other failure is a real bug
    # and should surface instead of silently selecting the fallback.
    _has_fcntl = False
|
|
|
|
@contextlib.contextmanager
def _file_lock(lock_path: pathlib.Path, timeout: float = 10.0):
    """
    Cross-process lock held for the duration of a ``with`` block:
    - On POSIX, use fcntl.flock (advisory lock) on ``lock_path``.
    - On platforms without fcntl, fallback to atomic lockfile creation.

    Args:
        lock_path: Path of the lock file; missing parent directories are created.
        timeout: Maximum number of seconds to wait for the lock. Honoured on
            both code paths.

    Raises:
        TimeoutError: If the lock could not be acquired within ``timeout`` seconds.
    """
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    # Monotonic clock: wall-clock adjustments must not stretch or cut the wait.
    deadline = time.monotonic() + timeout

    if _has_fcntl:
        # Use a lock file with flock. The file is never removed, only locked.
        with open(lock_path, 'w') as lf:
            while True:
                try:
                    # Non-blocking attempt so the timeout can be enforced;
                    # a plain LOCK_EX would block indefinitely.
                    fcntl.flock(lf, fcntl.LOCK_EX | fcntl.LOCK_NB)
                    break
                except BlockingIOError:
                    if time.monotonic() >= deadline:
                        raise TimeoutError(f"timed out acquiring lock {lock_path}") from None
                    time.sleep(0.05)
            try:
                yield
            finally:
                fcntl.flock(lf, fcntl.LOCK_UN)
    else:
        # Fallback: atomic lockfile creation (O_EXCL).
        # Note: stale locks are possible if the process crashes.
        while True:
            try:
                fd = os.open(str(lock_path), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            except FileExistsError:
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"timed out acquiring lock {lock_path}") from None
                time.sleep(0.05)
            else:
                break
        try:
            # Record the owner's PID in the lock file; always close the
            # descriptor, even if the write fails.
            try:
                os.write(fd, str(os.getpid()).encode())
            finally:
                os.close(fd)
            yield
        finally:
            # Release the lock; also runs when writing the PID failed so no
            # stale lock file is left behind.
            with contextlib.suppress(FileNotFoundError):
                os.unlink(str(lock_path))
|
|
|
|
def load_user(password, username='default'):
    """
    Read, decrypt and unpickle the user stored at ``~/.ressenger/<username>``.

    Args:
        password: Passed to ``ressenger_cryptography.decrypt_bytes`` together
            with the raw file contents.
        username: Name of the file under ``~/.ressenger``.

    Returns:
        The object obtained by unpickling the decrypted file contents.

    Raises:
        ressenger_exceptions.UserDoesNotExistError: If the user file is missing.
    """
    profile = pathlib.Path(f'~/.ressenger/{username}').expanduser()
    if not profile.exists():
        raise ressenger_exceptions.UserDoesNotExistError(f'File ~/.ressenger/{username} does not exist.')

    ciphertext = profile.read_bytes()
    plaintext = ressenger_cryptography.decrypt_bytes(ciphertext, password)
    return pickle.loads(plaintext)
|
|
|
|
def dump_user(password, user, username='default'):
    """
    Safely write (dump) a user file:
    - Performs an atomic replace of the user file to avoid partial writes.
    - Serialises concurrent writers using the module's _file_lock.
    - Removes the temporary file again if writing or replacing fails.

    Args:
        password: Passed to ``ressenger_cryptography.encrypt_bytes`` with the
            pickled user.
        user: Object to pickle, encrypt and store.
        username: Name of the file under ``~/.ressenger``.

    Raises:
        ressenger_exceptions.UserDoesNotExistError: If the target user file
            does not already exist.
        TimeoutError: May propagate from ``_file_lock`` if the lock cannot be
            acquired.
    """
    user_path = pathlib.Path(f'~/.ressenger/{username}').expanduser()

    if not user_path.exists():
        # Keep original behaviour: raise if target user file does not exist.
        raise ressenger_exceptions.UserDoesNotExistError(f'File ~/.ressenger/{username} does not exist.')

    dirpath = user_path.parent
    lock_path = dirpath / (user_path.name + '.lock')

    # Acquire cross-process lock to serialise writers
    with _file_lock(lock_path):
        # Serialize (pickle) and encrypt in memory first
        payload = pickle.dumps(user, protocol=pickle.HIGHEST_PROTOCOL)
        encrypted = ressenger_cryptography.encrypt_bytes(payload, password)

        tmpname = None
        try:
            # Temporary file lives in the same directory so os.replace stays
            # on one filesystem.
            with tempfile.NamedTemporaryFile(dir=str(dirpath), delete=False) as tf:
                tmpname = tf.name
                tf.write(encrypted)
                tf.flush()
                os.fsync(tf.fileno())  # ensure data is flushed to disk

            # Atomically replace the old file with the new file
            os.replace(tmpname, str(user_path))
        except BaseException:
            # delete=False means nothing else cleans up the temp file.
            if tmpname is not None:
                with contextlib.suppress(OSError):
                    os.unlink(tmpname)
            raise

        # Try to fsync the directory for extra durability (best-effort)
        try:
            dirfd = os.open(str(dirpath), os.O_RDONLY)
            try:
                os.fsync(dirfd)
            finally:
                os.close(dirfd)
        except OSError:
            # Some platforms / filesystems may not support this; ignore failures
            pass
|
|
|
|
def contact_hash(enc_pub, sig_pub):
    """
    Return the raw SHA-256 digest identifying a contact's key pair.

    The two inputs are fed to the hash back to back, with no separator or
    length prefix, so the digest equals SHA-256 of their concatenation.

    Args:
        enc_pub: Bytes-like value hashed first.
        sig_pub: Bytes-like value hashed second.

    Returns:
        The 32-byte digest as ``bytes``.
    """
    hasher = hashlib.sha256(enc_pub)
    hasher.update(sig_pub)
    return hasher.digest()
|
|
|
|
def send_message(text, destination_addr, destination_port, reply_addr, reply_port, enc_pub_key, sig_pri_key, sig_pub_key):
    """
    Build, encrypt, sign and send a text message, returning a 'sent' event.

    Args:
        text: Message content, stored under ``data['data']['content']``.
        destination_addr: Address handed to ``ressenger_client.send_data_i2p``.
        destination_port: Port handed to ``ressenger_client.send_data_i2p``.
        reply_addr: Address embedded in the payload under ``'reply'``.
        reply_port: Port embedded in the payload under ``'reply'``.
        enc_pub_key: Key passed to ``ressenger_cryptography.encrypt`` and
            included in the packet as ``'enc_pub'`` (presumably the
            encryption public key -- confirm against ressenger_cryptography).
        sig_pri_key: Key passed to ``ressenger_cryptography.encrypt`` for the
            signature.
        sig_pub_key: Included in the packet as ``'sig_pub'``.

    Returns:
        A dict with ``'type': 'sent'``, the unencrypted payload under
        ``'data'``, the contact hash under ``'contact'`` and the value
        returned by ``send_data_i2p`` under ``'errorno'``.
    """
    data = {
        'type': 'text',
        'sent_time': time.time_ns(),
        'data': {'content': text},
        'reply': {'addr': reply_addr, 'port': reply_port},
    }
    data_encrypted, data_signature = ressenger_cryptography.encrypt(
        pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL), enc_pub_key, sig_pri_key
    )
    packet = {'data': data_encrypted, 'sig': data_signature, 'enc_pub': enc_pub_key, 'sig_pub': sig_pub_key}
    errorno = ressenger_client.send_data_i2p(
        pickle.dumps(packet, protocol=pickle.HIGHEST_PROTOCOL), destination_addr, destination_port
    )
    event = {'type': 'sent', 'data': data, 'contact': contact_hash(enc_pub_key, sig_pub_key), 'errorno': errorno}
    return event
|
|
|
|
def send_file(filename, filebytes, destination_addr, destination_port, reply_addr, reply_port, enc_pub_key, sig_pri_key, sig_pub_key):
    """
    Build, encrypt, sign and send a file message, returning a 'sent' event.

    Args:
        filename: Name stored under ``data['data']['filename']``.
        filebytes: File contents stored under ``data['data']['content']``.
        destination_addr: Address handed to ``ressenger_client.send_data_i2p``.
        destination_port: Port handed to ``ressenger_client.send_data_i2p``.
        reply_addr: Address embedded in the payload under ``'reply'``.
        reply_port: Port embedded in the payload under ``'reply'``.
        enc_pub_key: Key passed to ``ressenger_cryptography.encrypt`` and
            included in the packet as ``'enc_pub'`` (presumably the
            encryption public key -- confirm against ressenger_cryptography).
        sig_pri_key: Key passed to ``ressenger_cryptography.encrypt`` for the
            signature.
        sig_pub_key: Included in the packet as ``'sig_pub'``.

    Returns:
        A dict with ``'type': 'sent'``, the unencrypted payload (including the
        file bytes) under ``'data'``, the contact hash under ``'contact'`` and
        the value returned by ``send_data_i2p`` under ``'errorno'``.
    """
    data = {
        'type': 'file',
        'sent_time': time.time_ns(),
        'data': {'content': filebytes, 'filename': filename},
        'reply': {'addr': reply_addr, 'port': reply_port},
    }
    data_encrypted, data_signature = ressenger_cryptography.encrypt(
        pickle.dumps(data, protocol=pickle.HIGHEST_PROTOCOL), enc_pub_key, sig_pri_key
    )
    packet = {'data': data_encrypted, 'sig': data_signature, 'enc_pub': enc_pub_key, 'sig_pub': sig_pub_key}
    errorno = ressenger_client.send_data_i2p(
        pickle.dumps(packet, protocol=pickle.HIGHEST_PROTOCOL), destination_addr, destination_port
    )
    event = {'type': 'sent', 'data': data, 'contact': contact_hash(enc_pub_key, sig_pub_key), 'errorno': errorno}
    return event
|
|
|
|
def write_event(username: str, event):
    """
    Safely append an event to the user's cache file:
    - Guarantees atomic replacement of the cache file.
    - Serialises concurrent writers via _file_lock.
    - Starts from an empty mapping when the cache file does not exist yet.
    - Removes the temporary file again if writing or replacing fails.

    The cache at ``~/.ressenger/cache_<username>`` is a pickled mapping; the
    new event is stored under the current ``time.time_ns()`` value. The cache
    is written as plain pickle, without encryption.

    Args:
        username: Selects the cache file ``cache_<username>``.
        event: Picklable object to record.

    Raises:
        TimeoutError: May propagate from ``_file_lock`` if the lock cannot be
            acquired.
    """
    event_cache = pathlib.Path(f"~/.ressenger/cache_{username}").expanduser()
    dirpath = event_cache.parent
    dirpath.mkdir(parents=True, exist_ok=True)

    lock_path = dirpath / (event_cache.name + '.lock')

    # Lock protects read-modify-write sequence
    with _file_lock(lock_path):
        # Read existing events. NOTE: pickle.loads executes arbitrary code on
        # malicious input; the cache file must only ever be written locally.
        try:
            with open(event_cache, 'rb') as f:
                events = pickle.loads(f.read())
        except FileNotFoundError:
            # First event for this user: there is no cache to extend yet.
            events = {}

        events[time.time_ns()] = event

        tmpname = None
        try:
            # Write to a temporary file in the same directory so os.replace
            # stays on one filesystem.
            with tempfile.NamedTemporaryFile(dir=str(dirpath), delete=False) as tf:
                tmpname = tf.name
                tf.write(pickle.dumps(events, protocol=pickle.HIGHEST_PROTOCOL))
                tf.flush()
                os.fsync(tf.fileno())  # ensure data is flushed to disk

            # Atomically replace the old file with the new one
            os.replace(tmpname, str(event_cache))
        except BaseException:
            # delete=False means nothing else cleans up the temp file.
            if tmpname is not None:
                with contextlib.suppress(OSError):
                    os.unlink(tmpname)
            raise

        # Try fsync on the directory for extra durability (Linux/Unix)
        try:
            dirfd = os.open(str(dirpath), os.O_RDONLY)
            try:
                os.fsync(dirfd)
            finally:
                os.close(dirfd)
        except OSError:
            # On some platforms this may fail, ignore silently
            pass
|