From c3a0267d747d5655ae23540afbd9e21127d79ff7 Mon Sep 17 00:00:00 2001 From: simp Date: Thu, 25 Sep 2025 21:40:11 +0000 Subject: [PATCH] features: inter-index discrepancy checker, find all duplicates, show all hosts, download several host.txt files, ability to change settings. --- lookup.py | 319 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 306 insertions(+), 13 deletions(-) diff --git a/lookup.py b/lookup.py index 674cdf0..3b08bc9 100644 --- a/lookup.py +++ b/lookup.py @@ -1,9 +1,51 @@ #!/usr/bin/env python3 -import hashlib, base64, os, argparse - -def read_hosts_file()->tuple: +import hashlib, base64, os, argparse, re, glob, shutil; from concurrent.futures import ThreadPoolExecutor +class Cl: + red:str = '\033[91m'; green:str = '\033[92m'; yellow:str = "\033[93m"; normal:str = '\033[0m' +class Settings: script_directory:str = os.path.dirname(os.path.abspath(__file__)) - hosts_txt:str = os.path.join(script_directory, 'hosts.txt') + config:dict = { + 'http_host': '127.0.0.1', + 'http_port': '4444', + 'hosts_txt': 'notbob', + 'host_files': 'i2phosts', + 'file': '.settings', + 'max_tries': 3, + 'subscriptions': [ + 'http://stats.i2p/cgi-bin/newhosts.txt', + 'http://i2p-projekt.i2p/hosts.txt', + 'http://notbob.i2p/hosts.txt', + 'http://reg.i2p/hosts.txt', + 'http://skank.i2p/hosts.txt' + 'http://identiguy.i2p/hosts.txt', + 'http://simp.i2p/hosts.txt', + 'http://website.i2p/hosts.txt', + 'http://identiguy.i2p/hosts.txt', + 'http://rus.i2p/hosts.txt', + ] + } +def print_title(menu_title:str, sub_title:str)->None: + title:str = ' '.join(menu_title).upper(); terminal_width:int = shutil.get_terminal_size().columns + line_break:str = f'{Cl.green}{"⎯" * terminal_width}{Cl.normal}' + spacer:str = " " * int(((terminal_width / 2)-(len(title) / 2))) + title_final:list = ['\n', line_break, f"{Cl.green}{spacer}{title}{Cl.normal}", line_break] + if sub_title != "": + title_final.insert(3, f'{Cl.yellow}{sub_title}{Cl.normal}') + for item in title_final: + print(item) + +def strip_filename(host_file:str)->tuple: + if host_file[-4:].casefold() == '.txt': + txt_file:str = host_file + host_name:str = host_file[:-4] + else: + txt_file = f'{host_file}.txt' + host_name = host_file + return host_name, txt_file + +def read_hosts_file(file_name:str)->tuple: + host_name, txt_file = strip_filename(file_name) + hosts_txt:str = os.path.join(Settings.script_directory, txt_file) lines_list:list = [] if os.path.isfile(hosts_txt): with open(hosts_txt, 'r') as file: @@ -11,8 +53,23 @@ def read_hosts_file()->tuple: rm_line_break:str = line.replace('\n', '') if rm_line_break[:1].casefold() != '#': lines_list.append(rm_line_break) - return lines_list, script_directory - + failed = False + else: + failed = True + try: + if lines_list[0][:1] == '<': + failed = True + except Exception as e: + failed = True + return lines_list, failed + +def short_url(url:str)->str: + p:str = '(?:http.*://)?(?P[^:/ ]+).?(?P[0-9]*).*' + m:object = re.search(p,url) + host:str = m.group('host') + f_host:str = f'http://{host}' + return f_host + def shorten(input:str)->str: try: url:str = input.replace('http://', '').replace('https://', '').replace('/', '') @@ -58,18 +115,236 @@ def get_dict(lines_list:list, which:int)->dict: if which == 3: hosts_dict[host] = {'b32': b32,'b64': b64,} return hosts_dict + +def registrar_txt_files()->list: + text_file_directory:str = os.path.join(Settings.script_directory, Settings.config['host_files']) + txt_files:list = glob.glob(f'{text_file_directory}/*.txt') + reg_txt_files:list = [] + for file in txt_files: + reg_txt_files.append(os.path.basename(file)) + return reg_txt_files + +def discrepancy_checker()->str: + host_compare:dict = {} + registrars = registrar_txt_files() + unique_hosts:list = [] + for registrar in registrars: + temp:dict = {} + lines_list, failed = read_hosts_file(os.path.join(Settings.config['host_files'], registrar)) + if not failed: + for line in lines_list: + split_line:list = line.split('=') + hostname:str = split_line[0] + temp[hostname] = line.replace(f'{hostname}=', '') + if hostname not in unique_hosts: + unique_hosts.append(hostname) + host_compare[registrar] = temp + different_b64:dict = {} + for host in unique_hosts: + check_b32:list = [] + for registrar in host_compare: + if host in host_compare[registrar]: + b32 = get_b32(host_compare[registrar][host]) + if b32 not in check_b32: + check_b32.append(get_b32(host_compare[registrar][host])) + if len(check_b32) > 1: + temp = {} + for registrar in host_compare: + if host in host_compare[registrar]: + temp[registrar] = get_b32(host_compare[registrar][host]) + different_b64[host] = temp + if len(different_b64) > 0: + subtitle:str = f"{len(different_b64)} discrepancies found" + output_list:list = [] + for host_unique in different_b64: + output_list.append(f'> {host_unique}') + for reg, unique_b32 in different_b64[host_unique].items(): + output_list.append(f' {unique_b32} - {reg}') + output_msg:str = '\n'.join(output_list) + else: + subtitle = f'{Cl.green}No discrepancies found{Cl.normal}' + output_msg = 'Nothing to see here' + print_title('Registrar Discrepancy Checker', subtitle) + return output_msg + +def eepget_curl_thread(url:str)->bool: + try: + download_dir:str = os.path.join(Settings.script_directory, Settings.config['host_files']) + o_:str = shorten(short_url(url)).replace('.i2p', '.txt') + msg:str = f'ECHO {Cl.green}Downloading {o_}{Cl.normal}' + os.system(f"{msg} && curl --parallel --parallel-immediate --parallel-max 5 --retry {Settings.config['max_tries']} -x http://{Settings.config['http_host']}:{Settings.config['http_port']} --create-dirs -o '{o_}' --output-dir {download_dir} {url}") + return True + except Exception as e: + print(f"{Cl.red}[Error]{Cl.normal} {e}") + return False + +def eepget_curl(urls_:list)->str: + print_title('Downloading host.txt files', f'Using {len(urls_)} sources...') + with ThreadPoolExecutor() as pool: + pool.map(eepget_curl_thread, urls_) + return f'{Cl.green}Completed{Cl.normal}' + +def duplicates(host_file:str)->str: + host_name, txt_file = strip_filename(host_file) + lines_list, failed = read_hosts_file(os.path.join(Settings.config['host_files'], txt_file)) + if failed: + return f'{Cl.red}ERROR: {txt_file} not found or malformed.{Cl.normal}' + hosts_dict = get_dict(lines_list, 3) + unique_b64 = {} + found = [] + for k, v in hosts_dict.items(): + if v['b64'] not in unique_b64: + unique_b64[v['b64']] = [k] + else: + unique_b64[v['b64']].append(k) + for k, v in unique_b64.items(): + if len(v) > 1: + found.append(f'{get_b32(k)} - {", ".join(v)}') + print_title(f'Duplicate hostnames from {host_name}', f'Found {len(found)} unique b32s that are registered to multiple hostnames') + return '\n'.join(found) + +def update_settings(msg: str, txtfile:str, update_msg:str)->str: + file_:str = f"{Settings.script_directory}/{txtfile}" + file_exists:bool = os.path.exists(file_) + new_line:str = f'{msg}\n' + try: + if not file_exists: + file_object:object = open(file_, 'a') + file_object.write(new_line) + file_object.close() + else: + file_object = open(file_, 'w') + file_object.writelines(new_line) + file_object.close() + return f'{Cl.green}Settings updated{update_msg}{Cl.normal}' + except Exception as e: + return f'{Cl.red}ERROR UPDATING SETTINGS{Cl.normal} {e}' + +def read_txt_file(file_:str)->tuple: + lines_list:list = [] + if os.path.isfile(file_): + with open(file_, 'r') as file: + for line_number, line in enumerate(file, start=1): + rm_line_break:str = line.replace('\n', '') + lines_list.append(rm_line_break) + failed:bool = False + else: + failed = True + return lines_list, failed + +def parse_conf(lines:list)->dict: + config:dict = {} + for line in lines: + kv = line.split("#")[0].split('=') + if len(kv) > 1: + key:str = kv[0].replace(' ', '') + value:str = kv[1].replace(' ', '') + if key == 'subscriptions': + config[key] = value.split(',') + elif key == 'max_tries': + config[key] = int(value) + else: + config[key] = value + + return config + +def dict_to_config(config:dict)->str: + msg:list = [] + for k, v in config.items(): + if type(v) is list: + msg.append(f'{k} = {",".join(v)}') + else: + msg.append(f'{k} = {v}') + return '\n'.join(msg) + +def get_settings()->None: + lines, failed = read_txt_file(Settings.config['file']) + if failed: + print(f"{Cl.green}Making .settings file at {os.path.join(Settings.script_directory, Settings.config['host_files'], Settings.config['file'])}{Cl.normal}") + update_settings(dict_to_config(Settings.config), Settings.config['file'], '') + else: + keys:list = ['http_host', 'http_port', 'hosts_txt', 'host_files', 'max_tries', 'subscriptions'] + config = parse_conf(lines) + for k in keys: + if k in config: + Settings.config[k] = config[k] + +def args_settings_update(setting:str, config_key:str)->str: + old:str = Settings.config[config_key] + Settings.config[config_key] = setting + output_msg:str = update_settings(dict_to_config(Settings.config), Settings.config['file'], f": [{config_key}] {old} > {setting}") + return output_msg + +def display_registrar_host_files(): + reg_list = registrar_txt_files() + reg_dict:dict = {} + max_len:int = 0 + for reg in reg_list: + if len(reg) > max_len: + max_len = len(reg) + reg_dict[reg] = len(read_hosts_file(os.path.join(Settings.config['host_files'], reg))[0]) + msg_list = [] + print_title(f'Subscriptions: {len(reg_list)}', f'Index / hosts') + width:int = max_len + 10 + terminal_width:int = shutil.get_terminal_size().columns + center:int = int((terminal_width - width) / 2) + bar_horizontal:str = f"{' ' * center}{Cl.green}{'┄' * (width)}{Cl.normal}" + bar_vertical_left:str = f"{' ' * center}{Cl.green}┊{Cl.normal}" + bar_vertical:str = f'{Cl.green}┊{Cl.normal}' + msg_list.append(bar_horizontal) + for k, v in reg_dict.items(): + spacer1:str = " " * (max_len - len(k)) + spacer2:str = " " * (5 - len(str(v))) + msg_list.append(f'{bar_vertical_left}{k} {spacer1}{bar_vertical} {v}{spacer2}{bar_vertical}') + msg_list.append(bar_horizontal) + return '\n'.join(msg_list) + +def get_unique_hosts(): + reg_list = registrar_txt_files() + reg_dict:dict = {} + reg_names_count:dict = {} + reg_names_list_order = [] + for reg in reg_list: + reg_ = reg.replace('.txt', '') + hostname_list, failed = read_hosts_file(os.path.join(Settings.config['host_files'], reg)) + if not failed: + reg_names_list_order.append(reg_) + for hostname in hostname_list: + host, b64, b32 = parse_hostentry(hostname) + if not host in reg_dict: + reg_dict[host] = {'b32': b32, 'b64': b64, 'registrar': reg_} + if not reg_ in reg_names_count: + reg_names_count[reg_] = 1 + else: + reg_names_count[reg_] += 1 + + print_title(f'All unique hosts from {len(reg_list)} subs', 'Shows the unique hosts from all subs. Highest ranked subs take precedence.') + for reg in reg_names_list_order: + if reg in reg_names_count: + print(f'{reg} - {reg_names_count[reg]}') + output_msg = f'{Cl.green}Found {(len(reg_dict))} unique hosts.{Cl.normal}' + return output_msg + if __name__ == "__main__": - class Cl: - red:str = '\033[91m'; green:str = '\033[92m'; yellow:str = "\033[93m"; normal:str = '\033[0m' + get_settings() parser = argparse.ArgumentParser() parser.add_argument("-3", "--b32", help="lookup b32", type=str) parser.add_argument("-6", "--b64", help="lookup b64", type=str) parser.add_argument("-hn", "--hostname", help="lookup hostname", type=str) + parser.add_argument("-dup", "--duplicates", help=f"Checks a given host.txt file for duplicates. The default is {Settings.config['hosts_txt']}", action='store_true') + parser.add_argument("-chk", "--checkmismatched", help="Checks all hosts.txt files for discrepancies.", action='store_true') + parser.add_argument("-up", "--updatehosts", help="Uses curl to download all host.txt files. --setport to change http port", action='store_true') + parser.add_argument("-subs", "--subscriptions", help="Shows subscription list for host.txt files", action='store_true') + parser.add_argument("-all", "--allunique", help="shows all unique b32s between all host.txt files", action='store_true') + parser.add_argument("-shost", "--sethost", help=f"Set the host for http proxy to use. Current is {Settings.config['http_host']}", type=str) + parser.add_argument("-sport", "--setport", help=f"Set the HTTP port for http proxy. Current is {Settings.config['http_port']}", type=int) + parser.add_argument("-sdef", "--setdefault", help=f"Set the default host.txt file to use. Current is {Settings.config['hosts_txt']}", type=str) + parser.add_argument("-stries", "--setmaxtries", help=f"Set the max tries for curl command {Settings.config['max_tries']}", type=int) args = parser.parse_args() - lines_list, script_directory = read_hosts_file() - if len(lines_list) == 0: - output_msg:str = f'{Cl.red}[hosts.txt file not found]{Cl.normal} Place in same directory as this script:\n{script_directory}' - elif args.b32: + lines_list, failed = read_hosts_file(os.path.join(Settings.config['host_files'], Settings.config['hosts_txt'])) + if failed: + print(f'{Cl.red}[hosts.txt file not found or malformed]{Cl.normal} run -up / --updatehosts to download\nOr put in {Settings.script_directory}/{Settings.config["host_files"]}') + if args.b32: b32_dict:dict = get_dict(lines_list, 1) url:str = shorten(args.b32) if url in b32_dict: @@ -89,6 +364,24 @@ if __name__ == "__main__": output_msg = f"{hosts_dict[url]['b32']}\n{hosts_dict[url]['b64']}" else: output_msg = f'{Cl.yellow}[Not found]{Cl.normal} {args.hostname}' + elif args.duplicates: + output_msg = duplicates(Settings.config['hosts_txt']) + elif args.subscriptions: + output_msg = display_registrar_host_files() + elif args.allunique: + output_msg = get_unique_hosts() + elif args.updatehosts: + output_msg = eepget_curl(Settings.config['subscriptions']) + elif args.checkmismatched: + output_msg = discrepancy_checker() + elif args.setdefault: + output_msg = args_settings_update(args.setdefault, 'hosts_txt') + elif args.sethost: + output_msg = args_settings_update(args.sethost, 'http_host') + elif args.setport: + output_msg = args_settings_update(args.setport, 'http_port') + elif args.setmaxtries: + output_msg = args_settings_update(args.setmaxtries, 'max_tries') else: - output_msg = f'{Cl.green}[Lookup] {Cl.normal}Accepts b32, b64, or hostname. -h, --help for options.' + output_msg = f'{Cl.green}[Lookup] {Cl.normal}Accepts b32, b64, or hostname. -h, --help for options.' print(output_msg) \ No newline at end of file