#!/usr/bin/python3 from __future__ import annotations import os import socket import struct import typing # Type alias for clarity Address = str # Default timeout for socket operations (seconds) _DEFAULT_TIMEOUT = 10.0 # Return codes SUCCESS = 0 ERR_TYPE = 1 # invalid data type (TypeError) ERR_PAYLOAD_TOO_LARGE = 2 # payload exceeds supported length (ValueError) ERR_SOCKET_ERROR = 3 # plain TCP connection error or timeout ERR_SEND_FAILED = 4 # failure while sending data ERR_SOCKS_MISSING = 5 # PySocks (socks) module not installed ERR_INVALID_SOCKS_PORT = 6 # I2P_SOCKS_PORT environment variable not an integer ERR_PROXY_CONNECTION = 7 # error connecting through SOCKS proxy (I2P) ERR_UNKNOWN = 8 # unknown/unexpected error def _prepare_payload(data: typing.Union[bytes, str]) -> bytes: """ Convert data into bytes. If data is a str, encode with UTF-8. Raises: TypeError: if data is not bytes or str. """ if isinstance(data, bytes): return data if isinstance(data, str): return data.encode('utf-8') raise TypeError("data must be bytes or str") def _pack_message(payload: bytes) -> bytes: """ Prepend a 4-byte big-endian unsigned length to payload. The protocol supports payload lengths up to 2**32 - 1 bytes. """ length = len(payload) if length >= (1 << 32): raise ValueError("payload too large (must be less than 2**32 bytes)") return struct.pack("!I", length) + payload def send_data(data: typing.Union[bytes, str], addr: Address, port: int) -> int: """ Send a single length-prefixed message over plain TCP to (addr, port). Parameters: - data: bytes or str (if str, encoded as UTF-8) - addr: IPv4/IPv6 address or hostname reachable directly - port: TCP port on the destination Behaviour: - Connects, sends the framed message, then closes the socket. - Returns 0 on success, otherwise returns an error code. """ try: payload = _prepare_payload(data) except TypeError: return ERR_TYPE try: message = _pack_message(payload) except ValueError: return ERR_PAYLOAD_TOO_LARGE # Create a TCP connection with a timeout try: with socket.create_connection((addr, port), timeout=_DEFAULT_TIMEOUT) as sock: # Ensure blocking mode for sendall sock.setblocking(True) try: sock.sendall(message) except (socket.timeout, OSError): # Sending failed after connection return ERR_SEND_FAILED except (socket.timeout, OSError): # Could not create connection (including DNS, connect, timeout) return ERR_SOCKET_ERROR except Exception: return ERR_UNKNOWN return SUCCESS def send_data_i2p(data: typing.Union[bytes, str], addr: Address, port: int) -> int: """ Send a single length-prefixed message to an I2P destination via a local i2pd SOCKS5 proxy. Parameters: - data: bytes or str (if str, encoded as UTF-8) - addr: I2P hostname, e.g. 'xxxxxxxxxxxxxx.b32.i2p' or 'mydest.i2p' - port: destination port exposed by the I2P server tunnel Behaviour: - Uses PySocks (socks) to create a socket that speaks SOCKS5 to the local proxy. - Uses remote DNS resolution (rdns=True) so that the proxy resolves .i2p names. - Connects, sends the framed message, then closes the socket. - Returns 0 on success, otherwise returns an error code. """ try: import socks # PySocks; license: MIT except Exception: return ERR_SOCKS_MISSING try: payload = _prepare_payload(data) except TypeError: return ERR_TYPE try: message = _pack_message(payload) except ValueError: return ERR_PAYLOAD_TOO_LARGE socks_host = os.environ.get("I2P_SOCKS_HOST", "127.0.0.1") socks_port_str = os.environ.get("I2P_SOCKS_PORT", "4447") try: socks_port = int(socks_port_str) except ValueError: return ERR_INVALID_SOCKS_PORT # Create a socksified socket. socks.socksocket has the same interface as socket.socket. s = socks.socksocket() # Use SOCKS5; rdns=True ensures the proxy resolves the .i2p name instead of local DNS. s.set_proxy(proxy_type=socks.SOCKS5, addr=socks_host, port=socks_port, rdns=True) s.settimeout(_DEFAULT_TIMEOUT) try: # Connect through the proxy to the I2P destination. # When rdns=True, the proxy will perform name resolution for .i2p addresses. s.connect((addr, port)) except (socket.timeout, OSError, Exception): # Proxy connection / resolution failed try: s.close() except Exception: pass return ERR_PROXY_CONNECTION try: # Ensure blocking mode for sendall s.setblocking(True) try: s.sendall(message) except (socket.timeout, OSError): return ERR_SEND_FAILED except Exception: return ERR_UNKNOWN finally: try: s.close() except Exception: pass return SUCCESS