#!/usr/bin/env python3 import os, re, urllib.request, markdown, bleach, json, random, threading, time, subprocess, sys from flask import Flask, render_template, session, abort, send_file, request as req_flask from datetime import datetime, timedelta from configload import * from updateplaylists import update_api, random_string, read_txt_file, remove_track_count app = Flask(__name__) app.config["SECRET_KEY"] = secret_key class Th(): playlist_thread = False def get_short_url(long_url:str)->str: '''regex to get shortened url''' p = '(?:http.*://)?(?P[^:/ ]+).?(?P[0-9]*).*' m = re.search(p,long_url) short = m.group('host') port = m.group('port') if port != '': x = f':{port}' else: x = '' url = f'{short}{x}' return url def get_extension(filename:str)->tuple: '''given a string converts to filename and extension''' split_ = filename.split('.') ext_length = len(split_[len(split_)-1])+1 name_ = filename[:-ext_length] ext_ = filename[-ext_length:] return name_, ext_ def get_svg(absolute_location:str, filename:str)->str: '''Convert svg file for inline html''' try: if filename == '': filename = 'home' with open(f'{absolute_location}/static/icons/{str(filename)}.svg', 'r') as file: icon = file.read().replace('\n', '') except Exception as e: icon = '' return icon def sanitize_input(input_str:str)->str: '''sanitize user input.''' allowed_tags = '' x1 = bleach.clean(input_str, tags=allowed_tags) sanitized_str = re.sub(r']*>(.*?)', '', x1, flags=re.IGNORECASE) sanitized_ = sanitized_str.replace('&', '&') return sanitized_ def get_footer()->str: '''Generate footer. 2 options based on input being present or not.''' footer_list = [ left_footer, 'TOS', 'Changelog', 'About', f'AH', right_footer, ] footer = ' | '.join(footer_list) return footer def get_player_icons()->dict: '''Gets svg files for inline. Icons list are the names of svg files to read as icons.''' icons_dict = {} icons_list = [ 'stop', 'pause', 'next', 'previous', 'back', 'backsmall', 'playlist', 'exit', 'delay', 'repeat', 'play', 'playsmall', 'theme', 'magnet', 'download', 'search', 'listeners', 'download_large', ] for item in icons_list: icons_dict[item] = get_svg(script_directory, item) return icons_dict def load_track_data(all_tracks:dict, playlist:dict, t_dict:dict)->tuple: '''load the track data from data.json file''' json_file = (os.path.join(script_directory, 'data.json')) try: with open(json_file) as json_file: data = json.load(json_file) all_tracks = data['all_tracks'] playlist = data['playlist'] t_dict = data['torrent'] except Exception as e: print(f'Error: {e}') return all_tracks, playlist, t_dict return all_tracks, playlist, t_dict def get_themes(theme:str, mini:bool)->str: '''convert session theme str to the needed css file''' if theme.casefold() == 'darkgreen': index_theme = 'darkgreen.css' mini_theme = 'darkgreen_mini.css' elif theme.casefold() == 'pink': index_theme = 'pink.css' mini_theme = 'pink_mini.css' elif theme.casefold() == 'blue': index_theme = 'blue.css' mini_theme = 'blue_mini.css' elif theme.casefold() == 'red': index_theme = 'red.css' mini_theme = 'red_mini.css' if mini: theme_html = f'/static/{mini_theme}' else: theme_html = f'/static/{index_theme}' return theme_html def search_tracks(term:str)->list: search_dict = {} for item in all_tracks: matching_keys = [key for key, value in all_tracks[item].items() if re.search(term, key, re.IGNORECASE)] if len(matching_keys) > 0: search_dict[item] = matching_keys result_list = [] if len(search_dict) > 0: for pl in search_dict: # print(search_dict[pl]) for i in range(0, len(search_dict[pl])): track_name = get_extension(search_dict[pl][i])[0] # print(track_name) track_href = urllib.parse.quote(f'/track/{pl}/{search_dict[pl][i]}', safe='/', encoding=None, errors=None) playlist_href = urllib.parse.quote(f'/track_list/{pl}', safe='/', encoding=None, errors=None) result_list.append(f'{track_name}{pl}') return result_list def load_balance_b32s(b32_dict:dict)->int: '''with dict of b32's chose the one with the least listeners, or random if multiple are the same''' f = [] for k, v in b32_dict.items(): f.append(len(b32_dict[k]['timestamps'])) smallest = min(f) occurs = f.count(smallest) # print(occurs) if occurs >= 1: rand_list = [] for i in range(0,len(f)): if f[i] == smallest: rand_list.append(i) chosen = random.choice(rand_list) # print(rand_list) else: chosen = smallest return chosen def get_listeners(b32_dict:dict)->int: '''get number of listeners''' # print(b32_dict) listeners = 0 seen_list = [] for k, v in b32_dict.items(): if len(b32_dict[k]['timestamps']) > 0: for i in range(0, len(b32_dict[k]['timestamps'])): if b32_dict[k]['timestamps'][i][1] not in seen_list: seen_list.append(b32_dict[k]['timestamps'][i][1]) # print(seen_list) listeners = len(seen_list) return listeners def get_listeners_single(b32_dict:dict)->int: '''get number of listeners from single b32''' listeners = 0 seen_list = [] if len(b32_dict['timestamps']) > 0: for i in range (len(b32_dict['timestamps'])): if b32_dict['timestamps'][i][1] not in seen_list: seen_list.append(b32_dict['timestamps'][i][1]) listeners = len(seen_list) return listeners # def update_playlists_thread(exit_event, script_directory): # while not exit_event.is_set(): # Th.playlist_thread = True # try: # # command = f'python3 {os.path.join(script_directory, '__main__.py')}' # # subprocess.run(["python3", os.path.join(script_directory, '__main__.py')]) # # os.system(command) # msg = 'UPDATE' # update_api(msg, '.runupdate') # except Exception as e: # print(f'Error {e}') # Th.playlist_thread = False # exit_event.set() # sys.exit() icons_dict = get_player_icons() themes_dict = {} endpoint = random_string(20) update_api(endpoint, '.endpoint') for item in theme_list: color = item.split(':') themes_dict[color[0]] = f'''
{icons_dict['stop']}
''' all_tracks, playlist, t_dict = load_track_data({}, {}, {}) footer:str = get_footer() if get_peercounts and make_playlist_torrents: scraping_on:bool = True else: scraping_on:bool = False @app.route('/', methods=["GET"]) def home(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, False) body = f'''
''' return render_template('index.html', body=body, title=site_title, footer=footer, theme_html=theme_html) @app.route('/settings', methods=["GET"]) def settings(): # global script_directory refresh_time = 60 title = 'Settings' try: delay = session['delay'] except KeyError as e: delay = default_delay try: repeat = session['repeat'] except KeyError as e: repeat = False try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) toggle = 'on' if repeat: repeat_msg = 'on' toggle = 'off' else: repeat_msg = 'off' e = f''' ''' body = e meta_reload = f'''''' return render_template('tracklist.html', body=body, title=title, theme_html=theme_html, meta=meta_reload) @app.route('/listeners', methods=["GET"]) def settings_listeners(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) title = 'Settings: listeners' stats_html = [] b32_table = [] for k, v in b32_dict.items(): dest = get_short_url(b32_dict[k]['b32'])[:4] b32_table.append((f"{dest}{get_listeners_single(b32_dict[k])}")) stats_html.append(f'''{''.join(b32_table)}
DestListeners
''') body = f'''
{' '.join(stats_html)}
''' e = f'''''' return render_template('md_file.html', body=body, title=title, exit=e, theme_html=theme_html) @app.route('/search', methods=["GET", "POST"]) def search(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) title = 'Search' if req_flask.method == "POST": search_html = [] term = sanitize_input((req_flask.form.get("term"))) if len(term) <= 32: results = search_tracks(term) if len(results) > 0: results_table = f'''{''.join(results)}
Track ({len(results)})Playlist
''' search_html.append(results_table) else: search_html.append('No results.') else: search_html.append('Query too long.') body = f'''
{' '.join(search_html)}
''' e = f'''''' return render_template('md_file.html', body=body, title=title, exit=e, theme_html=theme_html) else: search_html = [] search_html.append(f'''
''') search_html.append(f'''''') body = ' '.join(search_html) return render_template('tracklist.html', body=body, title=title, theme_html=theme_html) @app.route('/set_theme', methods=["GET"]) def settings_theme_main(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) options = themes_dict options_html = [] for item in options: # print(item) options_html.append(f'{options[item]}') title = 'Settings: theme' options_html.append(f'''''') body = ' '.join(options_html) return render_template('tracklist.html', body=body, title=title, theme_html=theme_html) @app.route('/set_theme/', methods=["GET"]) def settings_theme_main_option(input_string): option_val = sanitize_input(input_string) options_html = [] try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) if len(option_val) < 24: try: if option_val in themes_dict: failed = False else: failed = True except Exception as e: failed = True else: failed = True meta_reload = '' try: if not failed: session['theme'] = option_val options_html.append(f'Theme: {option_val}') else: options_html.append(f'Invalid value.') except KeyError as e: options_html.append('Cookies needed to work.') meta_reload = f'''''' title = 'Settings: theme' changeit_now = [ f'''''', '

[Refresh now]

', ] for item in changeit_now: options_html.append(item) body = ' '.join(options_html) return render_template('tracklist.html', body=body, title=title, meta=meta_reload, theme_html=theme_html) @app.route('/settings_init', methods=["GET"]) def settings_init(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) title = 'Loading' wait_time = '2' body = f'''
''' meta_reload = f'''''' return render_template('tracklist.html', body=body, title=title, meta=meta_reload, theme_html=theme_html) @app.route('/set_repeat/', methods=["GET"]) def settings_repeat_main_option(input_string): option_val = sanitize_input(input_string) options_html = [] try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) if len(option_val) <= 3: try: if option_val.casefold() == 'on'.casefold(): converted = True elif option_val.casefold() == 'off'.casefold(): converted = False else: failed = True failed = False except Exception as e: failed = True else: failed = True try: if not failed: session['repeat'] = converted options_html.append(f'Repeat {option_val}.') else: options_html.append(f'Invalid value.') except KeyError as e: options_html.append('Cookies needed to work.') meta_reload = f'''''' title = 'Settings: repeat' options_html.append(f'''''') body = ' '.join(options_html) return render_template('tracklist.html', body=body, title=title, meta=meta_reload, theme_html=theme_html) @app.route('/set_delay', methods=["GET"]) def settings_delay_main(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) options = ['1', '3', '5', '10'] options_html = [] options_html.append('Options (seconds):') for item in options: options_html.append(f'[{item}]') title = 'Settings: delay' options_html.append(f'''''') body = ' '.join(options_html) return render_template('tracklist.html', body=body, title=title, theme_html=theme_html) @app.route('/set_delay/', methods=["GET"]) def settings_delay_main_option(input_string): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) option_val = sanitize_input(input_string) options_html = [] if len(option_val) <= 2: try: converted = int(option_val) failed = False except Exception as e: failed = True else: failed = True try: if not failed: session['delay'] = converted options_html.append(f'Updated to {option_val}.') else: options_html.append(f'Invalid value.') except KeyError as e: options_html.append('Cookies needed to work.') meta_reload = f'''''' title = 'Settings: delay' options_html.append(f'''''') body = ' '.join(options_html) return render_template('tracklist.html', body=body, title=title, meta=meta_reload, theme_html=theme_html) @app.route('/track_list', methods=["GET"]) def playlist_home(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) title = 'Home' playlists_list = [] playlists_list.append('
') # print(playlist) #
for k, v in playlist.items(): start_ = next(iter(all_tracks[k])) playlists_list.append(f'''
''') playlists_list.append('
Playlists
{k} ({len(v)})
') body = ''.join(playlists_list) return render_template('tracklist.html', body=body, title=title, theme_html=theme_html) @app.route('/track_list/', methods=["GET"]) def track_list(input_string): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) pl = sanitize_input(input_string) title = pl if len(t_dict) > 0: t_dict[pl]['infohash'] if scraping_on: seeds = t_dict[pl]['seeds'] leeches = t_dict[pl]['leeches'] c_ = t_dict[pl]['completed'] if c_ != 0: completed = f' ({c_})' tt = ' (completed)' else: completed = '' tt = '' peers = '' peers = f'''
{seeds}/{leeches}{completed}
''' else: peers = '' # t_row = '' t_row = f''' {peers} ''' else: t_row = '' body = f'''
{t_dict[pl]['total_size']}{t_dict[pl]['unit']}
{(''.join(playlist[pl]))}{t_row}
{pl}
''' return render_template('tracklist.html', body=body, title=title, theme_html=theme_html) @app.route('/track//', methods=["GET"]) def track(input_string1, input_string2): global b32_dict try: b32_n = session['b32_n'] ident = session['ident'] try: b32_ = b32s_[b32_n] except IndexError as e: if len(b32_dict[0]['b32']) > 4 : b32_n = load_balance_b32s(b32_dict) b32_ = b32s_[b32_n] session['b32_n'] = b32_n preconnect = f'''''' else: b32_n = 0 b32_ = '' b32_dict[b32_n]['timestamps'].append([datetime.now(), ident]) except KeyError as e: ident = random_string(10) session['ident'] = ident print(len(b32_dict[0]['b32'])) if len(b32_dict[0]['b32']) > 4 : b32_n = load_balance_b32s(b32_dict) print(b32_n) print(b32s_) b32_ = b32s_[b32_n] else: b32_n = 0 b32_ = '' b32_dict[b32_n]['timestamps'].append([datetime.now(), ident]) session['b32_n'] = b32_n if b32_ != '': preconnect = f'''''' else: preconnect = '' playlist_dir = sanitize_input(input_string1) track_name = sanitize_input(input_string2) track_path = os.path.join(script_directory, f'static/{subpath}/{playlist_dir}', track_name) exists_ = os.path.exists(track_path) try: session['current_track'] = track_name session['current_playlist'] = playlist_dir except Exception as e: print(e) try: delayed = session['delay'] except KeyError as e: delayed = default_delay try: repeat = session['repeat'] except KeyError as e: repeat = False try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) try: img = all_tracks[playlist_dir][track_name]['image'] except KeyError as e: img = None if img != None: img_url_ = f'/static/{subpath}/{playlist_dir}/{img}' img_url_converted = urllib.parse.quote(img_url_, safe='/', encoding=None, errors=None) else: img_url_safe = urllib.parse.quote(default_image, safe='/', encoding=None, errors=None) img_url_converted = f'/static/{subpath}/{img_url_safe}' track_list_safe = urllib.parse.quote(f'/track_list/{playlist_dir}', safe='/', encoding=None, errors=None) if exists_: keys_iter = iter(all_tracks[playlist_dir]) nxt_key = None last_key_ = None for key in keys_iter: if key == track_name: nxt_key = next(keys_iter, None) prev_key = last_key_ break else: last_key_ = key if nxt_key == None: nxt_key = list(all_tracks[playlist_dir])[0] if prev_key == None: g_ = list(all_tracks[playlist_dir]) prev_key = g_[len(g_) - 1] print(prev_key) final_track = list(all_tracks[playlist_dir])[-1] if track_name.casefold() == final_track.casefold(): is_last = True else: is_last = False if is_last: if repeat: nxt_load_url = f'/track/{playlist_dir}/{nxt_key}' else: nxt_load_url = '/track' else: nxt_load_url = f'/track/{playlist_dir}/{nxt_key}' length = all_tracks[playlist_dir][track_name]['length'] + delayed track_n = os.path.splitext(os.path.basename(track_name))[0] meta_reload = f'''''' body = f'''''' else: body = f'''ERROR: Audio Not Found!''' meta_reload = f'''''' return render_template('track.html', body=body, title=track_name, preconnect=preconnect, meta_reload=meta_reload, theme_html=theme_html) @app.route('/track', methods=["GET"]) def track_home(): img_url_safe = urllib.parse.quote(default_image, safe='/', encoding=None, errors=None) img_url_converted = f'/static/{img_url_safe}' body = f'''''' try: full_track = session['current_track'] track_name = os.path.splitext(os.path.basename(full_track))[0] playlist_dir = session['current_playlist'] if track_name != None and playlist_dir != None: session['current_track'] = None session['current_playlist'] = None last_song = True else: last_song = False except KeyError as e: print(e) last_song = False except TypeError as e: print(e) last_song = False try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) if last_song: url_safe = urllib.parse.quote(f'/track/{playlist_dir}/{full_track}', safe='/', encoding=None, errors=None) resume = f'''
Resume: {track_name}
''' else: resume = '' body = f'''
{resume}
''' track_name = 'Home' return render_template('track.html', body=body, title=track_name, theme_html=theme_html) @app.route('/info', methods=["GET"]) def info(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) title = 'info' body = '' return render_template('tracklist.html', body=body, title=title, theme_html=theme_html) @app.route('/changelog', methods = ['GET']) def changelog(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) lines_list = [] title = 'Changelog' e = f'''''' if os.path.isfile(changelog_txt): with open(changelog_txt, 'r') as file: for line_number, line in enumerate(file, start=1): rm_line_break = line.replace('\n', '') div = f'
{rm_line_break}
' lines_list.append(div) pre = ['
', '
'] body = f"{pre[0]}{''.join(lines_list)}{pre[1]}" return render_template('md_file.html', body=body, title=title, exit=e, theme_html=theme_html) @app.route('/about', methods = ['GET']) def about(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) e = f'''''' file = os.path.join(script_directory, 'md_files/about.md') title = 'About' try: with open(file, 'r') as f: tempMd= f.read() body = f'
{markdown.markdown(tempMd)}
' except (FileNotFoundError) as e: body = 'ERROR - no content found for this!' return render_template('md_file.html', body=body, title=title, exit=e, theme_html=theme_html) @app.route('/tos', methods = ['GET']) def tos(): try: theme = session['theme'] except KeyError as e: theme = default_theme theme_html = get_themes(theme, True) e = f'''''' file = os.path.join(script_directory, 'md_files/tos.md') title = 'TOS' try: with open(file, 'r') as f: tempMd= f.read() body = f'
{markdown.markdown(tempMd)}
' except (FileNotFoundError) as e: body = 'ERROR - no content found for this!' return render_template('md_file.html', body=body, title=title, exit=e, theme_html=theme_html) @app.route('/robots.txt', methods = ['GET']) def robots(): return send_file('robots.txt', as_attachment=True) @app.route("/ping", methods = ['GET']) def ping(): return "PONG" @app.route(f'/{endpoint}', methods=['POST']) def refresh(): global all_tracks, playlist, t_dict data = req_flask.json # try: api_key_path = os.path.join(script_directory, '.apikey') if os.path.exists(api_key_path): api_key = read_txt_file(os.path.join(script_directory, '.apikey'))[0] key_ = sanitize_input(data.get('key')) if len(key_) == 40: if key_ == api_key: cmd = sanitize_input(data.get('command')) if cmd == 'refresh': all_tracks, playlist, t_dict = load_track_data({}, {}, {}) if scraping_on: for pl in t_dict: t_dict[pl]['seeds'], t_dict[pl]['leeches'], t_dict[pl]['completed'] = 0, 0 ,0 t_dict = asyncio.run(scrape_trackers(t_dict)) return 'success' elif cmd == 'update_playlists': try: msg = 'UPDATE' update_api(msg, '.runupdate') except Exception as e: print(f'Error {e}') # if not Th.playlist_thread: # exit_event = threading.Event() # thread_update_playlists = threading.Thread(target=update_playlists_thread, args=(exit_event, script_directory)) # thread_update_playlists.start() # return 'running' # else: # return 'already running' else: return 'failed' else: abort(404) else: abort(404) else: print(f'No api key found') abort(400) @app.route('/', methods = ['GET']) def kill(input_string): abort(404) def tracker_scrape_thread(exit_event, get_peercounts): global t_dict, b32_dict sc = 0 purge_ts = 30 time_difference = timedelta(seconds=15*60) while not exit_event.is_set(): if scraping_on: if sc <= 0: try: t_dict = asyncio.run(scrape_trackers(t_dict)) # print(f'Threads: {threading.active_count()}') time.sleep(scrape_interval) except Exception as e: print('Scraper error {e}') sc = scrape_interval dn = datetime.now() for k, v in b32_dict.items(): temp_list = [] already_seen = [] for n in range(0, len(b32_dict[k]['timestamps'])): if len(b32_dict[k]['timestamps']) != 0: if (b32_dict[k]['timestamps'][n][1]) not in already_seen: if (dn - b32_dict[k]['timestamps'][n][0]) > time_difference: pass else: temp_list.append(b32_dict[k]['timestamps'][n]) already_seen.append((b32_dict[k]['timestamps'][n][1])) b32_dict[k]['timestamps'] = temp_list # for k, v in b32_dict.items(): # print(f"{b32_dict[k]['b32']}:{len(b32_dict[k]['timestamps'])}") # print(b32_dict) time.sleep(purge_ts) if scraping_on: sc -= purge_ts if scraping_on: from scrape import scrape_trackers import asyncio for pl in t_dict: t_dict[pl]['seeds'], t_dict[pl]['leeches'], t_dict[pl]['completed'] = 0, 0 ,0 exit_event = threading.Event() thread_torrent_scraper = threading.Thread(target=tracker_scrape_thread, args=(exit_event, get_peercounts)) thread_torrent_scraper.start() if __name__ == '__main__': app.run(host='127.0.0.1', port=site_port) app.run(debug=True)