added automatic image download and compression for webviewing from ardu's bot

main
simp 2025-05-31 15:18:43 -04:00
parent e67536c6dd
commit de642692b7
3 changed files with 240 additions and 8 deletions

81
app.py
View File

@ -157,7 +157,8 @@ class Pl():
point_multiplier:bool = True
artist_bonus:bool = True
expire_days:int = 3
all = {}
instance_url:str = ''
all:dict = {}
db.init_app(app)
with app.app_context():
@ -245,8 +246,7 @@ def home():
guess_list.append(f"{i+1}. {g_list[i]}<br>")
if i > 8:
break
img_ = f'''<a href="/id/{k}"><img src="{v['url']}" alt="{k}" width="100%"></a>'''
img_ = f'''<a href="/id/{k}"><img src="{get_relative_image_url(v['url'], Pl.instance_url)}" alt="{k}" width="100%"></a>'''
guesses = f'''{''.join(guess_list)}'''
div_dict[n] = f'''<div class="column">{h1_}{img_}</div>{guesses}</div>'''
n += 1
@ -302,7 +302,7 @@ def finished():
if i > 8:
break
answer = f'''<div class="image_row">{v['title']} - {v['artist']}'''
img_ = f'''<a href="/id/{k}"><img src="{v['url']}" alt="{k}" width="100%"></a>'''
img_ = f'''<a href="/id/{k}"><img src="{get_relative_image_url(v['url'], Pl.instance_url)}" alt="{k}" width="100%"></a>'''
guesses = f'''{''.join(guess_list)}'''
div_dict[n] = f'''<div class="column">{h1_}{answer}{img_}</div>{guesses}</div>'''
n += 1
@ -427,7 +427,7 @@ def img(input_string):
h1_ = f'''<div class="caption_img"><h1 style="text-align:left">??? - ???</h1>ID {s_id} | TRIES {Pl.all[s_id]['tries_total']} | {expires}</div>'''
else:
h1_ = f'''<div class="caption_img"><h1 style="text-align:left">{Pl.all[s_id]['title']} - {Pl.all[s_id]['artist']}</h1>ID {s_id} | TRIES {Pl.all[s_id]['tries_total']} | WINNER {Pl.all[s_id]['winner']}</div>'''
body = f'''<div style="height: 500px; background-size: 500px 500px; background-repeat: no-repeat; background-image: url({Pl.all[s_id]['url']});">
body = f'''<div style="height: 500px; background-size: 500px 500px; background-repeat: no-repeat; background-image: url({get_relative_image_url(Pl.all[s_id]['url'], Pl.instance_url)});">
<div1><div class="caption" style="height:50px">{h1_}</div>
</div>'''
elif s_id in Pl.finished_songs:
@ -518,6 +518,8 @@ class Sl():
reconnects = 0
reconnect_seconds = 0
reconnect_seconds_baseline = 15
downloading = False
_download_thread = False
class Color():
a = '\x03'
@ -545,6 +547,7 @@ class Th_alive():
_keep_alive = True
kill = False
def delete_file(item:str)->None:
'''tries to delete given file'''
try:
@ -597,6 +600,7 @@ def convert_size(size, digits):
Pl.players_dict = load_json(script_directory, 'data', 'players.json')
Pl.finished_songs = load_json(script_directory, 'data', 'finished.json')
Pl.songs_dict = load_json(script_directory, 'data', 'active.json')
Pl.instance_url = instance_url
class Irc_bot:
def __init__(self, username, password, hostname, port, channel, http_port, primary, auth_users, instance_url):
@ -680,6 +684,7 @@ class Irc_bot:
self.auto_showchan_disable = string_to_list(irc_setting_update('auto_showchan_disable', 0, False, '#saltr', 0, True)[2])
self.instance_url = irc_setting_update('instance_url', 0, False, instance_url, 0, True)[2]
self.join_showchan = irc_setting_update('join_showchan', 0, True, '', 0, True)[1]
Pl.instance_url = self.instance_url
self.hostname = hostname
self.port = port
@ -708,6 +713,48 @@ class Irc_bot:
self.nick_mapped = {}
self.max_posts_dict = {}
self.update_data_first_run = True
self.proxies = {
'http': f'http://127.0.0.1:{http_port}',
'https': f'http://127.0.0.1:{http_port}',
}
def download_files_thread(exit_event, self):
while not exit_event.is_set():
if Sl.downloading:
for k, v in Pl.songs_dict.items():
if 'original' not in Pl.songs_dict[k]:
Pl.songs_dict[k]['original'] = v['url']
proc_img = True
else:
if v['url'] == v['original']:
proc_img = True
else:
proc_img = False
if proc_img:
success, file_name = file_download(v['url'], self.proxies)
if success:
Pl.songs_dict[k]['url'] = f'{convert_http(self.instance_url)}/static/images/{file_name}'
print(Pl.songs_dict)
for k, v in Pl.finished_songs.items():
if 'original' not in Pl.finished_songs[k]:
proc_img = True
Pl.finished_songs[k]['original'] = v['url']
else:
if v['url'] == v['original']:
proc_img = True
else:
proc_img = False
if proc_img:
success, file_name = file_download(v['url'], self.proxies)
if success:
Pl.finished_songs[k]['url'] = f'{convert_http(self.instance_url)}/static/images/{file_name}'
Sl.downloading = False
print('finished downloading')
time.sleep(5)
if Th_alive.kill:
Sl._download_thread = False
exit_event.set()
sys.exit()
def send_filter(input_str:str)->str:
filter_ = {
@ -784,6 +831,27 @@ class Irc_bot:
finished_songs = load_json(script_directory, 'data', 'finished.json')
if finished_songs != Pl.finished_songs:
generate_json(Pl.finished_songs, script_directory, 'data', 'finished.json')
def check_if_download_needed(checking_dict, files_to_download):
for k, v in checking_dict.items():
if 'original' not in checking_dict[k]:
files_to_download += 1
else:
if v['url'] == v['original']:
files_to_download += 1
return files_to_download
files_to_download = 0
files_to_download = check_if_download_needed(Pl.players_dict, files_to_download)
files_to_download = check_if_download_needed(Pl.finished_songs, files_to_download)
print(files_to_download)
if files_to_download > 0 and not Sl.downloading:
print('here')
Sl.downloading = True
if not Sl._download_thread:
Sl._download_thread = True
exit_download_files = threading.Event()
thread_download_files = threading.Thread(target=download_files_thread, args=(exit_download_files, self))
thread_download_files.start()
# do_update = True
# if do_update or self.first_run_update:
# index_ = os.path.join(self.html_directory, 'index.html')
@ -1301,7 +1369,7 @@ class Irc_bot:
do_command = False
if len(line) >= 6:
m = 0
m = 1
try:
new_points = int(line[5])
if new_points >= m:
@ -1578,6 +1646,7 @@ class Irc_bot:
msg = []
short_url = f'{convert_http(line[5])}'
self.instance_url = short_url
Pl.instance_url = short_url
irc_setting_update('instance_url', 0, False, short_url, 0, False)
msg_s = f'instance url set to {short_url}'
msg.append(f'{admin_user} {com_msg} {msg_s} | {rs} {short_url}')

161
game.py
View File

@ -1,8 +1,31 @@
#!/usr/bin/env python3
import Levenshtein, re, bleach, os, string, secrets, random
import Levenshtein, re, bleach, os, string, secrets, random, requests, io
from PIL import Image
from io import BytesIO
from mimetypes import MimeTypes
mime = MimeTypes()
script_directory:str = os.path.dirname(os.path.abspath(__file__))
def get_relative_image_url(url:str, instance_url:str)->str:
"""converts absolute image url to relative
Args:
url (str): absolute image url
instance_url (str): instance base url
Returns:
str: relative image url
"""
domainsplit = url.split(instance_url)
if len(domainsplit) > 1:
fname = url.split('/')
filename = fname[len(fname)-1:][0]
relative_url = f'/static/images/{filename}'
else:
relative_url = url
return relative_url
def rm_na(input_str:str)->str:
'''
Removes non-alphanumerics
@ -399,4 +422,138 @@ def filter_filler_words(input_string:str)->str:
}
for k, v in filler.items():
input_string = input_string.replace(k, v)
return input_string
return input_string
mime_types = {
'image/png': 'png',
'image/jpeg': 'jpeg',
'image/jpg': 'jpeg',
'image/gif': 'gif',
'image/webp': 'webp',
}
bytes_file_accepted = {
'jpeg': {'start': 'FFD8FF'},
'jpg': {'start': 'FFD8FF'},
'png': {'start': '89504E470D0A1A0A'},
'gif': {'start': 'GIF87a'},
'gif': {'start': 'GIF89a'},
'webp': {'start': '1a45dfa3'},
'webp': {'start': '52494646'},
}
def byte_sniffer(raw_data):
image_ = io.BytesIO(raw_data)
with image_ as file:
start_ = file.read(16).hex()
for k, v in bytes_file_accepted.items():
chars = len(v['start'])
s_cf = start_.casefold()
v_cf = v['start'].casefold()
sim = 0
for i in range(0, chars):
if v_cf[i:i+1] != '*':
if s_cf[i:i+1] == v_cf[i:i+1]:
sim += 1
else:
sim += 1
if chars == sim:
filetype = k
return filetype
else:
filetype = 'None'
return filetype
def validate_image_url(url:str)->tuple:
"""is given url from ardu's bot, if so, return filename for download
Args:
url (str): url to validate
Returns:
tuple: fname (filename), valid (bool), filetype
"""
accepted_b32 = [
'achaniseb6rz4sdqx2dg7y4lxlx4jt6zsxowoasx4edouaspf6wa.b32.i2p',
'convos.simp.i2p',
]
for b32 in accepted_b32:
if b32 in url:
s_url = url.split(f'{b32}/')
if len(s_url) > 1:
print('yes')
try:
# filetype = s_url[1].split('.')[1]
fname_, filetype = os.path.splitext(url)
fname = os.path.basename(fname_)
filetype = filetype[1:]
if filetype in bytes_file_accepted:
if filetype == 'jpg':
filetype = 'jpeg'
fname = f'{fname}.jpeg'
valid = True
return fname, valid, filetype
except Exception as e:
valid = False
else:
valid = False
else:
valid = False
else:
valid = False
if not valid:
fname = ''
return fname, valid, 'none'
def resize_image_to_target_size(image, target_size_kb):
while True:
img_buffer = io.BytesIO()
image = image.convert("RGB")
image.save(img_buffer, format="JPEG", quality=75)
image_size_bytes = len(img_buffer.getvalue())
if image_size_bytes <= target_size_kb * 995:
return image, image_size_bytes
width, height = image.size
new_width = int(width * 0.9) # resize factor
new_height = int(height * 0.9)
image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
def file_download(url:str, proxies:dict)->tuple:
fname, valid, filename_filetype = validate_image_url(url)
print(fname, valid, filename_filetype)
file_name = os.path.join(script_directory, 'static', 'images', fname)
mime_type = mime.guess_type(url, strict=True)
print(mime_type[0])
try:
mime_type_filetype = mime_types[mime_type[0]]
except Exception as e:
mime_type_filetype = 'unknown'
if mime_type[0] in mime_types:
try:
response = requests.get(url, proxies=proxies, stream=True)
if response.status_code == 200:
image_ = (response.raw.data)
byte_sniff_filetype = byte_sniffer(image_)
print(byte_sniff_filetype, mime_type_filetype, filename_filetype)
if byte_sniff_filetype == mime_type_filetype and mime_type_filetype == filename_filetype:
print('valid')
stream = BytesIO(image_)
img_obj = Image.open(stream).convert("RGBA")
resized_image, image_size_bytes = resize_image_to_target_size(img_obj, 150*1000)
resized_image.save(file_name, format="JPEG", quality=75, optimize=True, progressive=True)
success = True
return success, fname
else:
success = False
except Exception as e:
print(f'error downloading {e}')
# log.warning(f'Error downloading: {e}')
success = False
else:
success = False
return success, fname

View File

@ -1,19 +1,25 @@
bleach==6.2.0
blinker==1.9.0
certifi==2025.4.26
charset-normalizer==3.4.2
click==8.2.1
configparser==7.2.0
Flask==3.1.1
Flask-SQLAlchemy==3.1.1
greenlet==3.2.2
gunicorn==23.0.0
idna==3.10
itsdangerous==2.2.0
Jinja2==3.1.6
Levenshtein==0.27.1
logging==0.4.9.6
MarkupSafe==3.0.2
packaging==25.0
pillow==11.2.1
RapidFuzz==3.13.0
requests==2.32.3
SQLAlchemy==2.0.41
typing_extensions==4.13.2
urllib3==2.4.0
webencodings==0.5.1
Werkzeug==3.1.3