features: inter-index discrepancy checker, find all duplicates, show all hosts, download several host.txt files, ability to change settings.

main
simp 2025-09-25 21:40:11 +00:00
parent 6bc7a6de7c
commit c3a0267d74
1 changed files with 306 additions and 13 deletions

319
lookup.py
View File

@ -1,9 +1,51 @@
#!/usr/bin/env python3
import hashlib, base64, os, argparse
def read_hosts_file()->tuple:
import hashlib, base64, os, argparse, re, glob, shutil; from concurrent.futures import ThreadPoolExecutor
class Cl:
red:str = '\033[91m'; green:str = '\033[92m'; yellow:str = "\033[93m"; normal:str = '\033[0m'
class Settings:
script_directory:str = os.path.dirname(os.path.abspath(__file__))
hosts_txt:str = os.path.join(script_directory, 'hosts.txt')
config:dict = {
'http_host': '127.0.0.1',
'http_port': '4444',
'hosts_txt': 'notbob',
'host_files': 'i2phosts',
'file': '.settings',
'max_tries': 3,
'subscriptions': [
'http://stats.i2p/cgi-bin/newhosts.txt',
'http://i2p-projekt.i2p/hosts.txt',
'http://notbob.i2p/hosts.txt',
'http://reg.i2p/hosts.txt',
'http://skank.i2p/hosts.txt'
'http://identiguy.i2p/hosts.txt',
'http://simp.i2p/hosts.txt',
'http://website.i2p/hosts.txt',
'http://identiguy.i2p/hosts.txt',
'http://rus.i2p/hosts.txt',
]
}
def print_title(menu_title:str, sub_title:str)->None:
title:str = ' '.join(menu_title).upper(); terminal_width:int = shutil.get_terminal_size().columns
line_break:str = f'{Cl.green}{"" * terminal_width}{Cl.normal}'
spacer:str = " " * int(((terminal_width / 2)-(len(title) / 2)))
title_final:list = ['\n', line_break, f"{Cl.green}{spacer}{title}{Cl.normal}", line_break]
if sub_title != "":
title_final.insert(3, f'{Cl.yellow}{sub_title}{Cl.normal}')
for item in title_final:
print(item)
def strip_filename(host_file:str)->tuple:
if host_file[-4:].casefold() == '.txt':
txt_file:str = host_file
host_name:str = host_file[:-4]
else:
txt_file = f'{host_file}.txt'
host_name = host_file
return host_name, txt_file
def read_hosts_file(file_name:str)->tuple:
host_name, txt_file = strip_filename(file_name)
hosts_txt:str = os.path.join(Settings.script_directory, txt_file)
lines_list:list = []
if os.path.isfile(hosts_txt):
with open(hosts_txt, 'r') as file:
@ -11,8 +53,23 @@ def read_hosts_file()->tuple:
rm_line_break:str = line.replace('\n', '')
if rm_line_break[:1].casefold() != '#':
lines_list.append(rm_line_break)
return lines_list, script_directory
failed = False
else:
failed = True
try:
if lines_list[0][:1] == '<':
failed = True
except Exception as e:
failed = True
return lines_list, failed
def short_url(url:str)->str:
p:str = '(?:http.*://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*'
m:object = re.search(p,url)
host:str = m.group('host')
f_host:str = f'http://{host}'
return f_host
def shorten(input:str)->str:
try:
url:str = input.replace('http://', '').replace('https://', '').replace('/', '')
@ -58,18 +115,236 @@ def get_dict(lines_list:list, which:int)->dict:
if which == 3:
hosts_dict[host] = {'b32': b32,'b64': b64,}
return hosts_dict
def registrar_txt_files()->list:
text_file_directory:str = os.path.join(Settings.script_directory, Settings.config['host_files'])
txt_files:list = glob.glob(f'{text_file_directory}/*.txt')
reg_txt_files:list = []
for file in txt_files:
reg_txt_files.append(os.path.basename(file))
return reg_txt_files
def discrepancy_checker()->str:
host_compare:dict = {}
registrars = registrar_txt_files()
unique_hosts:list = []
for registrar in registrars:
temp:dict = {}
lines_list, failed = read_hosts_file(os.path.join(Settings.config['host_files'], registrar))
if not failed:
for line in lines_list:
split_line:list = line.split('=')
hostname:str = split_line[0]
temp[hostname] = line.replace(f'{hostname}=', '')
if hostname not in unique_hosts:
unique_hosts.append(hostname)
host_compare[registrar] = temp
different_b64:dict = {}
for host in unique_hosts:
check_b32:list = []
for registrar in host_compare:
if host in host_compare[registrar]:
b32 = get_b32(host_compare[registrar][host])
if b32 not in check_b32:
check_b32.append(get_b32(host_compare[registrar][host]))
if len(check_b32) > 1:
temp = {}
for registrar in host_compare:
if host in host_compare[registrar]:
temp[registrar] = get_b32(host_compare[registrar][host])
different_b64[host] = temp
if len(different_b64) > 0:
subtitle:str = f"{len(different_b64)} discrepancies found"
output_list:list = []
for host_unique in different_b64:
output_list.append(f'> {host_unique}')
for reg, unique_b32 in different_b64[host_unique].items():
output_list.append(f' {unique_b32} - {reg}')
output_msg:str = '\n'.join(output_list)
else:
subtitle = f'{Cl.green}No discrepancies found{Cl.normal}'
output_msg = 'Nothing to see here'
print_title('Registrar Discrepancy Checker', subtitle)
return output_msg
def eepget_curl_thread(url:str)->bool:
try:
download_dir:str = os.path.join(Settings.script_directory, Settings.config['host_files'])
o_:str = shorten(short_url(url)).replace('.i2p', '.txt')
msg:str = f'ECHO {Cl.green}Downloading {o_}{Cl.normal}'
os.system(f"{msg} && curl --parallel --parallel-immediate --parallel-max 5 --retry {Settings.config['max_tries']} -x http://{Settings.config['http_host']}:{Settings.config['http_port']} --create-dirs -o '{o_}' --output-dir {download_dir} {url}")
return True
except Exception as e:
print(f"{Cl.red}[Error]{Cl.normal} {e}")
return False
def eepget_curl(urls_:list)->str:
print_title('Downloading host.txt files', f'Using {len(urls_)} sources...')
with ThreadPoolExecutor() as pool:
pool.map(eepget_curl_thread, urls_)
return f'{Cl.green}Completed{Cl.normal}'
def duplicates(host_file:str)->str:
host_name, txt_file = strip_filename(host_file)
lines_list, failed = read_hosts_file(os.path.join(Settings.config['host_files'], txt_file))
if failed:
return f'{Cl.red}ERROR: {txt_file} not found or malformed.{Cl.normal}'
hosts_dict = get_dict(lines_list, 3)
unique_b64 = {}
found = []
for k, v in hosts_dict.items():
if v['b64'] not in unique_b64:
unique_b64[v['b64']] = [k]
else:
unique_b64[v['b64']].append(k)
for k, v in unique_b64.items():
if len(v) > 1:
found.append(f'{get_b32(k)} - {", ".join(v)}')
print_title(f'Duplicate hostnames from {host_name}', f'Found {len(found)} unique b32s that are registered to multiple hostnames')
return '\n'.join(found)
def update_settings(msg: str, txtfile:str, update_msg:str)->str:
file_:str = f"{Settings.script_directory}/{txtfile}"
file_exists:bool = os.path.exists(file_)
new_line:str = f'{msg}\n'
try:
if not file_exists:
file_object:object = open(file_, 'a')
file_object.write(new_line)
file_object.close()
else:
file_object = open(file_, 'w')
file_object.writelines(new_line)
file_object.close()
return f'{Cl.green}Settings updated{update_msg}{Cl.normal}'
except Exception as e:
return f'{Cl.red}ERROR UPDATING SETTINGS{Cl.normal} {e}'
def read_txt_file(file_:str)->tuple:
lines_list:list = []
if os.path.isfile(file_):
with open(file_, 'r') as file:
for line_number, line in enumerate(file, start=1):
rm_line_break:str = line.replace('\n', '')
lines_list.append(rm_line_break)
failed:bool = False
else:
failed = True
return lines_list, failed
def parse_conf(lines:list)->dict:
config:dict = {}
for line in lines:
kv = line.split("#")[0].split('=')
if len(kv) > 1:
key:str = kv[0].replace(' ', '')
value:str = kv[1].replace(' ', '')
if key == 'subscriptions':
config[key] = value.split(',')
elif key == 'max_tries':
config[key] = int(value)
else:
config[key] = value
return config
def dict_to_config(config:dict)->str:
msg:list = []
for k, v in config.items():
if type(v) is list:
msg.append(f'{k} = {",".join(v)}')
else:
msg.append(f'{k} = {v}')
return '\n'.join(msg)
def get_settings()->None:
lines, failed = read_txt_file(Settings.config['file'])
if failed:
print(f"{Cl.green}Making .settings file at {os.path.join(Settings.script_directory, Settings.config['host_files'], Settings.config['file'])}{Cl.normal}")
update_settings(dict_to_config(Settings.config), Settings.config['file'], '')
else:
keys:list = ['http_host', 'http_port', 'hosts_txt', 'host_files', 'max_tries', 'subscriptions']
config = parse_conf(lines)
for k in keys:
if k in config:
Settings.config[k] = config[k]
def args_settings_update(setting:str, config_key:str)->str:
old:str = Settings.config[config_key]
Settings.config[config_key] = setting
output_msg:str = update_settings(dict_to_config(Settings.config), Settings.config['file'], f": [{config_key}] {old} > {setting}")
return output_msg
def display_registrar_host_files():
reg_list = registrar_txt_files()
reg_dict:dict = {}
max_len:int = 0
for reg in reg_list:
if len(reg) > max_len:
max_len = len(reg)
reg_dict[reg] = len(read_hosts_file(os.path.join(Settings.config['host_files'], reg))[0])
msg_list = []
print_title(f'Subscriptions: {len(reg_list)}', f'Index / hosts')
width:int = max_len + 10
terminal_width:int = shutil.get_terminal_size().columns
center:int = int((terminal_width - width) / 2)
bar_horizontal:str = f"{' ' * center}{Cl.green}{'' * (width)}{Cl.normal}"
bar_vertical_left:str = f"{' ' * center}{Cl.green}{Cl.normal}"
bar_vertical:str = f'{Cl.green}{Cl.normal}'
msg_list.append(bar_horizontal)
for k, v in reg_dict.items():
spacer1:str = " " * (max_len - len(k))
spacer2:str = " " * (5 - len(str(v)))
msg_list.append(f'{bar_vertical_left}{k} {spacer1}{bar_vertical} {v}{spacer2}{bar_vertical}')
msg_list.append(bar_horizontal)
return '\n'.join(msg_list)
def get_unique_hosts():
reg_list = registrar_txt_files()
reg_dict:dict = {}
reg_names_count:dict = {}
reg_names_list_order = []
for reg in reg_list:
reg_ = reg.replace('.txt', '')
hostname_list, failed = read_hosts_file(os.path.join(Settings.config['host_files'], reg))
if not failed:
reg_names_list_order.append(reg_)
for hostname in hostname_list:
host, b64, b32 = parse_hostentry(hostname)
if not host in reg_dict:
reg_dict[host] = {'b32': b32, 'b64': b64, 'registrar': reg_}
if not reg_ in reg_names_count:
reg_names_count[reg_] = 1
else:
reg_names_count[reg_] += 1
print_title(f'All unique hosts from {len(reg_list)} subs', 'Shows the unique hosts from all subs. Highest ranked subs take precedence.')
for reg in reg_names_list_order:
if reg in reg_names_count:
print(f'{reg} - {reg_names_count[reg]}')
output_msg = f'{Cl.green}Found {(len(reg_dict))} unique hosts.{Cl.normal}'
return output_msg
if __name__ == "__main__":
class Cl:
red:str = '\033[91m'; green:str = '\033[92m'; yellow:str = "\033[93m"; normal:str = '\033[0m'
get_settings()
parser = argparse.ArgumentParser()
parser.add_argument("-3", "--b32", help="lookup b32", type=str)
parser.add_argument("-6", "--b64", help="lookup b64", type=str)
parser.add_argument("-hn", "--hostname", help="lookup hostname", type=str)
parser.add_argument("-dup", "--duplicates", help=f"Checks a given host.txt file for duplicates. The default is {Settings.config['hosts_txt']}", action='store_true')
parser.add_argument("-chk", "--checkmismatched", help="Checks all hosts.txt files for discrepancies.", action='store_true')
parser.add_argument("-up", "--updatehosts", help="Uses curl to download all host.txt files. --setport to change http port", action='store_true')
parser.add_argument("-subs", "--subscriptions", help="Shows subscription list for host.txt files", action='store_true')
parser.add_argument("-all", "--allunique", help="shows all unique b32s between all host.txt files", action='store_true')
parser.add_argument("-shost", "--sethost", help=f"Set the host for http proxy to use. Current is {Settings.config['http_host']}", type=str)
parser.add_argument("-sport", "--setport", help=f"Set the HTTP port for http proxy. Current is {Settings.config['http_port']}", type=int)
parser.add_argument("-sdef", "--setdefault", help=f"Set the default host.txt file to use. Current is {Settings.config['hosts_txt']}", type=str)
parser.add_argument("-stries", "--setmaxtries", help=f"Set the max tries for curl command {Settings.config['max_tries']}", type=int)
args = parser.parse_args()
lines_list, script_directory = read_hosts_file()
if len(lines_list) == 0:
output_msg:str = f'{Cl.red}[hosts.txt file not found]{Cl.normal} Place in same directory as this script:\n{script_directory}'
elif args.b32:
lines_list, failed = read_hosts_file(os.path.join(Settings.config['host_files'], Settings.config['hosts_txt']))
if failed:
print(f'{Cl.red}[hosts.txt file not found or malformed]{Cl.normal} run -up / --updatehosts to download\nOr put in {Settings.script_directory}/{Settings.config["host_files"]}')
if args.b32:
b32_dict:dict = get_dict(lines_list, 1)
url:str = shorten(args.b32)
if url in b32_dict:
@ -89,6 +364,24 @@ if __name__ == "__main__":
output_msg = f"{hosts_dict[url]['b32']}\n{hosts_dict[url]['b64']}"
else:
output_msg = f'{Cl.yellow}[Not found]{Cl.normal} {args.hostname}'
elif args.duplicates:
output_msg = duplicates(Settings.config['hosts_txt'])
elif args.subscriptions:
output_msg = display_registrar_host_files()
elif args.allunique:
output_msg = get_unique_hosts()
elif args.updatehosts:
output_msg = eepget_curl(Settings.config['subscriptions'])
elif args.checkmismatched:
output_msg = discrepancy_checker()
elif args.setdefault:
output_msg = args_settings_update(args.setdefault, 'hosts_txt')
elif args.sethost:
output_msg = args_settings_update(args.sethost, 'http_host')
elif args.setport:
output_msg = args_settings_update(args.setport, 'http_port')
elif args.setmaxtries:
output_msg = args_settings_update(args.setmaxtries, 'max_tries')
else:
output_msg = f'{Cl.green}[Lookup] {Cl.normal}Accepts b32, b64, or hostname. -h, --help for options.'
output_msg = f'{Cl.green}[Lookup] {Cl.normal}Accepts b32, b64, or hostname. -h, --help for options.'
print(output_msg)