mirror of http://git.simp.i2p/simp/i2music.git
858 lines
30 KiB
Python
Executable File
858 lines
30 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
import os, re, urllib.request, markdown, bleach, json, random, threading, time, subprocess, sys
|
|
from flask import Flask, render_template, session, abort, send_file, request as req_flask
|
|
from datetime import datetime, timedelta
|
|
from configload import *
|
|
from updateplaylists import update_api, random_string, read_txt_file, remove_track_count
|
|
|
|
app = Flask(__name__)
|
|
app.config["SECRET_KEY"] = secret_key
|
|
|
|
class Th():
|
|
playlist_thread = False
|
|
|
|
def get_short_url(long_url:str)->str:
|
|
'''regex to get shortened url'''
|
|
p = '(?:http.*://)?(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*'
|
|
m = re.search(p,long_url)
|
|
short = m.group('host')
|
|
port = m.group('port')
|
|
if port != '':
|
|
x = f':{port}'
|
|
else:
|
|
x = ''
|
|
url = f'{short}{x}'
|
|
return url
|
|
|
|
def get_extension(filename:str)->tuple:
|
|
'''given a string converts to filename and extension'''
|
|
split_ = filename.split('.')
|
|
ext_length = len(split_[len(split_)-1])+1
|
|
name_ = filename[:-ext_length]
|
|
ext_ = filename[-ext_length:]
|
|
return name_, ext_
|
|
|
|
def get_svg(absolute_location:str, filename:str)->str:
|
|
'''Convert svg file for inline html'''
|
|
try:
|
|
if filename == '':
|
|
filename = 'home'
|
|
with open(f'{absolute_location}/static/icons/{str(filename)}.svg', 'r') as file:
|
|
icon = file.read().replace('\n', '')
|
|
except Exception as e:
|
|
icon = ''
|
|
return icon
|
|
|
|
def sanitize_input(input_str:str)->str:
|
|
'''sanitize user input.'''
|
|
allowed_tags = ''
|
|
x1 = bleach.clean(input_str, tags=allowed_tags)
|
|
sanitized_str = re.sub(r'<script\b[^>]*>(.*?)</script>', '', x1, flags=re.IGNORECASE)
|
|
sanitized_ = sanitized_str.replace('&', '&')
|
|
return sanitized_
|
|
|
|
def get_footer()->str:
|
|
'''Generate footer. 2 options based on input being present or not.'''
|
|
footer_list = [
|
|
left_footer,
|
|
'<a href="/tos" target="info_pannel">TOS</a>',
|
|
'<a href="/changelog" target="info_pannel">Changelog</a>',
|
|
'<a href="/about" target="info_pannel">About</a>',
|
|
f'<a href="{ah}">AH</a>',
|
|
right_footer,
|
|
]
|
|
footer = ' | '.join(footer_list)
|
|
return footer
|
|
|
|
def get_player_icons()->dict:
|
|
'''Gets svg files for inline. Icons list are the names of svg files to read as icons.'''
|
|
icons_dict = {}
|
|
icons_list = [
|
|
'stop',
|
|
'pause',
|
|
'next',
|
|
'previous',
|
|
'back',
|
|
'backsmall',
|
|
'playlist',
|
|
'exit',
|
|
'delay',
|
|
'repeat',
|
|
'play',
|
|
'playsmall',
|
|
'theme',
|
|
'magnet',
|
|
'download',
|
|
'search',
|
|
'listeners',
|
|
'download_large',
|
|
]
|
|
for item in icons_list:
|
|
icons_dict[item] = get_svg(script_directory, item)
|
|
return icons_dict
|
|
|
|
def load_track_data(all_tracks:dict, playlist:dict, t_dict:dict)->tuple:
|
|
'''load the track data from data.json file'''
|
|
json_file = (os.path.join(script_directory, 'data.json'))
|
|
try:
|
|
with open(json_file) as json_file:
|
|
data = json.load(json_file)
|
|
all_tracks = data['all_tracks']
|
|
playlist = data['playlist']
|
|
t_dict = data['torrent']
|
|
except Exception as e:
|
|
print(f'Error: {e}')
|
|
return all_tracks, playlist, t_dict
|
|
return all_tracks, playlist, t_dict
|
|
|
|
def get_themes(theme:str, mini:bool)->str:
|
|
'''convert session theme str to the needed css file'''
|
|
if theme.casefold() == 'darkgreen':
|
|
index_theme = 'darkgreen.css'
|
|
mini_theme = 'darkgreen_mini.css'
|
|
elif theme.casefold() == 'pink':
|
|
index_theme = 'pink.css'
|
|
mini_theme = 'pink_mini.css'
|
|
elif theme.casefold() == 'blue':
|
|
index_theme = 'blue.css'
|
|
mini_theme = 'blue_mini.css'
|
|
elif theme.casefold() == 'red':
|
|
index_theme = 'red.css'
|
|
mini_theme = 'red_mini.css'
|
|
if mini:
|
|
theme_html = f'/static/{mini_theme}'
|
|
else:
|
|
theme_html = f'/static/{index_theme}'
|
|
return theme_html
|
|
|
|
def search_tracks(term:str)->list:
|
|
search_dict = {}
|
|
for item in all_tracks:
|
|
matching_keys = [key for key, value in all_tracks[item].items() if re.search(term, key, re.IGNORECASE)]
|
|
if len(matching_keys) > 0:
|
|
search_dict[item] = matching_keys
|
|
result_list = []
|
|
if len(search_dict) > 0:
|
|
for pl in search_dict:
|
|
# print(search_dict[pl])
|
|
for i in range(0, len(search_dict[pl])):
|
|
track_name = get_extension(search_dict[pl][i])[0]
|
|
# print(track_name)
|
|
track_href = urllib.parse.quote(f'/track/{pl}/{search_dict[pl][i]}', safe='/', encoding=None, errors=None)
|
|
playlist_href = urllib.parse.quote(f'/track_list/{pl}', safe='/', encoding=None, errors=None)
|
|
result_list.append(f'<tr><td style="font-size:80%"><a href="{track_href}" target="player">{track_name}</a></td><td style="font-size:80%"><a href="{playlist_href}" target="track_list">{pl}</a></td></tr>')
|
|
return result_list
|
|
|
|
def load_balance_b32s(b32_dict:dict)->int:
|
|
'''with dict of b32's chose the one with the least listeners, or random if multiple are the same'''
|
|
f = []
|
|
for k, v in b32_dict.items():
|
|
f.append(len(b32_dict[k]['timestamps']))
|
|
smallest = min(f)
|
|
occurs = f.count(smallest)
|
|
# print(occurs)
|
|
if occurs >= 1:
|
|
rand_list = []
|
|
for i in range(0,len(f)):
|
|
if f[i] == smallest:
|
|
rand_list.append(i)
|
|
chosen = random.choice(rand_list)
|
|
# print(rand_list)
|
|
else:
|
|
chosen = smallest
|
|
return chosen
|
|
|
|
def get_listeners(b32_dict:dict)->int:
|
|
'''get number of listeners'''
|
|
# print(b32_dict)
|
|
listeners = 0
|
|
seen_list = []
|
|
for k, v in b32_dict.items():
|
|
if len(b32_dict[k]['timestamps']) > 0:
|
|
for i in range(0, len(b32_dict[k]['timestamps'])):
|
|
if b32_dict[k]['timestamps'][i][1] not in seen_list:
|
|
seen_list.append(b32_dict[k]['timestamps'][i][1])
|
|
# print(seen_list)
|
|
listeners = len(seen_list)
|
|
return listeners
|
|
|
|
def get_listeners_single(b32_dict:dict)->int:
|
|
'''get number of listeners from single b32'''
|
|
listeners = 0
|
|
seen_list = []
|
|
if len(b32_dict['timestamps']) > 0:
|
|
for i in range (len(b32_dict['timestamps'])):
|
|
if b32_dict['timestamps'][i][1] not in seen_list:
|
|
seen_list.append(b32_dict['timestamps'][i][1])
|
|
listeners = len(seen_list)
|
|
return listeners
|
|
|
|
# def update_playlists_thread(exit_event, script_directory):
|
|
# while not exit_event.is_set():
|
|
# Th.playlist_thread = True
|
|
# try:
|
|
# # command = f'python3 {os.path.join(script_directory, '__main__.py')}'
|
|
# # subprocess.run(["python3", os.path.join(script_directory, '__main__.py')])
|
|
# # os.system(command)
|
|
# msg = 'UPDATE'
|
|
# update_api(msg, '.runupdate')
|
|
# except Exception as e:
|
|
# print(f'Error {e}')
|
|
# Th.playlist_thread = False
|
|
# exit_event.set()
|
|
# sys.exit()
|
|
|
|
icons_dict = get_player_icons()
|
|
themes_dict = {}
|
|
endpoint = random_string(20)
|
|
update_api(endpoint, '.endpoint')
|
|
|
|
for item in theme_list:
|
|
color = item.split(':')
|
|
themes_dict[color[0]] = f'''<div class="svg_themes"><div data-tooltip="{color[0]}"><span style="color: {color[1]};">{icons_dict['stop']}</span></div></div>'''
|
|
|
|
all_tracks, playlist, t_dict = load_track_data({}, {}, {})
|
|
footer:str = get_footer()
|
|
if get_peercounts and make_playlist_torrents:
|
|
scraping_on:bool = True
|
|
else:
|
|
scraping_on:bool = False
|
|
|
|
@app.route('/', methods=["GET"])
|
|
def home():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, False)
|
|
body = f'''
|
|
<iframe name="track_list" src="/track_list" allowtransparency="true" scrolling="yes" width="300px" style="position: absolute; height: 100%; border: none; left: 0;"></iframe>
|
|
<div class="centered"><iframe name="player" src="/track" allowtransparency="true" scrolling="no" height="490px" width="500px" frameborder="0"></iframe></div>
|
|
<div class="upper_right"><iframe name="settings_pannel" src="/settings_init" allowtransparency="true" scrolling="yes" width="300px" style="position: absolute; height: 100%; border: none; right: 0;"></iframe></div>
|
|
<div class="lower_right"><iframe name="info_pannel" src="/info" allowtransparency="true" scrolling="yes" width="300px" style="position: absolute; height: 100%; border: none; right: 0;"></iframe></div>
|
|
'''
|
|
return render_template('index.html', body=body, title=site_title, footer=footer, theme_html=theme_html)
|
|
|
|
@app.route('/settings', methods=["GET"])
|
|
def settings():
|
|
# global script_directory
|
|
refresh_time = 60
|
|
title = 'Settings'
|
|
try:
|
|
delay = session['delay']
|
|
except KeyError as e:
|
|
delay = default_delay
|
|
try:
|
|
repeat = session['repeat']
|
|
except KeyError as e:
|
|
repeat = False
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
toggle = 'on'
|
|
if repeat:
|
|
repeat_msg = 'on'
|
|
toggle = 'off'
|
|
else:
|
|
repeat_msg = 'off'
|
|
e = f'''<a href="/set_delay"><div class="svg_settings"><div data-tooltip="delay: {delay}s">{icons_dict['delay']}</div></a></div>
|
|
<a href="/set_repeat/{toggle}"><div class="svg_settings"><div data-tooltip="repeat: {repeat_msg}">{icons_dict['repeat']}</div></a></div>
|
|
<a href="/set_theme"><div class="svg_settings"><div data-tooltip="theme: {theme}">{icons_dict['theme']}</div></a></div>
|
|
<a href="/search"><div class="svg_settings"><div data-tooltip="search">{icons_dict['search']}</div></a></div>
|
|
<a href="/listeners" target="info_pannel"><div class="svg_settings"><div data-tooltip="listeners: {get_listeners(b32_dict)}">{icons_dict['listeners']}</div></a></div>
|
|
'''
|
|
body = e
|
|
meta_reload = f'''<meta http-equiv="refresh" content="{refresh_time};url=/settings">'''
|
|
return render_template('tracklist.html', body=body, title=title, theme_html=theme_html, meta=meta_reload)
|
|
|
|
@app.route('/listeners', methods=["GET"])
|
|
def settings_listeners():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
title = 'Settings: listeners'
|
|
stats_html = []
|
|
b32_table = []
|
|
for k, v in b32_dict.items():
|
|
dest = get_short_url(b32_dict[k]['b32'])[:4]
|
|
b32_table.append((f"<tr><td>{dest}</td><td>{get_listeners_single(b32_dict[k])}</td></tr>"))
|
|
stats_html.append(f'''<table><th>Dest</th><th>Listeners</th>{''.join(b32_table)}</table>''')
|
|
|
|
body = f'''<div class="column" style="margin:2.5% 2.5% 45% 2.5%;">{' '.join(stats_html)}</div>'''
|
|
e = f'''<a href="/info"><div class="svg_exit">{icons_dict['exit']}</a></div>'''
|
|
return render_template('md_file.html', body=body, title=title, exit=e, theme_html=theme_html)
|
|
|
|
@app.route('/search', methods=["GET", "POST"])
|
|
def search():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
title = 'Search'
|
|
if req_flask.method == "POST":
|
|
search_html = []
|
|
term = sanitize_input((req_flask.form.get("term")))
|
|
if len(term) <= 32:
|
|
results = search_tracks(term)
|
|
if len(results) > 0:
|
|
results_table = f'''<table><th>Track ({len(results)})</th><th>Playlist</th>{''.join(results)}</table>'''
|
|
search_html.append(results_table)
|
|
else:
|
|
search_html.append('No results.')
|
|
else:
|
|
search_html.append('Query too long.')
|
|
body = f'''<div class="column" style="margin:2.5% 2.5% 45% 2.5%;">{' '.join(search_html)}</div>'''
|
|
e = f'''<a href="/info"><div class="svg_exit">{icons_dict['exit']}</a></div>'''
|
|
return render_template('md_file.html', body=body, title=title, exit=e, theme_html=theme_html)
|
|
else:
|
|
search_html = []
|
|
search_html.append(f'''<form action="/search" target="info_pannel" method="post"><input type="text" name="term" placeholder="Search..">
|
|
<button type="submit" class="button">Submit</button></form>''')
|
|
search_html.append(f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>''')
|
|
body = ' '.join(search_html)
|
|
return render_template('tracklist.html', body=body, title=title, theme_html=theme_html)
|
|
|
|
@app.route('/set_theme', methods=["GET"])
|
|
def settings_theme_main():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
options = themes_dict
|
|
options_html = []
|
|
for item in options:
|
|
# print(item)
|
|
options_html.append(f'<a href="/set_theme/{item}">{options[item]}</a>')
|
|
title = 'Settings: theme'
|
|
options_html.append(f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>''')
|
|
body = ' '.join(options_html)
|
|
return render_template('tracklist.html', body=body, title=title, theme_html=theme_html)
|
|
|
|
@app.route('/set_theme/<input_string>', methods=["GET"])
|
|
def settings_theme_main_option(input_string):
|
|
option_val = sanitize_input(input_string)
|
|
options_html = []
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
if len(option_val) < 24:
|
|
try:
|
|
if option_val in themes_dict:
|
|
failed = False
|
|
else:
|
|
failed = True
|
|
except Exception as e:
|
|
failed = True
|
|
else:
|
|
failed = True
|
|
meta_reload = ''
|
|
try:
|
|
if not failed:
|
|
session['theme'] = option_val
|
|
options_html.append(f'Theme: {option_val}')
|
|
else:
|
|
options_html.append(f'Invalid value.')
|
|
except KeyError as e:
|
|
options_html.append('Cookies needed to work.')
|
|
meta_reload = f'''<meta http-equiv="refresh" content="3;url=/settings">'''
|
|
title = 'Settings: theme'
|
|
changeit_now = [
|
|
f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>''',
|
|
'<p><a href="/" target="_parent">[Refresh now]</a></p>',
|
|
]
|
|
for item in changeit_now:
|
|
options_html.append(item)
|
|
body = ' '.join(options_html)
|
|
return render_template('tracklist.html', body=body, title=title, meta=meta_reload, theme_html=theme_html)
|
|
|
|
@app.route('/settings_init', methods=["GET"])
|
|
def settings_init():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
title = 'Loading'
|
|
wait_time = '2'
|
|
body = f'''<div style="position: fixed; top: 0; right: 0;"><div class="loader"></div></div>'''
|
|
meta_reload = f'''<meta http-equiv="refresh" content="{wait_time};url=/settings">'''
|
|
return render_template('tracklist.html', body=body, title=title, meta=meta_reload, theme_html=theme_html)
|
|
|
|
@app.route('/set_repeat/<input_string>', methods=["GET"])
|
|
def settings_repeat_main_option(input_string):
|
|
option_val = sanitize_input(input_string)
|
|
options_html = []
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
if len(option_val) <= 3:
|
|
try:
|
|
if option_val.casefold() == 'on'.casefold():
|
|
converted = True
|
|
elif option_val.casefold() == 'off'.casefold():
|
|
converted = False
|
|
else:
|
|
failed = True
|
|
failed = False
|
|
except Exception as e:
|
|
failed = True
|
|
else:
|
|
failed = True
|
|
try:
|
|
if not failed:
|
|
session['repeat'] = converted
|
|
options_html.append(f'Repeat {option_val}.')
|
|
else:
|
|
options_html.append(f'Invalid value.')
|
|
except KeyError as e:
|
|
options_html.append('Cookies needed to work.')
|
|
meta_reload = f'''<meta http-equiv="refresh" content="3;url=/settings">'''
|
|
title = 'Settings: repeat'
|
|
options_html.append(f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>''')
|
|
body = ' '.join(options_html)
|
|
return render_template('tracklist.html', body=body, title=title, meta=meta_reload, theme_html=theme_html)
|
|
|
|
@app.route('/set_delay', methods=["GET"])
|
|
def settings_delay_main():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
options = ['1', '3', '5', '10']
|
|
options_html = []
|
|
options_html.append('Options (seconds):')
|
|
for item in options:
|
|
options_html.append(f'<a href="/set_delay/{item}">[{item}]</a>')
|
|
title = 'Settings: delay'
|
|
options_html.append(f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>''')
|
|
body = ' '.join(options_html)
|
|
return render_template('tracklist.html', body=body, title=title, theme_html=theme_html)
|
|
|
|
@app.route('/set_delay/<input_string>', methods=["GET"])
|
|
def settings_delay_main_option(input_string):
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
option_val = sanitize_input(input_string)
|
|
options_html = []
|
|
if len(option_val) <= 2:
|
|
try:
|
|
converted = int(option_val)
|
|
failed = False
|
|
except Exception as e:
|
|
failed = True
|
|
else:
|
|
failed = True
|
|
try:
|
|
if not failed:
|
|
session['delay'] = converted
|
|
options_html.append(f'Updated to {option_val}.')
|
|
else:
|
|
options_html.append(f'Invalid value.')
|
|
except KeyError as e:
|
|
options_html.append('Cookies needed to work.')
|
|
meta_reload = f'''<meta http-equiv="refresh" content="3;url=/settings">'''
|
|
title = 'Settings: delay'
|
|
options_html.append(f'''<a href="/settings"><div class="svg_exit"><div data-tooltip="back">{icons_dict['back']}</div></a></div>''')
|
|
body = ' '.join(options_html)
|
|
return render_template('tracklist.html', body=body, title=title, meta=meta_reload, theme_html=theme_html)
|
|
|
|
@app.route('/track_list', methods=["GET"])
|
|
def playlist_home():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
title = 'Home'
|
|
playlists_list = []
|
|
playlists_list.append('<div class="column" style="margin:2.5% 2.5% 45% 2.5%;"><table><th>Playlists</th>')
|
|
# print(playlist)
|
|
# <div class="svg"><div data-tooltip="stop"><a href="/track">{icons_dict['stop']}</a></div>
|
|
for k, v in playlist.items():
|
|
start_ = next(iter(all_tracks[k]))
|
|
playlists_list.append(f'''<tr><td><div class="svg" style="width: 22px;"><div data-tooltip="play"><a href="/track/{k}/{start_}" target="player">{icons_dict['playsmall']}</a></div></div><a href="/track_list/{k}" style="line-height: 20px">{k} ({len(v)})</a></td></tr>''')
|
|
playlists_list.append('</table></div>')
|
|
body = ''.join(playlists_list)
|
|
return render_template('tracklist.html', body=body, title=title, theme_html=theme_html)
|
|
|
|
@app.route('/track_list/<input_string>', methods=["GET"])
|
|
def track_list(input_string):
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
pl = sanitize_input(input_string)
|
|
title = pl
|
|
if len(t_dict) > 0:
|
|
t_dict[pl]['infohash']
|
|
if scraping_on:
|
|
seeds = t_dict[pl]['seeds']
|
|
leeches = t_dict[pl]['leeches']
|
|
c_ = t_dict[pl]['completed']
|
|
if c_ != 0:
|
|
completed = f' ({c_})'
|
|
tt = ' (completed)'
|
|
else:
|
|
completed = ''
|
|
tt = ''
|
|
peers = ''
|
|
peers = f'''<td><div data-tooltip="seeds/leeches{tt}">{seeds}/{leeches}{completed}</div></td>'''
|
|
else:
|
|
peers = ''
|
|
# t_row = ''
|
|
t_row = f'''
|
|
</table><table><tr><td>
|
|
<a href="{t_dict[pl]['magnet']}"><div class="svg"><div data-tooltip="magnet link">{icons_dict['magnet']}</a></div></div>
|
|
<a href="/static/{subpath}/{pl}.torrent"><div class="svg"><div data-tooltip="download {pl}.torrent">{icons_dict['download']}</a></div></div>
|
|
</td>
|
|
{peers}
|
|
<td>{t_dict[pl]['total_size']}{t_dict[pl]['unit']}</td></tr>
|
|
'''
|
|
else:
|
|
t_row = ''
|
|
body = f'''<div class="column" style="margin:2.5% 2.5% 45% 2.5%;"><table><th style="line-height: 20px">{pl} <a href="/track_list"><div class="svg"><div data-tooltip="back">{icons_dict['backsmall']}</a></div></div></th>{(''.join(playlist[pl]))}{t_row}</table></div>'''
|
|
return render_template('tracklist.html', body=body, title=title, theme_html=theme_html)
|
|
|
|
@app.route('/track/<input_string1>/<input_string2>', methods=["GET"])
|
|
def track(input_string1, input_string2):
|
|
global b32_dict
|
|
try:
|
|
b32_n = session['b32_n']
|
|
|
|
ident = session['ident']
|
|
try:
|
|
b32_ = b32s_[b32_n]
|
|
except IndexError as e:
|
|
if len(b32_dict[0]['b32']) > 4 :
|
|
b32_n = load_balance_b32s(b32_dict)
|
|
b32_ = b32s_[b32_n]
|
|
session['b32_n'] = b32_n
|
|
preconnect = f'''<link rel="preconnect" href="{b32_}" />'''
|
|
else:
|
|
b32_n = 0
|
|
b32_ = ''
|
|
b32_dict[b32_n]['timestamps'].append([datetime.now(), ident])
|
|
|
|
|
|
except KeyError as e:
|
|
ident = random_string(10)
|
|
session['ident'] = ident
|
|
print(len(b32_dict[0]['b32']))
|
|
if len(b32_dict[0]['b32']) > 4 :
|
|
b32_n = load_balance_b32s(b32_dict)
|
|
print(b32_n)
|
|
print(b32s_)
|
|
b32_ = b32s_[b32_n]
|
|
else:
|
|
b32_n = 0
|
|
b32_ = ''
|
|
b32_dict[b32_n]['timestamps'].append([datetime.now(), ident])
|
|
session['b32_n'] = b32_n
|
|
if b32_ != '':
|
|
preconnect = f'''<link rel="preconnect" href="{b32_}" />'''
|
|
else:
|
|
preconnect = ''
|
|
playlist_dir = sanitize_input(input_string1)
|
|
track_name = sanitize_input(input_string2)
|
|
track_path = os.path.join(script_directory, f'static/{subpath}/{playlist_dir}', track_name)
|
|
exists_ = os.path.exists(track_path)
|
|
try:
|
|
session['current_track'] = track_name
|
|
session['current_playlist'] = playlist_dir
|
|
except Exception as e:
|
|
print(e)
|
|
try:
|
|
delayed = session['delay']
|
|
except KeyError as e:
|
|
delayed = default_delay
|
|
try:
|
|
repeat = session['repeat']
|
|
except KeyError as e:
|
|
repeat = False
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
try:
|
|
img = all_tracks[playlist_dir][track_name]['image']
|
|
except KeyError as e:
|
|
img = None
|
|
if img != None:
|
|
img_url_ = f'/static/{subpath}/{playlist_dir}/{img}'
|
|
img_url_converted = urllib.parse.quote(img_url_, safe='/', encoding=None, errors=None)
|
|
else:
|
|
img_url_safe = urllib.parse.quote(default_image, safe='/', encoding=None, errors=None)
|
|
img_url_converted = f'/static/{subpath}/{img_url_safe}'
|
|
track_list_safe = urllib.parse.quote(f'/track_list/{playlist_dir}', safe='/', encoding=None, errors=None)
|
|
if exists_:
|
|
keys_iter = iter(all_tracks[playlist_dir])
|
|
nxt_key = None
|
|
last_key_ = None
|
|
for key in keys_iter:
|
|
if key == track_name:
|
|
nxt_key = next(keys_iter, None)
|
|
prev_key = last_key_
|
|
break
|
|
else:
|
|
last_key_ = key
|
|
|
|
if nxt_key == None:
|
|
nxt_key = list(all_tracks[playlist_dir])[0]
|
|
|
|
if prev_key == None:
|
|
g_ = list(all_tracks[playlist_dir])
|
|
prev_key = g_[len(g_) - 1]
|
|
print(prev_key)
|
|
final_track = list(all_tracks[playlist_dir])[-1]
|
|
if track_name.casefold() == final_track.casefold():
|
|
is_last = True
|
|
else:
|
|
is_last = False
|
|
if is_last:
|
|
if repeat:
|
|
nxt_load_url = f'/track/{playlist_dir}/{nxt_key}'
|
|
else:
|
|
nxt_load_url = '/track'
|
|
else:
|
|
nxt_load_url = f'/track/{playlist_dir}/{nxt_key}'
|
|
length = all_tracks[playlist_dir][track_name]['length'] + delayed
|
|
track_n = os.path.splitext(os.path.basename(track_name))[0]
|
|
meta_reload = f'''<meta http-equiv="refresh" content="{length};url={nxt_load_url}">'''
|
|
body = f'''<div style="height: 500px; background-size: 500px 500px; background-repeat: no-repeat; background-image: url({img_url_converted});">
|
|
<div1><div class="track">
|
|
<div class="svg"><div data-tooltip="previous"><a href="/track/{playlist_dir}/{prev_key}">{icons_dict['previous']}</a></div></div>
|
|
<div class="svg"><div data-tooltip="next"><a href="/track/{playlist_dir}/{nxt_key}">{icons_dict['next']}</a></div></div>
|
|
<div class="svg"><div data-tooltip="stop"><a href="/track">{icons_dict['stop']}</a></div></div>
|
|
<div class="svg"><div data-tooltip="download"><a href="{b32_}/static/{subpath}/{playlist_dir}/{track_name}" target="_blank">{icons_dict['download_large']}</a></div></div>
|
|
<div class="svg"><div data-tooltip="playlist: {playlist_dir}"><a href="{track_list_safe}" target="track_list">{icons_dict['playlist']}</a></div></div>
|
|
{remove_track_count(track_n)}</div></div></div><audio controls autoplay><source src="{b32_}/static/{subpath}/{playlist_dir}/{track_name}" type="audio/mpeg">Your browser does not support the audio element.</audio>'''
|
|
else:
|
|
body = f'''ERROR: Audio Not Found!'''
|
|
meta_reload = f'''<meta http-equiv="refresh" content="5;url=/track">'''
|
|
return render_template('track.html', body=body, title=track_name, preconnect=preconnect, meta_reload=meta_reload, theme_html=theme_html)
|
|
|
|
@app.route('/track', methods=["GET"])
|
|
def track_home():
|
|
img_url_safe = urllib.parse.quote(default_image, safe='/', encoding=None, errors=None)
|
|
img_url_converted = f'/static/{img_url_safe}'
|
|
body = f'''<img src="img_url_converted" class="center" width="150px" height="150px">'''
|
|
try:
|
|
full_track = session['current_track']
|
|
track_name = os.path.splitext(os.path.basename(full_track))[0]
|
|
playlist_dir = session['current_playlist']
|
|
if track_name != None and playlist_dir != None:
|
|
session['current_track'] = None
|
|
session['current_playlist'] = None
|
|
last_song = True
|
|
else:
|
|
last_song = False
|
|
except KeyError as e:
|
|
print(e)
|
|
last_song = False
|
|
except TypeError as e:
|
|
print(e)
|
|
last_song = False
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
|
|
if last_song:
|
|
url_safe = urllib.parse.quote(f'/track/{playlist_dir}/{full_track}', safe='/', encoding=None, errors=None)
|
|
resume = f'''<div1><div class="track"><div class="svg"><div data-tooltip="Resume"><a href="{url_safe}">{icons_dict['play']}</a></div></div>Resume: {track_name}</div></div>'''
|
|
else:
|
|
resume = ''
|
|
body = f'''<div style="height: 500px; background-size: 500px 500px; background-repeat: no-repeat; background-image: url({img_url_converted});">{resume}</div>'''
|
|
track_name = 'Home'
|
|
return render_template('track.html', body=body, title=track_name, theme_html=theme_html)
|
|
|
|
@app.route('/info', methods=["GET"])
|
|
def info():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
title = 'info'
|
|
body = ''
|
|
return render_template('tracklist.html', body=body, title=title, theme_html=theme_html)
|
|
|
|
@app.route('/changelog', methods = ['GET'])
|
|
def changelog():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
lines_list = []
|
|
title = 'Changelog'
|
|
e = f'''<a href="/info"><div class="svg_exit">{icons_dict['exit']}</a></div>'''
|
|
if os.path.isfile(changelog_txt):
|
|
with open(changelog_txt, 'r') as file:
|
|
for line_number, line in enumerate(file, start=1):
|
|
rm_line_break = line.replace('\n', '')
|
|
div = f'<div>{rm_line_break}</div>'
|
|
lines_list.append(div)
|
|
pre = ['<div class="column"><pre contenteditable="true" spellcheck="false">', '</pre></div>']
|
|
body = f"{pre[0]}{''.join(lines_list)}{pre[1]}"
|
|
|
|
return render_template('md_file.html', body=body, title=title, exit=e, theme_html=theme_html)
|
|
|
|
@app.route('/about', methods = ['GET'])
|
|
def about():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
e = f'''<a href="/info"><div class="svg_exit">{icons_dict['exit']}</a></div>'''
|
|
file = os.path.join(script_directory, 'md_files/about.md')
|
|
title = 'About'
|
|
try:
|
|
with open(file, 'r') as f:
|
|
tempMd= f.read()
|
|
body = f'<div class="text_column">{markdown.markdown(tempMd)}</div>'
|
|
except (FileNotFoundError) as e:
|
|
body = 'ERROR - no content found for this!'
|
|
return render_template('md_file.html', body=body, title=title, exit=e, theme_html=theme_html)
|
|
|
|
@app.route('/tos', methods = ['GET'])
|
|
def tos():
|
|
try:
|
|
theme = session['theme']
|
|
except KeyError as e:
|
|
theme = default_theme
|
|
theme_html = get_themes(theme, True)
|
|
e = f'''<a href="/info"><div class="svg_exit">{icons_dict['exit']}</a></div>'''
|
|
file = os.path.join(script_directory, 'md_files/tos.md')
|
|
title = 'TOS'
|
|
try:
|
|
with open(file, 'r') as f:
|
|
tempMd= f.read()
|
|
body = f'<div class="text_column">{markdown.markdown(tempMd)}</div>'
|
|
except (FileNotFoundError) as e:
|
|
body = 'ERROR - no content found for this!'
|
|
return render_template('md_file.html', body=body, title=title, exit=e, theme_html=theme_html)
|
|
|
|
@app.route('/robots.txt', methods = ['GET'])
|
|
def robots():
|
|
return send_file('robots.txt', as_attachment=True)
|
|
|
|
@app.route("/ping", methods = ['GET'])
|
|
def ping():
|
|
return "PONG"
|
|
|
|
@app.route(f'/{endpoint}', methods=['POST'])
|
|
def refresh():
|
|
global all_tracks, playlist, t_dict
|
|
data = req_flask.json
|
|
# try:
|
|
api_key_path = os.path.join(script_directory, '.apikey')
|
|
if os.path.exists(api_key_path):
|
|
api_key = read_txt_file(os.path.join(script_directory, '.apikey'))[0]
|
|
key_ = sanitize_input(data.get('key'))
|
|
if len(key_) == 40:
|
|
if key_ == api_key:
|
|
cmd = sanitize_input(data.get('command'))
|
|
if cmd == 'refresh':
|
|
all_tracks, playlist, t_dict = load_track_data({}, {}, {})
|
|
if scraping_on:
|
|
for pl in t_dict:
|
|
t_dict[pl]['seeds'], t_dict[pl]['leeches'], t_dict[pl]['completed'] = 0, 0 ,0
|
|
t_dict = asyncio.run(scrape_trackers(t_dict))
|
|
return 'success'
|
|
elif cmd == 'update_playlists':
|
|
try:
|
|
msg = 'UPDATE'
|
|
update_api(msg, '.runupdate')
|
|
except Exception as e:
|
|
print(f'Error {e}')
|
|
# if not Th.playlist_thread:
|
|
# exit_event = threading.Event()
|
|
# thread_update_playlists = threading.Thread(target=update_playlists_thread, args=(exit_event, script_directory))
|
|
# thread_update_playlists.start()
|
|
# return 'running'
|
|
# else:
|
|
# return 'already running'
|
|
else:
|
|
return 'failed'
|
|
else:
|
|
abort(404)
|
|
else:
|
|
abort(404)
|
|
else:
|
|
print(f'No api key found')
|
|
abort(400)
|
|
|
|
@app.route('/<input_string>', methods = ['GET'])
|
|
def kill(input_string):
|
|
abort(404)
|
|
|
|
def tracker_scrape_thread(exit_event, get_peercounts):
|
|
global t_dict, b32_dict
|
|
sc = 0
|
|
purge_ts = 30
|
|
time_difference = timedelta(seconds=15*60)
|
|
while not exit_event.is_set():
|
|
if scraping_on:
|
|
if sc <= 0:
|
|
try:
|
|
t_dict = asyncio.run(scrape_trackers(t_dict))
|
|
# print(f'Threads: {threading.active_count()}')
|
|
time.sleep(scrape_interval)
|
|
except Exception as e:
|
|
print('Scraper error {e}')
|
|
sc = scrape_interval
|
|
|
|
dn = datetime.now()
|
|
for k, v in b32_dict.items():
|
|
temp_list = []
|
|
already_seen = []
|
|
for n in range(0, len(b32_dict[k]['timestamps'])):
|
|
if len(b32_dict[k]['timestamps']) != 0:
|
|
if (b32_dict[k]['timestamps'][n][1]) not in already_seen:
|
|
if (dn - b32_dict[k]['timestamps'][n][0]) > time_difference:
|
|
pass
|
|
else:
|
|
temp_list.append(b32_dict[k]['timestamps'][n])
|
|
already_seen.append((b32_dict[k]['timestamps'][n][1]))
|
|
b32_dict[k]['timestamps'] = temp_list
|
|
# for k, v in b32_dict.items():
|
|
# print(f"{b32_dict[k]['b32']}:{len(b32_dict[k]['timestamps'])}")
|
|
# print(b32_dict)
|
|
time.sleep(purge_ts)
|
|
if scraping_on:
|
|
sc -= purge_ts
|
|
|
|
if scraping_on:
|
|
from scrape import scrape_trackers
|
|
import asyncio
|
|
for pl in t_dict:
|
|
t_dict[pl]['seeds'], t_dict[pl]['leeches'], t_dict[pl]['completed'] = 0, 0 ,0
|
|
exit_event = threading.Event()
|
|
thread_torrent_scraper = threading.Thread(target=tracker_scrape_thread, args=(exit_event, get_peercounts))
|
|
thread_torrent_scraper.start()
|
|
|
|
if __name__ == '__main__':
|
|
app.run(host='127.0.0.1', port=site_port)
|
|
app.run(debug=True)
|
|
|
|
|