mirror of http://git.simp.i2p/simp/b32lookup.git
error handling for bad host.txt files, subprocess import, readme update
parent
1bc17f5829
commit
5660f50b17
|
@ -4,6 +4,12 @@ Tool to convert hostname, b32, b64 from addressbooks. Downloads hosts.txt files
|
|||
|
||||
On first run it makes `.settings` in same directory, some settings can be changed by the script like `-sdef reg` to set the default addressbook to reg. To add sites to watch_list or more addressbook subscriptions edit .settings.
|
||||
|
||||
## Get / Run
|
||||
|
||||
There are no outside dependencies besides python, just download `lookup.py`:
|
||||
|
||||
curl --retry 5 -x http://127.0.0.1:4444 -O http://git.simp.i2p/simp/b32lookup/raw/branch/main/lookup.py && python3 lookup.py -h
|
||||
|
||||
## Commands
|
||||
|
||||
-h, --help show this help message and exit
|
||||
|
|
45
lookup.py
45
lookup.py
|
@ -1,5 +1,5 @@
|
|||
#!/usr/bin/env python3
|
||||
import os, argparse, shutil; from concurrent.futures import ThreadPoolExecutor
|
||||
import os, argparse, shutil, subprocess; from concurrent.futures import ThreadPoolExecutor
|
||||
class Cl:
|
||||
red:str = '\033[91m'; green:str = '\033[92m'; yellow:str = "\033[93m"; normal:str = '\033[0m'
|
||||
class Settings:
|
||||
|
@ -190,11 +190,13 @@ def eepget_curl_thread(url:str)->bool:
|
|||
try:
|
||||
download_dir:str = os.path.join(Settings.script_directory, Settings.config['host_files'])
|
||||
o_:str = shorten(short_url(url)).replace('.i2p', '.txt')
|
||||
msg:str = f'ECHO {Cl.green}Downloading {o_}{Cl.normal}'
|
||||
# msg:str = f'echo {Cl.green}Downloading {o_}{Cl.normal}'
|
||||
print(f'{Cl.yellow}+ Downloading {o_}{Cl.normal}...')
|
||||
cmd = f"curl --retry {Settings.config['max_tries']} -x http://{Settings.config['http_host']}:{Settings.config['http_port']} --create-dirs -o '{o_}' --output-dir {download_dir} {url}"
|
||||
result = subprocess.run(cmd, shell=True, capture_output=True)
|
||||
# result = subprocess.run(cmd, shell=True)
|
||||
if result.returncode == 0:
|
||||
print(f"{Cl.green}Downloaded {o_}{Cl.normal}")
|
||||
print(f"{Cl.green}> Downloaded {o_}{Cl.normal}")
|
||||
else:
|
||||
print(f"{Cl.red}Error. Return code: {result.returncode}{Cl.normal}")
|
||||
print(f"{Cl.red}Error message: {result.stderr.decode('utf-8')}{Cl.normal}")
|
||||
|
@ -204,7 +206,6 @@ def eepget_curl_thread(url:str)->bool:
|
|||
return False
|
||||
|
||||
def eepget_curl(urls_:list)->str:
|
||||
import subprocess
|
||||
print_title('Downloading host.txt files', f'Using {len(urls_)} sources...')
|
||||
with ThreadPoolExecutor() as pool:
|
||||
pool.map(eepget_curl_thread, urls_)
|
||||
|
@ -392,31 +393,39 @@ def get_unique_hosts()->str:
|
|||
def m_hostname(hostname)->str:
|
||||
hosts_dict:dict = get_dict(Mem.lines_list, 3)
|
||||
url = shorten(hostname)
|
||||
if url in hosts_dict:
|
||||
output_msg:str = f"{hosts_dict[url]['b32']}\n{hosts_dict[url]['b64']}"
|
||||
else:
|
||||
output_msg = f'{Cl.yellow}[Not found]{Cl.normal} {hostname}'
|
||||
try:
|
||||
if url in hosts_dict:
|
||||
output_msg:str = f"{hosts_dict[url]['b32']}\n{hosts_dict[url]['b64']}"
|
||||
else:
|
||||
output_msg = f'{Cl.yellow}[Not found]{Cl.normal} {hostname}'
|
||||
except Exception as e:
|
||||
output_msg = f"{Cl.red}[Error] bad host.txt file {e}{Cl.normal}"
|
||||
return output_msg
|
||||
|
||||
def m_b64(b64)->str:
|
||||
b64_dict:dict = get_dict(Mem.lines_list, 2)
|
||||
if b64 in b64_dict:
|
||||
output_msg:str = f"{b64_dict[b64]['host']}\n{b64_dict[b64]['b32']}"
|
||||
else:
|
||||
output_msg = f"{Cl.yellow}[Not found]{Cl.normal} {b64}"
|
||||
try:
|
||||
if b64 in b64_dict:
|
||||
output_msg:str = f"{b64_dict[b64]['host']}\n{b64_dict[b64]['b32']}"
|
||||
else:
|
||||
output_msg = f"{Cl.yellow}[Not found]{Cl.normal} {b64}"
|
||||
except Exception as e:
|
||||
output_msg = f"{Cl.red}[Error] bad host.txt file {e}{Cl.normal}"
|
||||
return output_msg
|
||||
|
||||
def m_b32(b32)->str:
|
||||
b32_dict:dict = get_dict(Mem.lines_list, 1)
|
||||
url:str = shorten(b32)
|
||||
if url in b32_dict:
|
||||
output_msg:str = f"{b32_dict[url]['host']}\n{b32_dict[url]['b64']}"
|
||||
else:
|
||||
output_msg = f"{Cl.yellow}[Not found]{Cl.normal} {b32}"
|
||||
try:
|
||||
if url in b32_dict:
|
||||
output_msg:str = f"{b32_dict[url]['host']}\n{b32_dict[url]['b64']}"
|
||||
else:
|
||||
output_msg = f"{Cl.yellow}[Not found]{Cl.normal} {b32}"
|
||||
except Exception as e:
|
||||
output_msg = f"{Cl.red}[Error] bad host.txt file {e}{Cl.normal}"
|
||||
return output_msg
|
||||
|
||||
def curl_command_show(url):
|
||||
import subprocess
|
||||
print_title('notbob New', 'New sites on i2p')
|
||||
try:
|
||||
cmd = f"curl --retry {Settings.config['max_tries']} -x {Settings.config['http_host']}:{Settings.config['http_port']} -s {url}"
|
||||
|
@ -428,7 +437,6 @@ def curl_command_show(url):
|
|||
print(f"{Cl.red}[Error]{Cl.normal} {e}")
|
||||
|
||||
def watch_hosts_notbob():
|
||||
import subprocess
|
||||
cmd = f"http://notbob.i2p/cgi-bin/line24.cgi?c=1&lw=20&h={'&h='.join(Settings.config['watch_list'])}"
|
||||
subprocess.run(f'''watch -n {Settings.config['notbob_watch_seconds']} -t --color "curl -x http://{Settings.config['http_host']}:{Settings.config['http_port']} -s '{cmd}'"''', shell=True)
|
||||
|
||||
|
@ -483,5 +491,6 @@ def start():
|
|||
output_msg = f'{Cl.green}[Lookup] {Cl.normal}Accepts b32, b64, or hostname. -h, --help for options.'
|
||||
if output_msg:
|
||||
print(output_msg)
|
||||
|
||||
if __name__ == "__main__":
|
||||
start()
|
||||
|
|
Loading…
Reference in New Issue