i2music/updateplaylists.py

756 lines
27 KiB
Python
Executable File

#!/usr/bin/env python3
from PIL import Image
from configload import random_string, music_paths, bitrate, settings_path, list_ini, make_playlist_torrents, default_image, script_directory, exclude, subpath, torrent_directory, image_resize_kb, wait_before_adding_torrent, hostname, site_port, gunicorn_port
# from configload import *
from pydub import AudioSegment
import fleep, os, random, time, urllib.request, json, shutil, io, requests, natsort
from tqdm import tqdm
from torf import Torrent
from datetime import datetime
from zoneinfo import ZoneInfo
from mutagen.wave import WAVE
from mutagen.mp3 import MP3
from mutagen.oggopus import OggOpus
# Module-level list of paths pending deletion; flushed and reset by the __main__ section.
delete_list:list = []
# Codec name passed as `format=` when exporting converted audio.
new_format:str = 'opus'
# container:str = 'ogg'
# File extension given to converted / renamed audio files.
container:str = 'opus'
# File types (as reported by fleep) that force a conversion to audio.
video_extensions:list = ['mp4', 'mkv', 'webm', 'av1']
finished:int = 0
playlists_finished:int = 0
# Presumably the tracker URLs from the 'trackers' section of the settings file - confirm against configload.list_ini.
tracker_list:list = list_ini(settings_path, 'trackers', 'tracker_list')
class Cl:
    """ANSI escape sequences used to colour terminal output."""

    red = "\033[91m"
    green = "\033[92m"
    yellow = "\033[93m"
    # Resets the terminal back to its default rendition.
    normal = "\033[0m"
def print_title(menu_title:str, sub_title:str)->None:
    '''Print a centred, upper-cased title block sized to the terminal.

    menu_title -- heading text; its characters are spaced out ("abc" -> "A B C").
    sub_title  -- optional line printed directly under the heading ("" to omit).
    '''
    heading = ' '.join(menu_title).upper()
    columns = shutil.get_terminal_size().columns
    # NOTE(review): an empty string repeated is still empty, so this prints a
    # blank line; a rule character may have been intended here - confirm.
    line_break = "" * columns
    # A negative count (heading wider than the terminal) yields no padding.
    padding = " " * int((columns / 2) - (len(heading) / 2))
    rows = ['\n', line_break, f"{Cl.yellow}{padding}{heading}{Cl.normal}"]
    if sub_title != "":
        rows.append(sub_title)
    rows.extend([line_break, ""])
    for row in rows:
        print(row)
def remove_track_count(t_name:str)->str:
    '''Strip a leading track number such as "1." or "012. " from a track name.

    The longest prefix of up to three characters that int() accepts is treated
    as the track number. It is removed only when immediately followed by a
    '.'; one space after the dot is dropped as well. Only the prefix is
    removed: later occurrences of the same text (e.g. "1. Song 1.5") are kept.

    t_name -- track name without file extension.
    Returns the name without the numeric prefix, or t_name unchanged.
    '''
    prefix_len = -1
    for i in range(1, 4):
        try:
            int(t_name[:i])
        except ValueError:
            continue
        # Track the prefix length rather than the parsed value, so no number
        # can collide with the "not found" sentinel.
        prefix_len = i
    if prefix_len == -1 or t_name[prefix_len:prefix_len + 1] != '.':
        return t_name
    replaced = t_name[prefix_len + 1:]
    if replaced[0:1] == ' ':
        replaced = replaced[1:]
    return replaced
def update_api(msg: str, txtfile:str)->None:
    '''Write msg as the single line of a text file inside script_directory.

    msg     -- text to store; a trailing newline is appended.
    txtfile -- file name relative to script_directory.

    The file is created when missing and truncated otherwise, so it always
    ends up holding exactly one line.
    '''
    file_ = f"{script_directory}/{txtfile}"
    # 'w' both creates and truncates; the context manager closes the handle
    # even if the write fails.
    with open(file_, 'w') as file_object:
        file_object.write(f'{msg}\n')
def read_txt_file(file_:str)->list:
    '''Return the lines of a text file with their newline characters removed.

    file_ -- path of the file to read.
    Returns an empty list when file_ is not an existing regular file.
    '''
    if not os.path.isfile(file_):
        return []
    with open(file_, 'r') as handle:
        return [raw.replace('\n', '') for raw in handle]
def send_message(payload:dict, url_:str)->tuple:
    '''POST payload as JSON to url_ and report the outcome.

    Returns a (success, message) tuple. Any exception raised while sending is
    reported as (False, 'Failed to send') instead of propagating.
    '''
    try:
        response = requests.post(url_, json=payload)
    except Exception:
        return False, 'Failed to send'
    # NOTE(review): HTTP 500 is what counts as success here. That is unusual;
    # confirm the web app answers the refresh command with 500 on purpose.
    if response.status_code == 500:
        return True, 'Sent'
    return False, f'Failed to send message. Status code: {response.status_code}'
def find_images(img_path:str)->list:
    '''Recursively collect image files below img_path.

    img_path -- directory to walk.
    Returns full paths of every non-hidden file whose extension (compared
    case-insensitively) is a known image type. A missing directory yields [].
    '''
    # Every entry needs its leading dot because os.path.splitext keeps it.
    image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp')
    found_image_files = []
    for root, dirs, files in os.walk(img_path):
        for file in files:
            if file.startswith('.'):
                continue
            if os.path.splitext(file)[1].lower() in image_extensions:
                found_image_files.append(os.path.join(root, file))
    return found_image_files
def find_torrents(torrent_path:str)->list:
    '''Recursively collect the .torrent files below torrent_path.

    Returns full paths; hidden files (leading '.') are ignored and the
    extension is compared case-insensitively.
    '''
    matches = []
    for folder, _subdirs, names in os.walk(torrent_path):
        for name in names:
            suffix = os.path.splitext(name)[1].lower()
            if suffix == '.torrent' and not name.startswith('.'):
                matches.append(os.path.join(folder, name))
    return matches
def copy_single_file(source_file, dest_file_path):
    '''Copy source_file into the directory dest_file_path with a progress bar.

    The copy keeps the source's base name. Returns the path of the copy.
    '''
    target = os.path.join(dest_file_path, os.path.basename(source_file))
    total_bytes = os.path.getsize(source_file)
    block_size = 1024
    with tqdm(total=total_bytes, unit='B', unit_scale=True, unit_divisor=1024) as pbar:
        with open(source_file, 'rb') as src_file, open(target, 'wb') as dest_file:
            # iter() with a sentinel stops at the first empty read (end of file).
            for data in iter(lambda: src_file.read(block_size), b''):
                dest_file.write(data)
                pbar.update(len(data))
    return target
def get_filename(path_:str)->str:
    '''Return the final component of path_ ('' when path_ ends with a separator).'''
    tail = os.path.basename(path_)
    return tail.rsplit('/', 1)[-1]
def audio_metadata(file:str)->tuple:
    '''Return (length_in_seconds, bitrate) for an audio file.

    file -- path to an .mp3, .wav/.wave, .ogg or .opus file; the extension is
            matched case-insensitively.

    The length is truncated to whole seconds. The bitrate is estimated from
    the file size (size in units of 1000 bytes, times 8, per second) and is
    reported as 0 when the truncated length is 0.

    Raises ValueError for any other extension. Errors raised by mutagen while
    parsing the file propagate to the caller.
    '''
    ext = os.path.splitext(file)[1].lower()
    if ext == '.mp3':
        audio = MP3(file)
    elif ext in ('.wav', '.wave'):
        audio = WAVE(file)
    elif ext in ('.ogg', '.opus'):
        # NOTE(review): .ogg files are parsed as Ogg Opus; an Ogg Vorbis file
        # presumably fails inside mutagen - confirm if such files are expected.
        audio = OggOpus(file)
    else:
        raise ValueError(f'Unsupported audio extension: {ext!r}')
    length = int(audio.info.length)
    size_ = os.path.getsize(file) / 1000
    # Tracks shorter than one second truncate to 0; avoid dividing by zero.
    bitrate_calc = int((size_ / length) * 8) if length > 0 else 0
    return length, bitrate_calc
def find_tracks(music_path:str)->dict:
    '''Walk music_path for audio files and record each one's length and artwork.

    Returns a dict keyed by audio file name (base name only, so equally named
    files in different sub-directories overwrite each other). Each value is
    {'length': whole seconds, 'image': image file name, default_image or None}.
    The dict is ordered with natsort.natsorted. Files that raise while being
    inspected are reported and left out.
    '''
    audio_extensions = ['.mp3', '.wav', '.ogg', '.opus']
    music_files_dict = {}
    # Full paths of every image anywhere below music_path.
    found_image_files = find_images(music_path)
    for root, dirs, files in os.walk(music_path):
        for file in files:
            _, file_extension = os.path.splitext(file)
            if file_extension.lower() in audio_extensions:
                # Hidden files are ignored.
                if not file[:1] == '.':
                    try:
                        length = audio_metadata(os.path.join(root, file))[0]
                        filename_ = (os.path.basename(os.path.join(root, file)).split('/')[-1])
                        image = None
                        if len(found_image_files) > 0:
                            # First choice: an image whose name matches the track's
                            # up to the FIRST dot ("01. A.mp3" compares as "01").
                            for item in found_image_files:
                                image_filename = ((os.path.basename(os.path.join(root, item)).split('/')[-1]))
                                if filename_.split('.')[0] == image_filename.split('.')[0]:
                                    image = image_filename
                            # Fallback: an image named "cover.*".
                            if image is None:
                                for item in found_image_files:
                                    image_filename = ((os.path.basename(os.path.join(root, item)).split('/')[-1]))
                                    if 'cover' == image_filename.split('.')[0]:
                                        # Only used if the cover sits in the track's own directory.
                                        if os.path.isfile((os.path.join(root, image_filename))):
                                            image = image_filename
                                        # NOTE(review): this else is read as belonging to the
                                        # isfile check above - confirm the intended nesting.
                                        else:
                                            image = default_image
                        else:
                            image = None
                        music_files_dict[filename_] = {
                            'length': length,
                            'image': image,
                        }
                    except Exception as e:
                        print(f'{Cl.red}[Error]{Cl.normal} skipping {file}')
    music_files_dict = {key: val for key, val in natsort.natsorted(music_files_dict.items(), reverse=False)}
    # print(music_files_dict)
    return music_files_dict
def generate_track_list(music_path:str)->list:
    '''Build the HTML table rows for one playlist.

    music_path -- playlist directory; its base name becomes part of each URL.

    Returns one '<tr>' string per track found by find_tracks, numbered from 1.
    Each row links to /track/<playlist>/<file> (percent-encoded) and shows the
    track name without extension or leading track number. The name is
    HTML-escaped so characters such as '&' or '<' cannot break the markup.
    '''
    import html  # local import keeps this block self-contained

    tracks = find_tracks(music_path)
    playlist_path = os.path.basename(music_path)
    track_list_html = []
    for n, k in enumerate(tracks, start=1):
        track = f'/track/{playlist_path}/{k}'
        url_safe = urllib.parse.quote(track, safe='/', encoding=None, errors=None)
        track_name = os.path.splitext(os.path.basename(k))[0]
        label = html.escape(remove_track_count(track_name), quote=False)
        track_list_html.append(f'''<tr><td>{n}. <a href="{url_safe}" target="player">{label}</a></td></tr>''')
    return track_list_html
def IsImageProgressive(image:str)->bool:
    '''Report whether the file contains the byte pair FF C2.

    image -- path of the file to inspect.

    FF C2 is the JPEG start-of-frame marker for progressive encoding. The
    whole file is searched, not just the header. The very first byte is
    excluded from the search, so a file that merely starts with FF C2 is not
    reported.
    '''
    with open(image, "rb") as f:
        data = f.read()
    # One read plus a bytes search replaces a byte-at-a-time loop.
    return data.find(b'\xff\xc2', 1) != -1
def convert_to_progressive(input_image:str, dest:str)->None:
    '''Save input_image to dest as an optimised progressive JPEG (quality 80).

    Images in a mode the JPEG writer is not expected to accept (for example
    RGBA or palette images) are converted to RGB first. The source file
    handle is closed when done.
    '''
    with Image.open(input_image) as img:
        if img.mode not in ('RGB', 'L', 'CMYK'):
            img = img.convert('RGB')
        img.save(dest, format="JPEG", quality=80, optimize=True, progressive=True)
def delete_file(item:str)->None:
    '''Best-effort removal of one file.

    item -- path of the file to delete.

    Never raises: a missing path or any failure while deleting is reported
    on stdout instead.
    '''
    # Bound before the try so the except handler can always report a name.
    filename_ = str(item)
    try:
        filename_ = os.path.basename(item)
        if os.path.exists(item):
            os.remove(item)
        else:
            print(f"Couldn't delete: {filename_}")
    except Exception as e:
        print(f"Error: couldn't delete {filename_}")
def delete_all(delete_list:list)->None:
    '''Announce and delete every path in delete_list via delete_file.'''
    for path in delete_list:
        shown_name = os.path.basename(path).split('/')[-1]
        print(f'Deleting {shown_name}')
        delete_file(path)
def find_audio(audio_path:str)->list:
    '''Recursively collect audio (and audio-bearing video) files below audio_path.

    Returns full paths of non-hidden files whose extension, compared
    case-insensitively, is one of the supported types.
    '''
    wanted = ('.wav', '.flac', '.aac', '.ac3', '.m4a', '.opus', '.mkv', '.mp4', '.ogg', '.mp3')
    matches = []
    for folder, _subdirs, names in os.walk(audio_path):
        for name in names:
            if name.startswith('.'):
                continue
            if os.path.splitext(name)[1].lower() in wanted:
                matches.append(os.path.join(folder, name))
    return matches
def format_timestamp()->object:
    '''Build a coarse creation timestamp for torrent files.

    Takes the current local date, hour and minute, pushes the minute forward
    by a random 10-30 (capped at 59), zeroes seconds and microseconds, and
    labels the result as UTC.
    '''
    now = datetime.now()
    shifted_minute = min(now.minute + random.randint(10, 30), 59)
    # NOTE(review): the local wall-clock fields are tagged as UTC without any
    # conversion - confirm that is the intended behaviour.
    return datetime(
        now.year,
        now.month,
        now.day,
        now.hour,
        shifted_minute,
        0,
        0,
        tzinfo=ZoneInfo(key='UTC'),
    )
def process_path(path:str)->tuple:
    '''Measure a file or directory and derive the name of its .torrent file.

    path -- file or directory that will be hashed into a torrent.

    For a directory, hidden files directly inside it are DELETED first.
    Hidden sub-directories are left alone.

    Returns (torrent_file_name, file_name_qouted, torrent_file_name_no_qoute,
    total_size): the torrent path wrapped in double quotes, the input path
    wrapped in double quotes, the bare torrent path, and the size in units of
    1,000,000 bytes. The torrent path is the input path with its extension
    (files) or nothing (directories) replaced by '.torrent'.

    Raises FileNotFoundError when path is neither a file nor a directory.
    '''
    total_size = 0
    if os.path.isfile(path):
        total_size = os.path.getsize(path) / 1000000
        # splitext only looks at the last path component, so dots in
        # directory names and extension-less files are handled correctly.
        torrent_file_name_no_qoute = f'{os.path.splitext(path)[0]}.torrent'
    elif os.path.isdir(path):
        # NOTE(review): purpose of this short pause is not evident from here.
        time.sleep(0.25)
        for filename_y in os.listdir(path):
            if filename_y[:1] != '.':
                continue
            file_path_y = os.path.join(path, filename_y)
            if os.path.isdir(file_path_y) and not os.path.islink(file_path_y):
                # os.remove cannot delete a directory; skip hidden folders.
                continue
            if os.path.exists(file_path_y):
                os.remove(file_path_y)
                print(f"{Cl.green}[Deleted file]{Cl.normal} {get_filename(file_path_y)}")
            else:
                print(f"{Cl.red}[File not found]{Cl.normal} {get_filename(file_path_y)}")
        for dirpath, dirnames, filenames in os.walk(path):
            for filename in filenames:
                total_size += os.path.getsize(os.path.join(dirpath, filename))
        total_size = total_size / 1000000
        torrent_file_name_no_qoute = f'{path}.torrent'
    else:
        print(f"{Cl.red}[Error]{Cl.normal} not a directory or file??????")
        raise FileNotFoundError(f'{path} is not a file or directory')
    torrent_file_name = f'"{torrent_file_name_no_qoute}"'
    file_name_qouted = f'"{path}"'
    return torrent_file_name, file_name_qouted, torrent_file_name_no_qoute, total_size
def convert_file_size(size:int, digits:int)->tuple:
    '''Convert a byte count to a rounded value in decimal units.

    size   -- size in bytes.
    digits -- number of decimals to round to.

    Returns (value, unit) with unit one of 'B', 'KB', 'MB', 'GB', 'TB' and
    1000 bytes per KB. Sizes below 1000 bytes are returned in 'B'; sizes
    beyond the TB range stay in 'TB'.
    '''
    if size < 1000:
        return (round(size, digits), 'B')
    value = size
    unit = 'B'
    for unit in ('KB', 'MB', 'GB', 'TB'):
        value = value / 1000
        if value < 1000:
            break
    return (round(value, digits), unit)
def torf_torrent(run_item:str, tracker_list:list, number_count_x:int, path_list:list)->tuple:
    '''Hash run_item into a .torrent file and describe the result.

    run_item       -- file or directory to hash.
    tracker_list   -- trackers stored in the torrent.
    number_count_x -- position of this item, used only in the progress line.
    path_list      -- all items being hashed, used only for the total count.

    Any existing torrent file for run_item is deleted first. A failure while
    writing the new one is printed, not raised.
    Returns (playlist_name, info) where playlist_name is the torrent file
    name without '.torrent' and info holds the infohash, magnet link, size
    and unit.
    '''
    _, _, torrent_path, _ = process_path(run_item)
    delete_file(torrent_path)
    t = Torrent(
        path=run_item,
        trackers=tracker_list,
        creation_date=format_timestamp(),
        created_by='torf',
    )
    t.private = False
    t.generate()
    try:
        t.write(torrent_path)
    except Exception as e:
        print(f'{Cl.red}[Error]{Cl.normal} {e}')
    total_size, unit = convert_file_size(t.size, 2)
    print(f"{Cl.green}[Hashed {str(number_count_x)} / {str(len(path_list))}]{Cl.normal} {t.infohash} Size - {total_size}{unit}")
    # Drop the 8-character '.torrent' suffix to get the playlist name.
    playlist_name = get_filename(torrent_path)[:-8]
    info = {
        # 'torrent': True,
        'infohash': t.infohash,
        'magnet': str(t.magnet()),
        'total_size': total_size,
        'unit': unit,
    }
    return playlist_name, info
def generate_data_json(music_paths:list, t_dict:dict)->None:
    '''Write data.json in script_directory describing every playlist.

    music_paths -- playlist directories; each base name becomes a key.
    t_dict      -- torrent details keyed by playlist name (may be empty).

    The file holds three keys: 'all_tracks' (find_tracks output per
    playlist), 'playlist' (HTML rows per playlist) and 'torrent' (t_dict).
    '''
    playlist = {}
    all_tracks = {}
    for item in music_paths:
        bn = os.path.basename(item)
        all_tracks[bn] = find_tracks(item)
        playlist[bn] = generate_track_list(item)
    data_json = {
        'all_tracks': all_tracks,
        'playlist': playlist,
        'torrent': t_dict or {},
    }
    json_file = os.path.join(script_directory, 'data.json')
    # Serialise before touching the file so a failure keeps the old data.json.
    js = json.dumps(data_json)
    with open(json_file, 'w') as fp:
        fp.write(js)
def _strip_image_metadata(image_file:str)->None:
    '''Rewrite image_file in place from its pixels pasted onto a new RGB image.'''
    with Image.open(image_file) as img:
        img_without_metadata = Image.new("RGB", img.size)
        img_without_metadata.paste(img)
        img_without_metadata.save(image_file)


def remove_img_metadata(img_path:str)->None:
    """
    Remove image metadata and overwrite the original image(s).

    A file is processed when its name ends with a known image extension. A
    directory is walked recursively and every non-hidden image in it is
    processed. The size of the file, or the combined size of the images, is
    printed first. Anything else prints an error.

    Keyword arguments:
    img_path -- a path to an image or directory.
    """
    image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', '.webp')
    if os.path.isfile(img_path):
        total_size = round((os.path.getsize(img_path) / 1000),0)
        filename = os.path.basename(img_path)
        print(f"Size of {filename}: {Cl.green}[{str(total_size)} KB]{Cl.normal}")
        if img_path.lower().endswith(image_extensions):
            _strip_image_metadata(img_path)
            print(f"{Cl.green}Removed metadata from {Cl.normal}{filename}")
    elif os.path.isdir(img_path):
        total_size = 0
        found_image_files = []
        for root, dirs, files in os.walk(img_path):
            for file in files:
                _, file_extension = os.path.splitext(file)
                if file_extension.lower() not in image_extensions:
                    continue
                if file[:1] == '.':
                    print(f'[yellow1][Skipping] {file}')
                else:
                    file_path = os.path.join(root, file)
                    total_size += os.path.getsize(file_path) / 1000
                    found_image_files.append(file_path)
        img_total = len(found_image_files)
        print(f"Size of images: {Cl.green}[{str(round(total_size, 2))} KB]{Cl.normal}")
        # The collected paths already include the walked root, so they are
        # used as they are; joining them onto img_path again would break
        # relative paths.
        for img_number, file_name in enumerate(found_image_files, start=1):
            _strip_image_metadata(file_name)
            print(f"{Cl.green}[{str(img_number)}/{str(img_total)} Removed metadata] {Cl.normal}{get_filename(file_name)}")
    else:
        print(f"{Cl.red}{img_path} error, not a directory or file??????{Cl.normal}")
def resize_image_file_or_dir(img_path:str, target_size_kb:int, save_path:str)->None:
    """
    Shrink image(s) to roughly target_size_kb and store them as JPEG.

    img_path may be one image or a directory, which is walked recursively
    for non-hidden .jpg/.jpeg/.png/.webp files. Each image is scaled down in
    10% steps until its JPEG encoding (quality 75) fits the target, then
    written to save_path as "<original stem>.jpeg". Source files whose path
    differs from the written file are DELETED afterwards.
    If img_path == save_path, images are saved in a "resized" sub-directory.
    Prints an error and does nothing when img_path is neither file nor
    directory.

    Keyword arguments:
    img_path -- a path to an image or directory.
    target_size_kb -- kb for the resized img to be
    save_path -- where the resized images should go
    """
    image_extensions = ['.jpg', '.jpeg', '.png', '.webp']
    found_image_files = []

    def resize_image_to_target_size(image, target_size_kb):
        '''Return (image, encoded_size_in_bytes) once the JPEG fits the target.'''
        image = image.convert("RGB")
        while True:
            img_buffer = io.BytesIO()
            image.save(img_buffer, format="JPEG", quality=75)
            image_size_bytes = len(img_buffer.getvalue())
            if image_size_bytes <= target_size_kb * 995:
                return image, image_size_bytes
            width, height = image.size
            new_width = int(width * 0.9) # resize factor
            new_height = int(height * 0.9)
            if new_width < 1 or new_height < 1:
                # Nothing left to shrink; stop instead of requesting a
                # zero-sized image.
                return image, image_size_bytes
            image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)

    def resize_image(img_path, target_size_kb, save_path):
        '''Resize every collected image; return the source paths to delete.'''
        image_files_to_delete = []
        if not found_image_files:
            print(f"{Cl.red}No images found{Cl.normal}")
            return image_files_to_delete
        image_size_total = 0
        for input_image in found_image_files:
            # Take the stem from the file name only: dots in directory
            # names or inside the file name must not truncate it.
            stem = os.path.splitext(os.path.basename(input_image))[0]
            image_name = save_path + stem + ".jpeg"
            if input_image != image_name:
                image_files_to_delete.append(input_image)
            with Image.open(input_image) as original_image:
                resized_image, image_size_bytes = resize_image_to_target_size(original_image, target_size_kb)
            resized_image.save(image_name, format="JPEG", quality=75)
            image_size_kb = image_size_bytes / 995
            image_size_total += image_size_kb
            print(f"{Cl.green}[Resized: {round(image_size_kb, 2)}KB] {Cl.normal}{os.path.basename(input_image)}")
        print(f"{Cl.green}Completed, total size: {round(image_size_total, 2)}KB{Cl.normal}")
        return image_files_to_delete

    is_file = os.path.isfile(img_path)
    if not is_file and not os.path.isdir(img_path):
        # Checked before any directory is created, so an invalid path has
        # no side effects.
        print(f"{Cl.red} error, not a directory or file??????{Cl.normal}")
        return

    if img_path == save_path:
        if os.path.isfile(save_path):
            directory_up = os.path.dirname(save_path)
            save_path = f"{directory_up}/resized/"
        elif save_path[-1] != "/":
            save_path = f"{save_path}/resized/"
        else:
            save_path = f"{save_path}resized/"
        if not os.path.exists(save_path):
            os.makedirs(save_path)
    elif save_path[-1] != "/":
        save_path = f"{save_path}/"

    if is_file:
        found_image_files.append(img_path)
    else:
        for root, dirs, files in os.walk(img_path):
            for file in files:
                _, file_extension = os.path.splitext(file)
                if file_extension.lower() in image_extensions:
                    if file[:1] == '.':
                        print(f'[yellow1][Skipping] {file}')
                    else:
                        found_image_files.append(os.path.join(root, file))
    image_files_to_delete = resize_image(img_path, target_size_kb, save_path)
    if len(image_files_to_delete) > 0:
        print(image_files_to_delete)
        delete_all(image_files_to_delete)
def process_playlist_images(playlist:str)->list:
    '''Normalise a playlist's artwork; return the playlist's audio files.

    playlist -- playlist directory.

    Every image found below playlist is handled in turn:
    - if larger than image_resize_kb * 1.1 (KB) it is resized first, and the
      resulting ".jpeg" path is used from then on;
    - unless already a progressive JPEG, its metadata is removed and it is
      saved as "<stem>.jpeg" directly inside playlist; the source is deleted
      afterwards when its path differs from that destination.
    A failure on one image is printed and processing continues.

    Returns the list produced by find_audio(playlist), gathered before any
    image is touched.
    '''
    delete_list = []
    image_files = find_images(playlist)
    audio_file_list = find_audio(playlist)
    for item in image_files:
        # Bound up front so the except handler can always name the image,
        # even when the very first statement of the try fails.
        dest_name = get_filename(item)
        try:
            size_ = (os.path.getsize(item) / (1000))
            if size_ > (image_resize_kb * 1.1):
                resize_image_file_or_dir(item, image_resize_kb, playlist)
                # Continue with the resized file, which carries a .jpeg extension.
                # NOTE(review): the resized file is written into `playlist`
                # itself; for images in sub-directories this path presumably
                # does not match - confirm.
                extension = os.path.splitext(item)[1][1:]
                item = f'{item[:-(len(extension))]}jpeg'
            dest_name = os.path.splitext(os.path.basename(item))[0]
            dest = os.path.join(playlist, f'{dest_name}.jpeg')
            if not (IsImageProgressive(item)):
                remove_img_metadata(item)
                convert_to_progressive(item, dest)
                print(f'{Cl.green}[Converted to progressive jpeg]{Cl.normal} {get_filename(item)}')
                if item != dest:
                    delete_list.append(item)
            else:
                print(f'{Cl.green}[Image]{Cl.normal} {dest_name} already progressive, skipping')
        except Exception as e:
            print(f'{Cl.red}[Image]{Cl.normal} {dest_name} ERROR {e}')
    delete_all(delete_list)
    return audio_file_list
def process_playlist_audio_files(audio_file_list:list, playlists_done:int)->None:
    '''Convert a playlist's audio files to opus where needed.

    audio_file_list -- paths of the files to inspect.
    playlists_done  -- unused; kept for interface compatibility.

    Per file, using the type fleep detects from the first 128 bytes:
    - detected as ogg/mp3: a ".ogg" extension is renamed to the container
      extension, then the file is re-encoded only when its estimated bitrate
      exceeds 1.3x the configured bitrate;
    - detected as video: always re-encoded;
    - anything else: always re-encoded.
    Re-encoding writes "<stem>.<container>". The source file is deleted
    afterwards unless the output replaced it in place. A failure on one file
    is printed and processing continues.
    '''
    delete_list = []
    done_ = 0
    # Accepts both '96k' and '96K'.
    bitrate_int = int(bitrate.casefold().split('k')[0])
    max_bitrate_ = bitrate_int * 1.3
    for item in audio_file_list:
        # Bound before the try so the except handler can always name the file.
        filename_ = (os.path.basename(item).split('/')[-1])
        try:
            with open(item, "rb") as file:
                info = fleep.get(file.read(128))
            process_ = True
            delete_og_track = True
            renamed_ = False
            extension = os.path.splitext(item)[1][1:]
            is_ogg_ = extension.casefold() == 'ogg'.casefold()
            procede_ = any('ogg' in m or 'mp3' in m for m in info.mime)
            if procede_:
                if is_ogg_:
                    try:
                        if os.path.exists(item):
                            delete_og_track = False
                            os.rename(item, f'{item[:-3]}{container}')
                            print(f"{Cl.green}[Renamed]{Cl.normal} {filename_} to {filename_[:-3]}{container}")
                            item = f'{item[:-(len(extension))]}opus'
                            filename_ = f'{filename_[:-(len(extension))]}opus'
                            renamed_ = True
                        else:
                            print(f"{Cl.red}[No file found]{Cl.normal} Can't rename {filename_} to {filename_[:-3]}{container}")
                            process_ = False
                    except Exception as e:
                        print(f"{Cl.red}[Error]{Cl.normal} renaming {filename_} to {filename_[:-3]}{container} {e}")
                        process_ = False
                if process_:
                    bitrate_calc = audio_metadata(item)[1]
                    if not bitrate_calc > max_bitrate_:
                        process_ = False
            if any(m in video_extensions for m in info.extension):
                process_ = True
            if process_:
                # Swap only the real extension; a plain str.replace would
                # also rewrite matching text in directory or file names.
                f3 = f'{os.path.splitext(item)[0]}.{container}'
                f3_ = (os.path.basename(f3).split('/')[-1])
                print(f'{Cl.green}[{done_}/{len(audio_file_list)}]{Cl.normal} Converting {filename_} to {f3_} ...')
                sound = AudioSegment.from_file(item)
                if delete_og_track and f3 != item:
                    sound.export(f3, format=new_format, bitrate=bitrate)
                    delete_list.append(item)
                else:
                    # Output replaces the source in place, so it must not be
                    # queued for deletion.
                    sound.export(item, format=new_format, bitrate=bitrate)
            elif is_ogg_ and not renamed_:
                # Second attempt at a rename that did not happen above. A
                # file already renamed must not get here, or it would be
                # renamed again to a mangled extension.
                try:
                    if os.path.exists(item):
                        os.rename(item, f'{item[:-3]}{container}')
                        print(f"{Cl.green}[Renamed]{Cl.normal} {filename_} to {filename_[:-3]}{container}")
                    else:
                        print(f"{Cl.red}[No file found]{Cl.normal} Can't rename {filename_} to {filename_[:-3]}{container}")
                except Exception as e:
                    print(f"{Cl.red}[Error]{Cl.normal} renaming {filename_} to {filename_[:-3]}{container}")
            else:
                print(f'{Cl.green}[{done_}/{len(audio_file_list)}]{Cl.normal} Skipping {filename_}, already opus')
            done_ += 1
        except Exception as e:
            print(f'{Cl.red}[{done_}/{len(audio_file_list)}]{Cl.normal} ERROR {filename_}, {e}')
    delete_all(delete_list)
def get_infohash(torrent_file):
    '''Return the infohash stored in the .torrent file at torrent_file.'''
    return Torrent.read(torrent_file).infohash
def rename_youtube(path):
    '''Strip codec tags such as "_OPUS" from file names directly inside path.

    For each file whose name contains "_AAC", "_OPUS" or "_AC3", the name is
    split on spaces. Any word holding the tag is cut down to what follows
    the tag, minus one more character. The last word is then glued, without
    a space, onto the remaining words. Example:
    "Title (128kbit_OPUS).opus" becomes "Title.opus".
    Rename failures are printed, not raised.
    '''
    formats_youtube = ['_AAC', '_OPUS', '_AC3']
    for filename in os.listdir(path):
        for item in formats_youtube:
            if item not in filename:
                continue
            temp_list = [
                word if item not in word else word.split(item)[1][1:]
                for word in filename.split(' ')
            ]
            fn_rep = ' '.join(temp_list[:-1])
            if fn_rep[-1:] == ' ':
                fn_rep = fn_rep[:-1]
            fn_rep = f'{fn_rep}{temp_list[-1]}'
            try:
                os.rename(os.path.join(path,filename), os.path.join(path, fn_rep))
                print(f'{Cl.green}[Renamed]{Cl.normal} {filename} > {fn_rep}')
            except Exception as e:
                print(f"{Cl.red}[Error]]{Cl.normal} {e}")
# Script entry point. In order: process every playlist (file names, artwork,
# audio), optionally hash torrents and stage them for seeding, rewrite
# data.json, then ask the running web app to reload.
if __name__ == "__main__":
    # --- 1. Per-playlist clean-up and conversion -------------------------
    playlists_done = 0
    for playlist in music_paths:
        plistname = get_filename(playlist)
        print_title((f'[{playlists_done+1}/{len(music_paths)}] {plistname}'), f'Processing playlist - {Cl.green}{plistname}{Cl.normal}')
        # Playlists named in `exclude` are left untouched.
        if not plistname in exclude:
            rename_youtube(playlist)
            audio_file_list = process_playlist_images(playlist)
            # Flush the module-level delete_list and reset it.
            delete_all(delete_list); delete_list = []
            process_playlist_audio_files(audio_file_list, playlists_done)
        else:
            print(f'{Cl.yellow}[Skipping]{Cl.normal} {get_filename(playlist)} - Exclude list')
        playlists_done += 1
    # --- 2. Torrents ------------------------------------------------------
    # Maps playlist name -> torrent details; stays empty when torrents are off.
    t_dict={}
    if make_playlist_torrents:
        print_title((f'Hashing torrents'), '')
        playlists_finished = 0
        for playlist in music_paths:
            playlists_finished += 1
            p1, p2 = torf_torrent(playlist, tracker_list, playlists_finished, music_paths); t_dict[p1]=p2
        # NOTE(review): this seeding section is read as nested under
        # make_playlist_torrents - confirm the intended nesting.
        if len(torrent_directory) != 0:
            print_title('preparing for seeding', 'symlinks data, copies .torrent file')
            # Playlists whose freshly hashed torrent differs from the copy
            # already present in torrent_directory.
            restart_torrents = []
            for w in music_paths:
                source_file = os.path.join(script_directory, 'static', subpath, f'{os.path.basename(w)}.torrent')
                source_file_base = os.path.basename(source_file)
                t_file_current = os.path.join(torrent_directory, source_file_base)
                try:
                    if get_infohash(t_file_current).casefold() == (get_infohash(source_file)).casefold():
                        pass
                    else:
                        # Infohash changed: remove the stale copy and mark for re-adding.
                        delete_file(t_file_current)
                        restart_torrents.append(w)
                        # print(get_infohash(t_file_current))
                        # print(get_infohash(source_file))
                # Any failure reading either torrent (e.g. no copy staged
                # yet) is ignored here; missing copies are handled below.
                except Exception as e:
                    pass
            if len(restart_torrents) > 0:
                rs_torrents_filenames = []
                for q in restart_torrents:
                    rs_torrents_filenames.append(get_filename(q))
                print(rs_torrents_filenames)
                print_title(f'Waiting {wait_before_adding_torrent} seconds', f"{Cl.green}New torrents:{Cl.normal} {' | '.join(rs_torrents_filenames)}")
                print(f'Allowing time for torrent client to recognize change...')
                time.sleep(wait_before_adding_torrent)
            for w in music_paths:
                source_file = os.path.join(script_directory, 'static', subpath, f'{os.path.basename(w)}.torrent')
                source_file_base = os.path.basename(source_file)
                file_name = get_filename(w)
                symlink_name = os.path.join(torrent_directory, file_name)
                t_file_current = os.path.join(torrent_directory, source_file_base)
                # Link the playlist data into torrent_directory.
                if w in restart_torrents or not os.path.exists(symlink_name):
                    try:
                        os.symlink(w, symlink_name)
                        print(f'{Cl.green}[symlink made]{Cl.normal} {file_name} -> {w}')
                    except FileExistsError:
                        print('hash changed')
                        print(f'{Cl.green}[Symlink exists, replacing]{Cl.normal} {file_name} -> {w}')
                        delete_file(symlink_name)
                        os.symlink(w, symlink_name)
                # Copy the .torrent when it changed or is not staged yet.
                if w in restart_torrents or not os.path.exists(t_file_current):
                    print(f'{Cl.green}[Copying .torrent file]{Cl.normal}')
                    copy_single_file(source_file, torrent_directory)
                else:
                    print(f'{Cl.green}[Infohash unchanged] Not copying new torrent file')
        else:
            print(f"{Cl.red}[No 'torrent_directory' found]{Cl.normal} Will not create symlinks/copy .torrent file.")
    # --- 3. data.json -----------------------------------------------------
    print(f'{Cl.green}[data.json]{Cl.normal} Updating the data.json file')
    generate_data_json(music_paths, t_dict)
    # --- 4. Tell the web app to reload -----------------------------------
    endpoint_path = os.path.join(script_directory, '.endpoint')
    if os.path.exists(endpoint_path):
        # The first line of .endpoint is used as the URL path of the request.
        endpoint = read_txt_file(os.path.join(script_directory, '.endpoint'))[0]
        # A fresh random key is written to .apikey and sent with the command;
        # presumably the web app compares the two - confirm.
        api_key = random_string(40)
        update_api(api_key, '.apikey')
        payload = {
            'key': api_key,
            'command': 'refresh',
        }
        # Try the gunicorn port first, then fall back to the site port.
        url_ = f'http://{hostname}:{gunicorn_port}/{endpoint}'
        sent_ = send_message(payload, url_)[0]
        if not sent_:
            print(f"{Cl.red}[Trying flask port {site_port}]{Cl.normal} Couldn't reach gunicorn on port {gunicorn_port}")
            url_ = f'http://{hostname}:{site_port}/{endpoint}'
            sent_ = send_message(payload, url_)[0]
            if not sent_:
                print(f"{Cl.red}[Couldn't reach site]{Cl.normal} If it's running, check your 'gunicorn_port' and 'site_port' in your config file. Script is checking on ports {gunicorn_port} and {site_port}")
        if sent_:
            print(f'{Cl.green}[Refreshing site]{Cl.normal} Command sent')
        # NOTE(review): placement of this line (inside or outside `if sent_`)
        # is a best guess - confirm.
        print(f'{Cl.green}[Done]{Cl.normal}')
    else:
        print(f"{Cl.red}[Can't refresh webapp]]{Cl.normal} No file '.endpoint' found in script directory, is webapp running?")