i2music/app.py

858 lines
30 KiB
Python
Executable File

#!/usr/bin/env python3
import os, re, urllib.request, markdown, bleach, json, random, threading, time, subprocess, sys
from flask import Flask, render_template, session, abort, send_file, request as req_flask
from datetime import datetime, timedelta
from configload import *
from updateplaylists import update_api, random_string, read_txt_file, remove_track_count
app = Flask(__name__)
app.config["SECRET_KEY"] = secret_key
class Th:
    '''Holder for flags describing background worker state.'''
    # Only the commented-out update_playlists_thread ever toggled this flag.
    playlist_thread = False
def get_short_url(long_url:str)->str:
    '''Reduce a URL to its ``host`` or ``host:port`` part.

    An optional ``http...://`` prefix is skipped and the host is read up to the
    first ``:``, ``/`` or space. A port is only recognised when it directly
    follows a ``:`` after the host, so digits in the path are never mistaken
    for a port.

    Returns an empty string when no host can be found (e.g. empty input).
    '''
    p = r'(?:http.*://)?(?P<host>[^:/ ]+):?(?P<port>[0-9]*).*'
    m = re.search(p, long_url)
    if m is None:
        return ''
    host = m.group('host')
    port = m.group('port')
    return f'{host}:{port}' if port else host
def get_extension(filename:str)->tuple:
    '''Split a file name into ``(name, extension)`` at its last dot.

    The extension keeps its leading dot (``'song.mp3'`` -> ``('song', '.mp3')``).
    A name without any dot is returned unchanged with an empty extension.
    '''
    name_, dot, ext_ = filename.rpartition('.')
    if not dot:
        # No dot at all: the whole string is the name.
        return filename, ''
    return name_, f'.{ext_}'
def get_svg(absolute_location:str, filename:str)->str:
    '''Return the contents of a bundled icon on a single line for inlining.

    Reads ``<absolute_location>/static/icons/<filename>.svg`` with newlines
    removed. An empty ``filename`` selects the ``home`` icon. Any error while
    reading results in an empty string.
    '''
    try:
        icon_name = 'home' if filename == '' else filename
        svg_path = f'{absolute_location}/static/icons/{str(icon_name)}.svg'
        with open(svg_path, 'r') as svg_file:
            return svg_file.read().replace('\n', '')
    except Exception:
        return ''
def sanitize_input(input_str:str)->str:
    '''Strip markup from user-supplied text.

    The value goes through ``bleach.clean`` with no allowed tags, any remaining
    ``<script>...</script>`` span is removed, and ``&amp;`` is turned back into
    a literal ``&``.
    '''
    cleaned = bleach.clean(input_str, tags='')
    without_scripts = re.sub(r'<script\b[^>]*>(.*?)</script>', '', cleaned, flags=re.IGNORECASE)
    return without_scripts.replace('&amp;', '&')
def get_footer()->str:
    '''Build the footer: the configured left/right texts around the static
    TOS / Changelog / About / AH links, joined with " | ".'''
    links = (
        '<a href="/tos" target="info_pannel">TOS</a>',
        '<a href="/changelog" target="info_pannel">Changelog</a>',
        '<a href="/about" target="info_pannel">About</a>',
        f'<a href="{ah}">AH</a>',
    )
    return ' | '.join((left_footer, *links, right_footer))
def get_player_icons()->dict:
    '''Load every player icon as inline SVG markup.

    Returns a dict mapping each icon name below to whatever ``get_svg`` yields
    for that name under ``script_directory``.
    '''
    icon_names = (
        'stop',
        'pause',
        'next',
        'previous',
        'back',
        'backsmall',
        'playlist',
        'exit',
        'delay',
        'repeat',
        'play',
        'playsmall',
        'theme',
        'magnet',
        'download',
        'search',
        'listeners',
        'download_large',
    )
    return {name: get_svg(script_directory, name) for name in icon_names}
def load_track_data(all_tracks:dict, playlist:dict, t_dict:dict)->tuple:
    '''Read ``data.json`` from ``script_directory`` and return its sections.

    Returns ``(all_tracks, playlist, t_dict)`` taken from the ``all_tracks``,
    ``playlist`` and ``torrent`` keys of the file. If reading or parsing fails
    the error is printed and the values passed in are returned for every
    section that had not been replaced yet.
    '''
    data_path = os.path.join(script_directory, 'data.json')
    try:
        with open(data_path) as handle:
            data = json.load(handle)
        all_tracks = data['all_tracks']
        playlist = data['playlist']
        t_dict = data['torrent']
    except Exception as e:
        print(f'Error: {e}')
    return all_tracks, playlist, t_dict
def get_themes(theme:str, mini:bool)->str:
    '''Map a theme name to the URL of its stylesheet.

    ``theme`` is matched case-insensitively against the known themes. ``mini``
    selects the ``<name>_mini.css`` variant instead of ``<name>.css``.

    An unrecognised name falls back to the first known theme rather than
    failing on an unassigned local, which used to break the request.
    '''
    known_themes = ('darkgreen', 'pink', 'blue', 'red')
    name = theme.casefold()
    if name not in known_themes:
        name = known_themes[0]
    suffix = '_mini' if mini else ''
    return f'/static/{name}{suffix}.css'
def search_tracks(term:str)->list:
    '''Find tracks whose file name contains ``term`` (case-insensitive).

    Walks every playlist in ``all_tracks`` and returns a list of HTML table
    rows, one per matching track, each linking to the track and its playlist.
    Rows keep the iteration order of ``all_tracks``.
    '''
    # The term comes straight from the search form, so it is matched literally:
    # an unbalanced "(" would otherwise raise re.error and a crafted pattern
    # could trigger catastrophic backtracking.
    pattern = re.compile(re.escape(term), re.IGNORECASE)
    result_list = []
    for pl, tracks in all_tracks.items():
        playlist_href = urllib.parse.quote(f'/track_list/{pl}', safe='/', encoding=None, errors=None)
        for key in tracks:
            if not pattern.search(key):
                continue
            track_name = get_extension(key)[0]
            track_href = urllib.parse.quote(f'/track/{pl}/{key}', safe='/', encoding=None, errors=None)
            result_list.append(f'<tr><td style="font-size:80%"><a href="{track_href}" target="player">{track_name}</a></td><td style="font-size:80%"><a href="{playlist_href}" target="track_list">{pl}</a></td></tr>')
    return result_list
def load_balance_b32s(b32_dict:dict)->int:
    '''Pick the entry with the fewest recorded timestamps.

    Returns the position (in iteration order of ``b32_dict``) of the entry
    whose ``'timestamps'`` list is shortest; ties are broken at random.

    Raises ``ValueError`` when ``b32_dict`` is empty.
    '''
    counts = [len(entry['timestamps']) for entry in b32_dict.values()]
    smallest = min(counts)
    # The minimum always occurs at least once, so there is always a candidate.
    candidates = [i for i, count in enumerate(counts) if count == smallest]
    return random.choice(candidates)
def get_listeners(b32_dict:dict)->int:
    '''Count distinct listeners across all entries of ``b32_dict``.

    Each entry holds a ``'timestamps'`` list whose items carry the listener
    identifier at index 1; an identifier seen under several entries is counted
    once.
    '''
    seen = set()
    for entry in b32_dict.values():
        seen.update(stamp[1] for stamp in entry['timestamps'])
    return len(seen)
def get_listeners_single(b32_dict:dict)->int:
    '''Count distinct listeners recorded for a single entry.

    ``b32_dict`` is one entry with a ``'timestamps'`` list whose items carry
    the listener identifier at index 1.
    '''
    return len({stamp[1] for stamp in b32_dict['timestamps']})
# def update_playlists_thread(exit_event, script_directory):
# while not exit_event.is_set():
# Th.playlist_thread = True
# try:
# # command = f'python3 {os.path.join(script_directory, '__main__.py')}'
# # subprocess.run(["python3", os.path.join(script_directory, '__main__.py')])
# # os.system(command)
# msg = 'UPDATE'
# update_api(msg, '.runupdate')
# except Exception as e:
# print(f'Error {e}')
# Th.playlist_thread = False
# exit_event.set()
# sys.exit()
# Inline SVG markup for every icon, loaded once at import time.
icons_dict = get_player_icons()
themes_dict = {}
# Random URL path used for the POST API route; it is also handed to update_api
# together with the '.endpoint' name.
endpoint = random_string(20)
update_api(endpoint, '.endpoint')
# Each theme_list item is split on ':' into a name and a colour for its swatch.
for item in theme_list:
    color = item.split(':')
    themes_dict[color[0]] = f'''<div class="svg_themes"><div data-tooltip="{color[0]}"><span style="color: {color[1]};">{icons_dict['stop']}</span></div></div>'''
all_tracks, playlist, t_dict = load_track_data({}, {}, {})
footer:str = get_footer()
# Peer-count scraping needs both config switches to be enabled.
scraping_on:bool = bool(get_peercounts and make_playlist_torrents)
@app.route('/', methods=["GET"])
def home():
    '''Serve the main page: four iframes (track list, player, settings panel,
    info panel) inside the index template, styled with the full-size theme.'''
    theme_html = get_themes(session.get('theme', default_theme), False)
    body = f'''
<iframe name="track_list" src="/track_list" allowtransparency="true" scrolling="yes" width="300px" style="position: absolute; height: 100%; border: none; left: 0;"></iframe>
<div class="centered"><iframe name="player" src="/track" allowtransparency="true" scrolling="no" height="490px" width="500px" frameborder="0"></iframe></div>
<div class="upper_right"><iframe name="settings_pannel" src="/settings_init" allowtransparency="true" scrolling="yes" width="300px" style="position: absolute; height: 100%; border: none; right: 0;"></iframe></div>
<div class="lower_right"><iframe name="info_pannel" src="/info" allowtransparency="true" scrolling="yes" width="300px" style="position: absolute; height: 100%; border: none; right: 0;"></iframe></div>
'''
    return render_template('index.html', body=body, title=site_title, footer=footer, theme_html=theme_html)
@app.route('/settings', methods=["GET"])
def settings():
    '''Render the settings panel: delay, repeat toggle, theme, search and
    listener-count icons. The panel reloads itself every 60 seconds.'''
    refresh_time = 60
    delay = session.get('delay', default_delay)
    repeat = session.get('repeat', False)
    theme = session.get('theme', default_theme)
    theme_html = get_themes(theme, True)
    # The repeat link always points at the opposite of the current state.
    if repeat:
        repeat_msg, toggle = 'on', 'off'
    else:
        repeat_msg, toggle = 'off', 'on'
    body = f'''<a href="/set_delay"><div class="svg_settings"><div data-tooltip="delay: {delay}s">{icons_dict['delay']}</div></a></div>
<a href="/set_repeat/{toggle}"><div class="svg_settings"><div data-tooltip="repeat: {repeat_msg}">{icons_dict['repeat']}</div></a></div>
<a href="/set_theme"><div class="svg_settings"><div data-tooltip="theme: {theme}">{icons_dict['theme']}</div></a></div>
<a href="/search"><div class="svg_settings"><div data-tooltip="search">{icons_dict['search']}</div></a></div>
<a href="/listeners" target="info_pannel"><div class="svg_settings"><div data-tooltip="listeners: {get_listeners(b32_dict)}">{icons_dict['listeners']}</div></a></div>
'''
    meta_reload = f'''<meta http-equiv="refresh" content="{refresh_time};url=/settings">'''
    return render_template('tracklist.html', body=body, title='Settings', theme_html=theme_html, meta=meta_reload)
@app.route('/listeners', methods=["GET"])
def settings_listeners():
    '''Show a table with one row per ``b32_dict`` entry: the first four
    characters of its shortened address and its distinct listener count.'''
    theme_html = get_themes(session.get('theme', default_theme), True)
    rows = [
        f"<tr><td>{get_short_url(entry['b32'])[:4]}</td><td>{get_listeners_single(entry)}</td></tr>"
        for entry in b32_dict.values()
    ]
    table = f'''<table><th>Dest</th><th>Listeners</th>{''.join(rows)}</table>'''
    body = f'''<div class="column" style="margin:2.5% 2.5% 45% 2.5%;">{table}</div>'''
    exit_link = f'''<a href="/info"><div class="svg_exit">{icons_dict['exit']}</a></div>'''
    return render_template('md_file.html', body=body, title='Settings: listeners', exit=exit_link, theme_html=theme_html)
@app.route('/search', methods=["GET", "POST"])
def search():
    '''Search form (GET) and search results (POST).

    GET renders the form inside the settings panel. POST sanitises the ``term``
    form field, rejects terms longer than 32 characters and otherwise renders
    the matching tracks as a table. A POST without a ``term`` field is handled
    like an empty term instead of failing inside ``sanitize_input``.
    '''
    theme_html = get_themes(session.get('theme', default_theme), True)
    title = 'Search'
    if req_flask.method != "POST":
        form_html = f'''<form action="/search" target="info_pannel" method="post"><input type="text" name="term" placeholder="Search..">
<button type="submit" class="button">Submit</button></form>'''
        back_html = f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>'''
        body = ' '.join([form_html, back_html])
        return render_template('tracklist.html', body=body, title=title, theme_html=theme_html)
    term = sanitize_input(req_flask.form.get("term", ""))
    if len(term) > 32:
        message = 'Query too long.'
    else:
        results = search_tracks(term)
        if results:
            message = f'''<table><th>Track ({len(results)})</th><th>Playlist</th>{''.join(results)}</table>'''
        else:
            message = 'No results.'
    body = f'''<div class="column" style="margin:2.5% 2.5% 45% 2.5%;">{message}</div>'''
    exit_link = f'''<a href="/info"><div class="svg_exit">{icons_dict['exit']}</a></div>'''
    return render_template('md_file.html', body=body, title=title, exit=exit_link, theme_html=theme_html)
@app.route('/set_theme', methods=["GET"])
def settings_theme_main():
    '''List the available themes as clickable swatches plus a back link.'''
    theme_html = get_themes(session.get('theme', default_theme), True)
    parts = [f'<a href="/set_theme/{name}">{swatch}</a>' for name, swatch in themes_dict.items()]
    parts.append(f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>''')
    return render_template('tracklist.html', body=' '.join(parts), title='Settings: theme', theme_html=theme_html)
@app.route('/set_theme/<input_string>', methods=["GET"])
def settings_theme_main_option(input_string):
    '''Store the chosen theme in the session.

    The value must be shorter than 24 characters and be a key of
    ``themes_dict``; otherwise "Invalid value." is shown. The page itself is
    still rendered with the theme that was active before the change.
    '''
    option_val = sanitize_input(input_string)
    theme_html = get_themes(session.get('theme', default_theme), True)
    accepted = len(option_val) < 24 and option_val in themes_dict
    options_html = []
    try:
        if accepted:
            session['theme'] = option_val
            options_html.append(f'Theme: {option_val}')
        else:
            options_html.append(f'Invalid value.')
    except KeyError:
        options_html.append('Cookies needed to work.')
    # NOTE(review): the redirect is treated as unconditional, like in the
    # repeat/delay handlers; the source indentation was lost - confirm.
    meta_reload = f'''<meta http-equiv="refresh" content="3;url=/settings">'''
    options_html.extend([
        f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>''',
        '<p><a href="/" target="_parent">[Refresh now]</a></p>',
    ])
    return render_template('tracklist.html', body=' '.join(options_html), title='Settings: theme', meta=meta_reload, theme_html=theme_html)
@app.route('/settings_init', methods=["GET"])
def settings_init():
    '''Show a loader in the settings panel, then redirect to /settings after
    two seconds.'''
    theme_html = get_themes(session.get('theme', default_theme), True)
    wait_time = '2'
    loader = f'''<div style="position: fixed; top: 0; right: 0;"><div class="loader"></div></div>'''
    meta_reload = f'''<meta http-equiv="refresh" content="{wait_time};url=/settings">'''
    return render_template('tracklist.html', body=loader, title='Loading', meta=meta_reload, theme_html=theme_html)
@app.route('/set_repeat/<input_string>', methods=["GET"])
def settings_repeat_main_option(input_string):
    '''Switch playlist repeat on or off for this session.

    Accepts ``on`` / ``off`` in any letter case. Anything else now reports
    "Invalid value." (previously the failure flag was reset right after being
    set, so an invalid value crashed on an unassigned variable). The panel
    returns to /settings after three seconds.
    '''
    option_val = sanitize_input(input_string)
    theme_html = get_themes(session.get('theme', default_theme), True)
    converted = {'on': True, 'off': False}.get(option_val.casefold())
    options_html = []
    if converted is None:
        options_html.append('Invalid value.')
    else:
        try:
            session['repeat'] = converted
            options_html.append(f'Repeat {option_val}.')
        except KeyError:
            options_html.append('Cookies needed to work.')
    meta_reload = f'''<meta http-equiv="refresh" content="3;url=/settings">'''
    options_html.append(f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>''')
    return render_template('tracklist.html', body=' '.join(options_html), title='Settings: repeat', meta=meta_reload, theme_html=theme_html)
@app.route('/set_delay', methods=["GET"])
def settings_delay_main():
    '''Offer the selectable delays (1, 3, 5 or 10 seconds) plus a back link.'''
    theme_html = get_themes(session.get('theme', default_theme), True)
    choices = [f'<a href="/set_delay/{seconds}">[{seconds}]</a>' for seconds in ('1', '3', '5', '10')]
    back_link = f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>'''
    body = ' '.join(['Options (seconds):', *choices, back_link])
    return render_template('tracklist.html', body=body, title='Settings: delay', theme_html=theme_html)
@app.route('/set_delay/<input_string>', methods=["GET"])
def settings_delay_main_option(input_string):
    '''Store the delay (in seconds) added to a track's length before the
    player advances.

    The value must be at most two characters long and parse as a non-negative
    integer. Negative numbers such as ``-5`` used to be accepted and would
    shorten the refresh time computed by the player. The panel returns to
    /settings after three seconds.
    '''
    theme_html = get_themes(session.get('theme', default_theme), True)
    option_val = sanitize_input(input_string)
    converted = None
    if len(option_val) <= 2:
        try:
            converted = int(option_val)
        except ValueError:
            converted = None
    options_html = []
    if converted is None or converted < 0:
        options_html.append('Invalid value.')
    else:
        try:
            session['delay'] = converted
            options_html.append(f'Updated to {option_val}.')
        except KeyError:
            options_html.append('Cookies needed to work.')
    meta_reload = f'''<meta http-equiv="refresh" content="3;url=/settings">'''
    options_html.append(f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>''')
    return render_template('tracklist.html', body=' '.join(options_html), title='Settings: delay', meta=meta_reload, theme_html=theme_html)
@app.route('/track_list', methods=["GET"])
def playlist_home():
    '''List every playlist with its entry count, a link to its track list and
    a play button that starts at the playlist's first track.'''
    theme_html = get_themes(session.get('theme', default_theme), True)
    rows = ['<div class="column" style="margin:2.5% 2.5% 45% 2.5%;"><table><th>Playlists</th>']
    for k, entries in playlist.items():
        # First key of the playlist's track mapping is where "play" starts.
        start_ = next(iter(all_tracks[k]))
        rows.append(f'''<tr><td><div class="svg" style="width: 22px;"><div data-tooltip="play"><a href="/track/{k}/{start_}" target="player">{icons_dict['playsmall']}</a></div></div><a href="/track_list/{k}" style="line-height: 20px">{k} ({len(entries)})</a></td></tr>''')
    rows.append('</table></div>')
    return render_template('tracklist.html', body=''.join(rows), title='Home', theme_html=theme_html)
@app.route('/track_list/<input_string>', methods=["GET"])
def track_list(input_string):
    '''Show the tracks of one playlist, with its torrent row when available.

    Responds with 404 for a playlist name that is not in ``playlist`` (this
    used to surface as an unhandled KeyError). The torrent row (magnet link,
    .torrent download, optional peer counts and total size) is only added when
    ``t_dict`` has an entry for the playlist.
    '''
    theme_html = get_themes(session.get('theme', default_theme), True)
    pl = sanitize_input(input_string)
    if pl not in playlist:
        abort(404)
    torrent = t_dict.get(pl)
    if torrent is None:
        t_row = ''
    else:
        if scraping_on:
            # Counters default to 0 in case a scrape has not filled them yet.
            seeds = torrent.get('seeds', 0)
            leeches = torrent.get('leeches', 0)
            c_ = torrent.get('completed', 0)
            if c_ != 0:
                completed = f' ({c_})'
                tt = ' (completed)'
            else:
                completed = ''
                tt = ''
            peers = f'''<td><div data-tooltip="seeds/leeches{tt}">{seeds}/{leeches}{completed}</div></td>'''
        else:
            peers = ''
        magnet = torrent['magnet']
        total_size = torrent['total_size']
        unit = torrent['unit']
        t_row = f'''
</table><table><tr><td>
<a href="{magnet}"><div class="svg"><div data-tooltip="magnet link">{icons_dict['magnet']}</a></div></div>
<a href="/static/{subpath}/{pl}.torrent"><div class="svg"><div data-tooltip="download {pl}.torrent">{icons_dict['download']}</a></div></div>
</td>
{peers}
<td>{total_size}{unit}</td></tr>
'''
    body = f'''<div class="column" style="margin:2.5% 2.5% 45% 2.5%;"><table><th style="line-height: 20px">{pl} <a href="/track_list"><div class="svg"><div data-tooltip="back">{icons_dict['backsmall']}</a></div></div></th>{(''.join(playlist[pl]))}{t_row}</table></div>'''
    return render_template('tracklist.html', body=body, title=pl, theme_html=theme_html)
@app.route('/track/<input_string1>/<input_string2>', methods=["GET"])
def track(input_string1, input_string2):
    '''Render the player page for one track of a playlist.

    Steps:
      1. Pick the base address used for the audio/download links (kept in the
         session as an index into ``b32s_``) and record a timestamp for this
         listener in ``b32_dict``.
      2. Remember the current track/playlist in the session.
      3. Build the player markup with previous/next links and a meta refresh
         that loads the next track after the track length plus the delay.

    The player is only rendered when the file exists on disk AND the track is
    listed in ``all_tracks`` for that playlist. A file that exists but is not
    listed used to crash on an unassigned ``prev_key`` (or a KeyError); it now
    gets the "Audio Not Found" page like any other unknown track.
    '''
    try:
        b32_n = session['b32_n']
        ident = session['ident']
        try:
            b32_ = b32s_[b32_n]
        except IndexError:
            # The stored index no longer fits b32s_: choose again.
            # presumably a 'b32' value of 4 characters or fewer means "not
            # configured" - verify against configload.
            if len(b32_dict[0]['b32']) > 4:
                b32_n = load_balance_b32s(b32_dict)
                b32_ = b32s_[b32_n]
                session['b32_n'] = b32_n
            else:
                b32_n = 0
                b32_ = ''
        b32_dict[b32_n]['timestamps'].append([datetime.now(), ident])
    except KeyError:
        # No usable session data yet: create an identifier and pick an address.
        ident = random_string(10)
        session['ident'] = ident
        if len(b32_dict[0]['b32']) > 4:
            b32_n = load_balance_b32s(b32_dict)
            b32_ = b32s_[b32_n]
        else:
            b32_n = 0
            b32_ = ''
        b32_dict[b32_n]['timestamps'].append([datetime.now(), ident])
        session['b32_n'] = b32_n
    if b32_ != '':
        preconnect = f'''<link rel="preconnect" href="{b32_}" />'''
    else:
        preconnect = ''
    playlist_dir = sanitize_input(input_string1)
    track_name = sanitize_input(input_string2)
    track_path = os.path.join(script_directory, f'static/{subpath}/{playlist_dir}', track_name)
    playlist_tracks = all_tracks.get(playlist_dir, {})
    exists_ = os.path.exists(track_path) and track_name in playlist_tracks
    try:
        session['current_track'] = track_name
        session['current_playlist'] = playlist_dir
    except Exception as e:
        print(e)
    delayed = session.get('delay', default_delay)
    repeat = session.get('repeat', False)
    theme_html = get_themes(session.get('theme', default_theme), True)
    try:
        img = all_tracks[playlist_dir][track_name]['image']
    except KeyError:
        img = None
    if img is not None:
        img_url_ = f'/static/{subpath}/{playlist_dir}/{img}'
        img_url_converted = urllib.parse.quote(img_url_, safe='/', encoding=None, errors=None)
    else:
        img_url_safe = urllib.parse.quote(default_image, safe='/', encoding=None, errors=None)
        img_url_converted = f'/static/{subpath}/{img_url_safe}'
    track_list_safe = urllib.parse.quote(f'/track_list/{playlist_dir}', safe='/', encoding=None, errors=None)
    if exists_:
        keys = list(playlist_tracks)
        pos = keys.index(track_name)
        # Both neighbours wrap around the ends of the playlist.
        nxt_key = keys[(pos + 1) % len(keys)]
        prev_key = keys[pos - 1]
        is_last = track_name.casefold() == keys[-1].casefold()
        if is_last and not repeat:
            # End of the playlist without repeat: go back to the idle player.
            nxt_load_url = '/track'
        else:
            nxt_load_url = f'/track/{playlist_dir}/{nxt_key}'
        length = playlist_tracks[track_name]['length'] + delayed
        track_n = os.path.splitext(os.path.basename(track_name))[0]
        meta_reload = f'''<meta http-equiv="refresh" content="{length};url={nxt_load_url}">'''
        body = f'''<div style="height: 500px; background-size: 500px 500px; background-repeat: no-repeat; background-image: url({img_url_converted});">
<div1><div class="track">
<div class="svg"><div data-tooltip="previous"><a href="/track/{playlist_dir}/{prev_key}">{icons_dict['previous']}</a></div></div>
<div class="svg"><div data-tooltip="next"><a href="/track/{playlist_dir}/{nxt_key}">{icons_dict['next']}</a></div></div>
<div class="svg"><div data-tooltip="stop"><a href="/track">{icons_dict['stop']}</a></div></div>
<div class="svg"><div data-tooltip="download"><a href="{b32_}/static/{subpath}/{playlist_dir}/{track_name}" target="_blank">{icons_dict['download_large']}</a></div></div>
<div class="svg"><div data-tooltip="playlist: {playlist_dir}"><a href="{track_list_safe}" target="track_list">{icons_dict['playlist']}</a></div></div>
{remove_track_count(track_n)}</div></div></div><audio controls autoplay><source src="{b32_}/static/{subpath}/{playlist_dir}/{track_name}" type="audio/mpeg">Your browser does not support the audio element.</audio>'''
    else:
        body = f'''ERROR: Audio Not Found!'''
        meta_reload = f'''<meta http-equiv="refresh" content="5;url=/track">'''
    return render_template('track.html', body=body, title=track_name, preconnect=preconnect, meta_reload=meta_reload, theme_html=theme_html)
@app.route('/track', methods=["GET"])
def track_home():
    '''Idle player page: the default image, plus a "Resume" link when the
    session still remembers a track and playlist (both are then cleared).'''
    img_url_safe = urllib.parse.quote(default_image, safe='/', encoding=None, errors=None)
    img_url_converted = f'/static/{img_url_safe}'
    last_song = False
    try:
        full_track = session['current_track']
        # basename() raises TypeError when the stored track was reset to None.
        track_name = os.path.splitext(os.path.basename(full_track))[0]
        playlist_dir = session['current_playlist']
        if track_name is not None and playlist_dir is not None:
            session['current_track'] = None
            session['current_playlist'] = None
            last_song = True
    except (KeyError, TypeError) as err:
        print(err)
    theme_html = get_themes(session.get('theme', default_theme), True)
    if last_song:
        url_safe = urllib.parse.quote(f'/track/{playlist_dir}/{full_track}', safe='/', encoding=None, errors=None)
        resume = f'''<div1><div class="track"><div class="svg"><div data-tooltip="Resume"><a href="{url_safe}">{icons_dict['play']}</a></div></div>Resume: {track_name}</div></div>'''
    else:
        resume = ''
    body = f'''<div style="height: 500px; background-size: 500px 500px; background-repeat: no-repeat; background-image: url({img_url_converted});">{resume}</div>'''
    return render_template('track.html', body=body, title='Home', theme_html=theme_html)
@app.route('/info', methods=["GET"])
def info():
    '''Render the empty info panel (only the themed template).'''
    theme_html = get_themes(session.get('theme', default_theme), True)
    return render_template('tracklist.html', body='', title='info', theme_html=theme_html)
@app.route('/changelog', methods = ['GET'])
def changelog():
    '''Show the changelog file, one ``<div>`` per line inside a ``<pre>``.
    If the file does not exist the ``<pre>`` is rendered empty.'''
    theme_html = get_themes(session.get('theme', default_theme), True)
    exit_link = f'''<a href="/info"><div class="svg_exit">{icons_dict['exit']}</a></div>'''
    rendered_lines = []
    if os.path.isfile(changelog_txt):
        with open(changelog_txt, 'r') as file:
            for line in file:
                text = line.replace('\n', '')
                rendered_lines.append(f'<div>{text}</div>')
    opening = '<div class="column"><pre contenteditable="true" spellcheck="false">'
    closing = '</pre></div>'
    body = f"{opening}{''.join(rendered_lines)}{closing}"
    return render_template('md_file.html', body=body, title='Changelog', exit=exit_link, theme_html=theme_html)
@app.route('/about', methods = ['GET'])
def about():
    '''Render ``md_files/about.md`` as HTML in the info panel.

    A missing file shows an error text instead. The exit link has its own
    name: it used to share ``e`` with the ``except ... as e`` clause, which
    unbinds that name when the handler ends, so a missing file crashed the
    request while rendering.
    '''
    theme_html = get_themes(session.get('theme', default_theme), True)
    exit_link = f'''<a href="/info"><div class="svg_exit">{icons_dict['exit']}</a></div>'''
    md_path = os.path.join(script_directory, 'md_files/about.md')
    try:
        with open(md_path, 'r') as f:
            tempMd = f.read()
        body = f'<div class="text_column">{markdown.markdown(tempMd)}</div>'
    except FileNotFoundError:
        body = 'ERROR - no content found for this!'
    return render_template('md_file.html', body=body, title='About', exit=exit_link, theme_html=theme_html)
@app.route('/tos', methods = ['GET'])
def tos():
    '''Render ``md_files/tos.md`` as HTML in the info panel.

    A missing file shows an error text instead. The exit link has its own
    name: it used to share ``e`` with the ``except ... as e`` clause, which
    unbinds that name when the handler ends, so a missing file crashed the
    request while rendering.
    '''
    theme_html = get_themes(session.get('theme', default_theme), True)
    exit_link = f'''<a href="/info"><div class="svg_exit">{icons_dict['exit']}</a></div>'''
    md_path = os.path.join(script_directory, 'md_files/tos.md')
    try:
        with open(md_path, 'r') as f:
            tempMd = f.read()
        body = f'<div class="text_column">{markdown.markdown(tempMd)}</div>'
    except FileNotFoundError:
        body = 'ERROR - no content found for this!'
    return render_template('md_file.html', body=body, title='TOS', exit=exit_link, theme_html=theme_html)
@app.route('/robots.txt', methods = ['GET'])
def robots():
    '''Send robots.txt as a file attachment.'''
    robots_file = 'robots.txt'
    return send_file(robots_file, as_attachment=True)
@app.route("/ping", methods = ['GET'])
def ping():
    '''Liveness check: always answers with the text PONG.'''
    reply = "PONG"
    return reply
@app.route(f'/{endpoint}', methods=['POST'])
def refresh():
    '''Key-protected control endpoint, served on the random ``endpoint`` path.

    Expects a JSON object with ``key`` and ``command``:
      * ``refresh`` reloads data.json (and re-scrapes peers when scraping is
        on) and answers ``success``.
      * ``update_playlists`` hands ``UPDATE`` to ``update_api`` and answers
        ``success``, or ``failed`` when that call raises. This branch used to
        fall through without returning a response.
      * any other command answers ``failed``.

    Aborts with 400 when no ``.apikey`` file exists, and with 404 when the
    body is not a JSON object or the key is missing, not 40 characters long or
    wrong.
    '''
    global all_tracks, playlist, t_dict
    import hmac  # local import: only needed for the constant-time comparison

    api_key_path = os.path.join(script_directory, '.apikey')
    if not os.path.exists(api_key_path):
        print(f'No api key found')
        abort(400)
    data = req_flask.get_json(silent=True)
    if not isinstance(data, dict):
        abort(404)
    raw_key = data.get('key')
    if not isinstance(raw_key, str):
        abort(404)
    api_key = read_txt_file(api_key_path)[0]
    key_ = sanitize_input(raw_key)
    # Compare as bytes so non-ASCII input cannot make compare_digest raise.
    if len(key_) != 40 or not hmac.compare_digest(key_.encode(), str(api_key).encode()):
        abort(404)
    raw_cmd = data.get('command')
    cmd = sanitize_input(raw_cmd) if isinstance(raw_cmd, str) else ''
    if cmd == 'refresh':
        all_tracks, playlist, t_dict = load_track_data({}, {}, {})
        if scraping_on:
            # Freshly loaded entries need their counters before scraping.
            for pl in t_dict:
                t_dict[pl]['seeds'], t_dict[pl]['leeches'], t_dict[pl]['completed'] = 0, 0, 0
            t_dict = asyncio.run(scrape_trackers(t_dict))
        return 'success'
    if cmd == 'update_playlists':
        try:
            update_api('UPDATE', '.runupdate')
        except Exception as e:
            print(f'Error {e}')
            return 'failed'
        return 'success'
    return 'failed'
@app.route('/<input_string>', methods = ['GET'])
def kill(input_string):
    '''Catch-all for single-segment GET paths this rule matches: always 404.'''
    abort(404)
def tracker_scrape_thread(exit_event, get_peercounts):
    '''Background loop: scrape peer counts and purge stale listener stamps.

    Runs until ``exit_event`` is set. Each pass:
      * when scraping is on and the countdown ``sc`` has run out, refreshes
        ``t_dict`` through ``scrape_trackers`` and restarts the countdown;
      * rebuilds every ``b32_dict`` timestamp list so that it holds at most
        one stamp per listener, none older than 15 minutes;
      * sleeps 30 seconds.

    ``get_peercounts`` is accepted for the thread's argument list but is not
    used here. A scraper failure is printed together with the exception (the
    message used to lack the f-string prefix and printed a literal "{e}").
    '''
    global t_dict, b32_dict
    sc = 0
    purge_ts = 30
    time_difference = timedelta(seconds=15*60)
    while not exit_event.is_set():
        if scraping_on and sc <= 0:
            try:
                t_dict = asyncio.run(scrape_trackers(t_dict))
                # print(f'Threads: {threading.active_count()}')
                time.sleep(scrape_interval)
            except Exception as e:
                print(f'Scraper error {e}')
            # NOTE(review): the countdown is assumed to restart after every
            # attempt, successful or not; source indentation was lost - confirm.
            sc = scrape_interval
        dn = datetime.now()
        for entry in b32_dict.values():
            kept = []
            already_seen = []
            # Iterate over a snapshot: request handlers append concurrently.
            for stamp in list(entry['timestamps']):
                if stamp[1] in already_seen:
                    continue
                if (dn - stamp[0]) > time_difference:
                    continue
                kept.append(stamp)
                already_seen.append(stamp[1])
            entry['timestamps'] = kept
        time.sleep(purge_ts)
        if scraping_on:
            sc -= purge_ts
# The scraper and asyncio are only imported when peer-count scraping is on.
if scraping_on:
    from scrape import scrape_trackers
    import asyncio
    # Give every torrent entry its counters before the first scrape.
    for pl in t_dict:
        t_dict[pl].update(seeds=0, leeches=0, completed=0)
# The worker is started regardless of scraping_on: it also purges old listener
# timestamps. (Assumes these lines were at module level - TODO confirm.)
exit_event = threading.Event()
thread_torrent_scraper = threading.Thread(
    target=tracker_scrape_thread,
    args=(exit_event, get_peercounts),
)
thread_torrent_scraper.start()
if __name__ == '__main__':
    # Serve on loopback only. The former second call, app.run(debug=True),
    # ran once this server returned and would have started the Werkzeug debug
    # server, so it has been removed.
    app.run(host='127.0.0.1', port=site_port)