mirror of http://git.simp.i2p/simp/i2music.git
756 lines
27 KiB
Python
Executable File
756 lines
27 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from PIL import Image
|
|
from configload import random_string, music_paths, bitrate, settings_path, list_ini, make_playlist_torrents, default_image, script_directory, exclude, subpath, torrent_directory, image_resize_kb, wait_before_adding_torrent, hostname, site_port, gunicorn_port
|
|
# from configload import *
|
|
from pydub import AudioSegment
|
|
import fleep, os, random, time, urllib.request, json, shutil, io, requests, natsort
|
|
from tqdm import tqdm
|
|
from torf import Torrent
|
|
from datetime import datetime
|
|
from zoneinfo import ZoneInfo
|
|
from mutagen.wave import WAVE
|
|
from mutagen.mp3 import MP3
|
|
from mutagen.oggopus import OggOpus
|
|
|
|
delete_list:list = []
|
|
new_format:str = 'opus'
|
|
# container:str = 'ogg'
|
|
container:str = 'opus'
|
|
video_extensions:list = ['mp4', 'mkv', 'webm', 'av1']
|
|
finished:int = 0
|
|
playlists_finished:int = 0
|
|
tracker_list:list = list_ini(settings_path, 'trackers', 'tracker_list')
|
|
|
|
class Cl:
|
|
red = '\033[91m'
|
|
green = '\033[92m'
|
|
yellow = "\033[93m"
|
|
normal = '\033[0m'
|
|
|
|
def print_title(menu_title:str, sub_title:str)->None:
|
|
'''Gets size of terminal and prints a title section'''
|
|
title = ' '.join(menu_title).upper()
|
|
terminal_width = shutil.get_terminal_size().columns
|
|
line_break = "⎯" * terminal_width
|
|
len(title)
|
|
spacer_x = " " * int(((terminal_width / 2)-(len(title) / 2)))
|
|
title_l = [
|
|
'\n',
|
|
line_break,
|
|
f"{Cl.yellow}{spacer_x}{title}{Cl.normal}",
|
|
line_break,
|
|
"",
|
|
]
|
|
if sub_title != "":
|
|
title_l.insert(3, sub_title)
|
|
for item in title_l:
|
|
print(item)
|
|
|
|
def remove_track_count(t_name:str)->str:
|
|
'''If track name starts with "int." remove it'''
|
|
int_ = -1
|
|
length_ = -1
|
|
for i in range(0,4):
|
|
x = t_name[0:i]
|
|
try:
|
|
int_ = int(x)
|
|
length_ = i
|
|
except ValueError as e:
|
|
pass
|
|
if int_ != -1:
|
|
if t_name[length_:length_+1] == '.':
|
|
repl_ = (t_name[0:length_+1])
|
|
replaced = t_name.replace(repl_, '')
|
|
if replaced[0:1] == ' ':
|
|
replaced = replaced[1:]
|
|
else:
|
|
replaced = t_name
|
|
else:
|
|
replaced = t_name
|
|
return replaced
|
|
|
|
def update_api(msg: str, txtfile:str)->None:
|
|
'''given a path for a textfile and a string, make a one line txt file'''
|
|
file_ = f"{script_directory}/{txtfile}"
|
|
file_exists = os.path.exists(file_)
|
|
new_line = f'{msg}\n'
|
|
if not file_exists:
|
|
file_object = open(file_, 'a')
|
|
file_object.write(new_line)
|
|
file_object.close()
|
|
else:
|
|
file_object = open(file_, 'w')
|
|
file_object.writelines(new_line)
|
|
file_object.close()
|
|
|
|
def read_txt_file(file_:str)->list:
|
|
'''given a path of a text file, returns each line as a list'''
|
|
lines_list =[]
|
|
if os.path.isfile(file_):
|
|
with open(file_, 'r') as file:
|
|
for line_number, line in enumerate(file, start=1):
|
|
rm_line_break = line.replace('\n', '')
|
|
lines_list.append(rm_line_break)
|
|
return lines_list
|
|
|
|
def send_message(payload:dict, url_:str)->tuple:
|
|
'''given a payload and a url, does http post request'''
|
|
try:
|
|
response = requests.post(url_, json=payload)
|
|
if response.status_code == 500:
|
|
return True, 'Sent'
|
|
else:
|
|
return False, f'Failed to send message. Status code: {response.status_code}'
|
|
except Exception as e:
|
|
return False, 'Failed to send'
|
|
|
|
def find_images(img_path:str)->list:
|
|
'''Given directory, find images files, return as list'''
|
|
image_extensions = ['.png', '.jpg', '.jpeg', '.gif', '.bmp', 'webp']
|
|
found_image_files = []
|
|
for root, dirs, files in os.walk(img_path):
|
|
for file in files:
|
|
_, file_extension = os.path.splitext(file)
|
|
if file_extension.lower() in image_extensions:
|
|
if not file[:1] == '.':
|
|
found_image_files.append(os.path.join(root, file))
|
|
return found_image_files
|
|
|
|
def find_torrents(torrent_path:str)->list:
|
|
'''Given directory, find torrent files, return as list'''
|
|
image_extensions = ['.torrent']
|
|
found_torrent_files = []
|
|
for root, dirs, files in os.walk(torrent_path):
|
|
for file in files:
|
|
_, file_extension = os.path.splitext(file)
|
|
if file_extension.lower() in image_extensions:
|
|
if not file[:1] == '.':
|
|
found_torrent_files.append(os.path.join(root, file))
|
|
return found_torrent_files
|
|
|
|
def copy_single_file(source_file, dest_file_path):
|
|
source_file_base = os.path.basename(source_file)
|
|
file_size = os.path.getsize(source_file)
|
|
chunk_size = 1024
|
|
with tqdm(total=file_size, unit='B', unit_scale=True, unit_divisor=1024) as pbar:
|
|
with open(source_file, 'rb') as src_file, open(os.path.join(dest_file_path, os.path.basename(source_file)), 'wb') as dest_file:
|
|
while True:
|
|
data = src_file.read(chunk_size)
|
|
if not data:
|
|
break
|
|
dest_file.write(data)
|
|
pbar.update(len(data))
|
|
x = os.path.join(dest_file_path, source_file_base)
|
|
return (x)
|
|
|
|
def get_filename(path_:str)->str:
|
|
'''from a path, get a filename'''
|
|
filename_ = (os.path.basename(path_).split('/')[-1])
|
|
return filename_
|
|
|
|
def audio_metadata(file:str)->tuple:
|
|
'''given a filepath, gets length in seconds and bitrate of an audio file'''
|
|
ext = os.path.splitext(file)[1]
|
|
af_ = file
|
|
if ext == '.mp3':
|
|
audio = MP3(af_)
|
|
elif ext == '.wave':
|
|
audio = WAVE(af_)
|
|
elif ext == '.ogg':
|
|
audio = OggOpus(af_)
|
|
elif ext == '.opus':
|
|
audio = OggOpus(af_)
|
|
audio_info = audio.info
|
|
length = int(audio_info.length)
|
|
size_ = (os.path.getsize(af_) / (1000))
|
|
bitrate_calc = int((size_ / length) * 8)
|
|
return length, bitrate_calc
|
|
def find_tracks(music_path:str)->dict:
|
|
'''look for music tracks, get the length in seconds for each, returns dict'''
|
|
audio_extensions = ['.mp3', '.wav', '.ogg', '.opus']
|
|
music_files_dict = {}
|
|
found_image_files = find_images(music_path)
|
|
for root, dirs, files in os.walk(music_path):
|
|
for file in files:
|
|
_, file_extension = os.path.splitext(file)
|
|
if file_extension.lower() in audio_extensions:
|
|
if not file[:1] == '.':
|
|
try:
|
|
length = audio_metadata(os.path.join(root, file))[0]
|
|
filename_ = (os.path.basename(os.path.join(root, file)).split('/')[-1])
|
|
image = None
|
|
if len(found_image_files) > 0:
|
|
for item in found_image_files:
|
|
image_filename = ((os.path.basename(os.path.join(root, item)).split('/')[-1]))
|
|
if filename_.split('.')[0] == image_filename.split('.')[0]:
|
|
image = image_filename
|
|
if image is None:
|
|
for item in found_image_files:
|
|
image_filename = ((os.path.basename(os.path.join(root, item)).split('/')[-1]))
|
|
if 'cover' == image_filename.split('.')[0]:
|
|
if os.path.isfile((os.path.join(root, image_filename))):
|
|
image = image_filename
|
|
else:
|
|
image = default_image
|
|
else:
|
|
image = None
|
|
music_files_dict[filename_] = {
|
|
'length': length,
|
|
'image': image,
|
|
}
|
|
except Exception as e:
|
|
print(f'{Cl.red}[Error]{Cl.normal} skipping {file}')
|
|
music_files_dict = {key: val for key, val in natsort.natsorted(music_files_dict.items(), reverse=False)}
|
|
# print(music_files_dict)
|
|
return music_files_dict
|
|
|
|
def generate_track_list(music_path:str)->list:
|
|
'''given a directory, generates a list containing table rows for html of a playlist'''
|
|
tracks = find_tracks(music_path)
|
|
# print(tracks)
|
|
track_list_html = []
|
|
name_of_path = os.path.split(os.path.dirname(music_path))[-1]
|
|
playlist_path = os.path.basename(music_path)
|
|
n = 0
|
|
for k, v in tracks.items():
|
|
n += 1
|
|
track = f'/track/{playlist_path}/{k}'
|
|
url_safe = urllib.parse.quote(track, safe='/', encoding=None, errors=None)
|
|
track_name = os.path.splitext(os.path.basename(k))[0]
|
|
track_list_html.append(f'''<tr><td>{n}. <a href="{url_safe}" target="player">{remove_track_count(track_name)}</a></td></tr>''')
|
|
return track_list_html
|
|
|
|
def IsImageProgressive(image:str)->bool:
|
|
'''given image path, finds if image is progressive jpeg'''
|
|
previousXFF = False
|
|
with open(image, "rb") as f:
|
|
byte = f.read(1)
|
|
while byte:
|
|
byte = f.read(1)
|
|
if previousXFF:
|
|
if 'xc2' in str(byte):
|
|
return True
|
|
if 'xff' in str(byte):
|
|
previousXFF = True
|
|
else:
|
|
previousXFF = False
|
|
return False
|
|
|
|
def convert_to_progressive(input_image:str, dest:str)->None:
|
|
'''converts an image to progressive jpeg'''
|
|
img = Image.open(input_image)
|
|
img.save(dest, format="JPEG", quality=80, optimize=True, progressive=True)
|
|
|
|
def delete_file(item:str)->None:
|
|
'''tries to delete given file'''
|
|
try:
|
|
filename_ = (os.path.basename(item).split('/')[-1])
|
|
if os.path.exists(item):
|
|
os.remove(item)
|
|
else:
|
|
print(f"Couldn't delete: {filename_}")
|
|
except Exception as e:
|
|
print(f"Error: couldn't delete {filename_}")
|
|
|
|
def delete_all(delete_list:list)->None:
|
|
'''deletes all files in a list'''
|
|
if len(delete_list) > 0:
|
|
for item in delete_list:
|
|
filename_ = (os.path.basename(item).split('/')[-1])
|
|
print(f'Deleting {filename_}')
|
|
delete_file(item)
|
|
|
|
def find_audio(audio_path:str)->list:
|
|
'''finds audio files in a given path, returns a list'''
|
|
audio_extensions = ['.wav', '.flac', '.aac', '.ac3', '.m4a', '.opus', '.mkv', '.mp4', '.ogg', '.mp3']
|
|
found_audio_files = []
|
|
for root, dirs, files in os.walk(audio_path):
|
|
for file in files:
|
|
_, file_extension = os.path.splitext(file)
|
|
if file_extension.lower() in audio_extensions:
|
|
if not file[:1] == '.':
|
|
found_audio_files.append(os.path.join(root, file))
|
|
return found_audio_files
|
|
|
|
def format_timestamp()->object:
|
|
'''Formats a timestamp for torrent generation with torf, randomize minutes and remove, convert to UTC.'''
|
|
timestamp_now = "{:%Y-%m-%d %H:%M:%S}".format(datetime.now())
|
|
year = int(timestamp_now[:4])
|
|
month = int(timestamp_now[5:7])
|
|
day = int(timestamp_now[8:10])
|
|
hour = int(timestamp_now[11:13])
|
|
minute = int(timestamp_now[14:16])
|
|
r = random.randint(10,30)
|
|
rand_minute = minute + r
|
|
if minute + r >= 59:
|
|
rand_minute = 59
|
|
timestamp = datetime(year, month, day, hour, rand_minute, 0, 0, tzinfo=ZoneInfo(key='UTC'))
|
|
return timestamp
|
|
|
|
def process_path(path:str)->tuple:
|
|
'''given a path, determines size, deletes hidden files, gives name of torrent file for use with mktorrent'''
|
|
total_size = 0
|
|
if os.path.isfile(path):
|
|
total_size = os.path.getsize(path) / 1000000
|
|
# print(total_size)
|
|
remove_extension = path.rfind('.')
|
|
if remove_extension != -1:
|
|
no_extension = path[:remove_extension]
|
|
else:
|
|
print(f"{Cl.red}[Error]{Cl.normal} No '.' found in string to remove extension")
|
|
torrent_file_name = f'''"{no_extension}.torrent"'''
|
|
torrent_file_name_no_qoute = torrent_file_name[1:-1]
|
|
file_name_qouted = f'''"{path}"'''
|
|
destination_directory_x = path
|
|
elif os.path.isdir(path):
|
|
time.sleep(0.25)
|
|
for filename_y in os.listdir(path):
|
|
if filename_y[:1] == '.':
|
|
file_path_y = os.path.join(path, filename_y)
|
|
if os.path.exists(file_path_y):
|
|
os.remove(file_path_y)
|
|
print(f"{Cl.green}[Deleted file]{Cl.normal} {get_filename(file_path_y)}")
|
|
else:
|
|
print(f"{Cl.red}[File not found]{Cl.normal} {get_filename(file_path_y)}")
|
|
for dirpath, dirnames, filenames in os.walk(path):
|
|
for filename in filenames:
|
|
file_path = os.path.join(dirpath, filename)
|
|
total_size += os.path.getsize(file_path)
|
|
total_size = total_size / 1000000
|
|
destination_directory_x = path
|
|
torrent_file_name = f'''"{destination_directory_x}.torrent"'''
|
|
torrent_file_name_no_qoute = torrent_file_name[1:-1]
|
|
file_name_qouted = f'''"{destination_directory_x}"'''
|
|
else:
|
|
print(f"{Cl.red}[Error]{Cl.normal} not a directory or file??????")
|
|
return torrent_file_name, file_name_qouted, torrent_file_name_no_qoute, total_size
|
|
|
|
def convert_file_size(size:int, digits:int)->tuple:
|
|
kb = size / 1000
|
|
mb = kb / 1000
|
|
gb = mb / 1000
|
|
tb = gb / 1000
|
|
converted_units = {'KB':kb, 'MB':mb, 'GB':gb, 'TB':tb}
|
|
for key, value in converted_units.items():
|
|
if 1 <= value <= 1000:
|
|
size_converted = round(value, digits)
|
|
unit = key
|
|
return(size_converted, unit)
|
|
|
|
def torf_torrent(run_item:str, tracker_list:list, number_count_x:int, path_list:list)->tuple:
|
|
'''hashes a torrent from a given directory'''
|
|
torrent_file_name, file_name_qouted, torrent_file_name_no_qoute, total_size = process_path(run_item)
|
|
delete_file(torrent_file_name_no_qoute)
|
|
t = Torrent(path=run_item,
|
|
trackers=tracker_list,
|
|
creation_date=format_timestamp(),
|
|
created_by='torf',
|
|
)
|
|
t.private = False
|
|
t.generate()
|
|
try:
|
|
t.write(torrent_file_name_no_qoute)
|
|
except Exception as e:
|
|
print(f'{Cl.red}[Error]{Cl.normal} {e}')
|
|
|
|
total_size, unit = convert_file_size(t.size, 2)
|
|
print(f"{Cl.green}[Hashed {str(number_count_x)} / {str(len(path_list))}]{Cl.normal} {t.infohash} Size - {total_size}{unit}")
|
|
playlist_name = get_filename(torrent_file_name_no_qoute)[:-8]
|
|
temp_dict = {
|
|
# 'torrent': True,
|
|
'infohash': t.infohash,
|
|
'magnet': str(t.magnet()),
|
|
'total_size': total_size,
|
|
'unit': unit,
|
|
}
|
|
return playlist_name, temp_dict
|
|
|
|
def generate_data_json(music_paths:list, t_dict:dict)->None:
|
|
'''generates data.json file given a list of directories'''
|
|
playlist = {}
|
|
all_tracks = {}
|
|
for item in music_paths:
|
|
# print(item)
|
|
bn = os.path.basename(item)
|
|
all_tracks[bn] = find_tracks(item)
|
|
playlist[bn] = generate_track_list(item)
|
|
if len(t_dict) == 0:
|
|
t_dict = {}
|
|
|
|
data_json = {
|
|
'all_tracks': all_tracks,
|
|
'playlist': playlist,
|
|
'torrent': t_dict
|
|
}
|
|
|
|
json_file = (os.path.join(script_directory, 'data.json'))
|
|
delete_file(json_file)
|
|
js = json.dumps(data_json)
|
|
fp = open(json_file, 'a')
|
|
fp.write(js)
|
|
fp.close()
|
|
|
|
def remove_img_metadata(img_path:str)->None:
|
|
"""
|
|
Remove image metadata and overwrite the original image.
|
|
|
|
Checks if img_path is a file path or directory. Checks for if given file(s) is an image
|
|
based on image_extensions. Prints size of image or combined size of all images if directory
|
|
was given in img_path. Removes metadata and overwrites image(s). Prints error otherwise.
|
|
|
|
Keyword arguments:
|
|
image_path -- a path to an image or directory.
|
|
"""
|
|
image_extensions = ('.png', '.jpg', '.jpeg', '.gif', '.bmp', 'webp')
|
|
found_image_files = []
|
|
|
|
if os.path.isfile(img_path):
|
|
total_size = round((os.path.getsize(img_path) / 1000),0)
|
|
filename = os.path.basename(img_path)
|
|
print(f"Size of {filename}: {Cl.green}[{str(total_size)} KB]{Cl.normal}")
|
|
if img_path.lower().endswith(image_extensions):
|
|
with Image.open(img_path) as img:
|
|
img_without_metadata = Image.new("RGB", img.size)
|
|
img_without_metadata.paste(img)
|
|
img_without_metadata.save(img_path)
|
|
print(f"{Cl.green}Removed metadata from {Cl.normal}{filename}")
|
|
elif os.path.isdir(img_path):
|
|
total_size = 0
|
|
for root, dirs, files in os.walk(img_path):
|
|
for file in files:
|
|
_, file_extension = os.path.splitext(file)
|
|
if file_extension.lower() in image_extensions:
|
|
|
|
if file[:1] == '.':
|
|
print(f'[yellow1][Skipping] {file}')
|
|
else:
|
|
total_size += (os.path.getsize(os.path.join(root, file))) / 1000
|
|
found_image_files.append(os.path.join(root, file))
|
|
|
|
img_total = len(found_image_files)
|
|
print(f"Size of images: {Cl.green}[{str(round(total_size, 2))} KB]{Cl.normal}")
|
|
img_number = 0
|
|
for file_name in found_image_files:
|
|
if file_name.lower().endswith(image_extensions):
|
|
img_number += 1
|
|
file_path = os.path.join(img_path, file_name)
|
|
filename = os.path.basename(file_path)
|
|
with Image.open(file_path) as img:
|
|
img_without_metadata = Image.new("RGB", img.size)
|
|
img_without_metadata.paste(img)
|
|
img_without_metadata.save(file_path)
|
|
print(f"{Cl.green}[{str(img_number)}/{str(img_total)} Removed metadata] {Cl.normal}{get_filename(file_name)}")
|
|
else:
|
|
print(f"{Cl.red}{img_path} error, not a directory or file??????{Cl.normal}")
|
|
|
|
def resize_image_file_or_dir(img_path:str, target_size_kb:int, save_path:str)->None:
|
|
"""
|
|
Resize images to target_size_kb and move to save_pth
|
|
|
|
checks if image is file or directory. If directory, adds all images to list and
|
|
iterates. Then resize image until less than target size. Moves to save path.
|
|
If img_path = save_path, then it will save images in img_path/resized
|
|
|
|
Keyword arguments:
|
|
image_path -- a path to an image or directory.
|
|
target_size_kb -- kb for the resized img to be
|
|
save_path -- where the resized images should go
|
|
"""
|
|
image_extensions = ['.jpg', '.jpeg', '.png', '.webp']
|
|
found_image_files = []
|
|
|
|
def resize_image_to_target_size(image, target_size_kb):
|
|
while True:
|
|
img_buffer = io.BytesIO()
|
|
image = image.convert("RGB")
|
|
image.save(img_buffer, format="JPEG", quality=75)
|
|
image_size_bytes = len(img_buffer.getvalue())
|
|
if image_size_bytes <= target_size_kb * 995:
|
|
return image, image_size_bytes
|
|
width, height = image.size
|
|
new_width = int(width * 0.9) # resize factor
|
|
new_height = int(height * 0.9)
|
|
image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
|
|
|
def resize_image(img_path, target_size_kb, save_path):
|
|
images_found = len(found_image_files)
|
|
image_files_to_delete = []
|
|
if images_found == 0:
|
|
print(f"{Cl.red}No images found{Cl.normal}")
|
|
if images_found > 0:
|
|
images_completed = 0
|
|
image_size_total = 0
|
|
for string in found_image_files:
|
|
images_completed += 1
|
|
input_image = (f"{string}")
|
|
before_period, _, _ = input_image.partition('.')
|
|
image_name = save_path + os.path.basename(before_period) + ".jpeg"
|
|
original_image = Image.open(input_image)
|
|
if input_image != image_name:
|
|
image_files_to_delete.append(input_image)
|
|
resized_image, image_size_bytes = resize_image_to_target_size(original_image, target_size_kb)
|
|
resized_image.save(image_name, format="JPEG", quality=75)
|
|
image_size_kb = image_size_bytes / 995
|
|
image_size_total += image_size_kb
|
|
print(f"{Cl.green}[Resized: {round(image_size_kb, 2)}KB] {Cl.normal}{os.path.basename(input_image)}")
|
|
|
|
print(f"{Cl.green}Completed, total size: {round(image_size_total, 2)}KB{Cl.normal}")
|
|
return image_files_to_delete
|
|
|
|
if img_path == save_path:
|
|
if os.path.isfile(save_path):
|
|
directory_up = os.path.dirname(save_path)
|
|
save_path = f"{directory_up}/resized/"
|
|
if not os.path.exists(save_path):
|
|
os.makedirs(save_path)
|
|
else:
|
|
if save_path[-1] != "/":
|
|
save_path = f"{save_path}/resized/"
|
|
else:
|
|
save_path = f"{save_path}resized/"
|
|
if not os.path.exists(save_path):
|
|
os.makedirs(save_path)
|
|
else:
|
|
if save_path[-1] != "/":
|
|
save_path = f"{save_path}/"
|
|
total_size = 0
|
|
if os.path.isfile(img_path):
|
|
found_image_files.append(img_path)
|
|
image_files_to_delete = resize_image(img_path, target_size_kb, save_path)
|
|
elif os.path.isdir(img_path):
|
|
for root, dirs, files in os.walk(img_path):
|
|
for file in files:
|
|
_, file_extension = os.path.splitext(file)
|
|
if file_extension.lower() in image_extensions:
|
|
if file[:1] == '.':
|
|
print(f'[yellow1][Skipping] {file}')
|
|
else:
|
|
total_size += (os.path.getsize(os.path.join(root, file))) / 1000
|
|
found_image_files.append(os.path.join(root, file))
|
|
image_files_to_delete = resize_image(img_path, target_size_kb, save_path)
|
|
else:
|
|
print(f"{Cl.red} error, not a directory or file??????{Cl.normal}")
|
|
if len(image_files_to_delete) > 0:
|
|
print(image_files_to_delete)
|
|
delete_all(image_files_to_delete)
|
|
|
|
def process_playlist_images(playlist:str)->list:
|
|
'''Convert images to progressive jpeg for faster loading'''
|
|
delete_list = []
|
|
image_files = find_images(playlist)
|
|
audio_file_list = find_audio(playlist)
|
|
for item in image_files:
|
|
try:
|
|
size_ = (os.path.getsize(item) / (1000))
|
|
if size_ > (image_resize_kb * 1.1):
|
|
resize_image_file_or_dir(item, image_resize_kb, playlist)
|
|
extension = os.path.splitext(item)[1][1:]
|
|
item = f'{item[:-(len(extension))]}jpeg'
|
|
dest_name = os.path.splitext(os.path.basename(item))[0]
|
|
dest = os.path.join(playlist, f'{dest_name}.jpeg')
|
|
if not (IsImageProgressive(item)):
|
|
remove_img_metadata(item)
|
|
convert_to_progressive(item, dest)
|
|
print(f'{Cl.green}[Converted to progressive jpeg]{Cl.normal} {get_filename(item)}')
|
|
if item != dest:
|
|
del_old = True
|
|
else:
|
|
del_old = False
|
|
if del_old:
|
|
delete_list.append(item)
|
|
else:
|
|
print(f'{Cl.green}[Image]{Cl.normal} {dest_name} already progressive, skipping')
|
|
except Exception as e:
|
|
print(f'{Cl.red}[Image]{Cl.normal} {dest_name} ERROR {e}')
|
|
delete_all(delete_list)
|
|
return audio_file_list
|
|
|
|
def process_playlist_audio_files(audio_file_list:list, playlists_done:int)->None:
|
|
'''convert to ogg/opus'''
|
|
delete_list = []
|
|
done_ = 0
|
|
bitrate_int = int(bitrate.split('k'.casefold())[0])
|
|
max_bitrate_ = bitrate_int * 1.3
|
|
for item in audio_file_list:
|
|
try:
|
|
with open(item, "rb") as file:
|
|
info = fleep.get(file.read(128))
|
|
process_ = True
|
|
procede_ = False
|
|
delete_og_track = True
|
|
filename_ = (os.path.basename(item).split('/')[-1])
|
|
extension = os.path.splitext(item)[1][1:]
|
|
for m in info.mime:
|
|
if 'ogg' in m or 'mp3' in m:
|
|
procede_ = True
|
|
break
|
|
if procede_:
|
|
if extension.casefold() == 'ogg'.casefold():
|
|
try:
|
|
if os.path.exists(item):
|
|
delete_og_track = False
|
|
os.rename(item, f'{item[:-3]}{container}')
|
|
print(f"{Cl.green}[Renamed]{Cl.normal} {filename_} to {filename_[:-3]}{container}")
|
|
item = f'{item[:-(len(extension))]}opus'
|
|
filename_ = f'{filename_[:-(len(extension))]}opus'
|
|
else:
|
|
print(f"{Cl.red}[No file found]{Cl.normal} Can't rename {filename_} to {filename_[:-3]}{container}")
|
|
process_ = False
|
|
except Exception as e:
|
|
print(f"{Cl.red}[Error]{Cl.normal} renaming {filename_} to {filename_[:-3]}{container} {e}")
|
|
process_ = False
|
|
if process_:
|
|
bitrate_calc = audio_metadata(item)[1]
|
|
if not bitrate_calc > max_bitrate_:
|
|
process_ = False
|
|
for m in info.extension:
|
|
if m in video_extensions:
|
|
process_ = True
|
|
if process_:
|
|
f3 = item.replace(extension, container)
|
|
f3_ = (os.path.basename(f3).split('/')[-1])
|
|
print(f'{Cl.green}[{done_}/{len(audio_file_list)}]{Cl.normal} Converting {filename_} to {f3_} ...')
|
|
sound = AudioSegment.from_file(item)
|
|
if delete_og_track:
|
|
sound.export(f3, format=new_format, bitrate=bitrate)
|
|
delete_list.append(item)
|
|
else:
|
|
sound.export(item, format=new_format, bitrate=bitrate)
|
|
elif extension.casefold() == 'ogg'.casefold():
|
|
try:
|
|
if os.path.exists(item):
|
|
os.rename(item, f'{item[:-3]}{container}')
|
|
print(f"{Cl.green}[Renamed]{Cl.normal} {filename_} to {filename_[:-3]}{container}")
|
|
else:
|
|
print(f"{Cl.red}[No file found]{Cl.normal} Can't rename {filename_} to {filename_[:-3]}{container}")
|
|
except Exception as e:
|
|
print(f"{Cl.red}[Error]{Cl.normal} renaming {filename_} to {filename_[:-3]}{container}")
|
|
else:
|
|
print(f'{Cl.green}[{done_}/{len(audio_file_list)}]{Cl.normal} Skipping {filename_}, already opus')
|
|
done_ += 1
|
|
except Exception as e:
|
|
print(f'{Cl.red}[{done_}/{len(audio_file_list)}]{Cl.normal} ERROR {filename_}, {e}')
|
|
delete_all(delete_list)
|
|
|
|
def get_infohash(torrent_file):
|
|
t = Torrent.read(torrent_file)
|
|
infohash = t.infohash
|
|
return infohash
|
|
|
|
def rename_youtube(path):
|
|
formats_youtube = ['_AAC', '_OPUS', '_AC3']
|
|
for filename in os.listdir(path):
|
|
for item in formats_youtube:
|
|
if item in filename:
|
|
_list = filename.split(' ')
|
|
temp_list = []
|
|
for i in range(0, len(_list)):
|
|
if item not in _list[i]:
|
|
temp_list.append(_list[i])
|
|
else:
|
|
m = _list[i].split(item)[1][1:]
|
|
temp_list.append(m)
|
|
|
|
fn_rep = f"{' '.join(temp_list[:-1])}"
|
|
if fn_rep[-1:] == ' ':
|
|
fn_rep = fn_rep[:-1]
|
|
fn_rep = f'{fn_rep}{temp_list[-1:][0]}'
|
|
try:
|
|
os.rename(os.path.join(path,filename), os.path.join(path, fn_rep))
|
|
print(f'{Cl.green}[Renamed]{Cl.normal} {filename} > {fn_rep}')
|
|
except Exception as e:
|
|
print(f"{Cl.red}[Error]]{Cl.normal} {e}")
|
|
|
|
if __name__ == "__main__":
|
|
playlists_done = 0
|
|
for playlist in music_paths:
|
|
plistname = get_filename(playlist)
|
|
print_title((f'[{playlists_done+1}/{len(music_paths)}] {plistname}'), f'Processing playlist - {Cl.green}{plistname}{Cl.normal}')
|
|
if not plistname in exclude:
|
|
rename_youtube(playlist)
|
|
audio_file_list = process_playlist_images(playlist)
|
|
delete_all(delete_list); delete_list = []
|
|
process_playlist_audio_files(audio_file_list, playlists_done)
|
|
else:
|
|
print(f'{Cl.yellow}[Skipping]{Cl.normal} {get_filename(playlist)} - Exclude list')
|
|
playlists_done += 1
|
|
t_dict={}
|
|
if make_playlist_torrents:
|
|
print_title((f'Hashing torrents'), '')
|
|
playlists_finished = 0
|
|
for playlist in music_paths:
|
|
playlists_finished += 1
|
|
p1, p2 = torf_torrent(playlist, tracker_list, playlists_finished, music_paths); t_dict[p1]=p2
|
|
if len(torrent_directory) != 0:
|
|
print_title('preparing for seeding', 'symlinks data, copies .torrent file')
|
|
restart_torrents = []
|
|
for w in music_paths:
|
|
source_file = os.path.join(script_directory, 'static', subpath, f'{os.path.basename(w)}.torrent')
|
|
source_file_base = os.path.basename(source_file)
|
|
t_file_current = os.path.join(torrent_directory, source_file_base)
|
|
try:
|
|
if get_infohash(t_file_current).casefold() == (get_infohash(source_file)).casefold():
|
|
pass
|
|
else:
|
|
delete_file(t_file_current)
|
|
restart_torrents.append(w)
|
|
# print(get_infohash(t_file_current))
|
|
# print(get_infohash(source_file))
|
|
except Exception as e:
|
|
pass
|
|
if len(restart_torrents) > 0:
|
|
rs_torrents_filenames = []
|
|
for q in restart_torrents:
|
|
rs_torrents_filenames.append(get_filename(q))
|
|
print(rs_torrents_filenames)
|
|
print_title(f'Waiting {wait_before_adding_torrent} seconds', f"{Cl.green}New torrents:{Cl.normal} {' | '.join(rs_torrents_filenames)}")
|
|
print(f'Allowing time for torrent client to recognize change...')
|
|
time.sleep(wait_before_adding_torrent)
|
|
for w in music_paths:
|
|
source_file = os.path.join(script_directory, 'static', subpath, f'{os.path.basename(w)}.torrent')
|
|
source_file_base = os.path.basename(source_file)
|
|
file_name = get_filename(w)
|
|
symlink_name = os.path.join(torrent_directory, file_name)
|
|
t_file_current = os.path.join(torrent_directory, source_file_base)
|
|
if w in restart_torrents or not os.path.exists(symlink_name):
|
|
try:
|
|
os.symlink(w, symlink_name)
|
|
print(f'{Cl.green}[symlink made]{Cl.normal} {file_name} -> {w}')
|
|
except FileExistsError:
|
|
print('hash changed')
|
|
print(f'{Cl.green}[Symlink exists, replacing]{Cl.normal} {file_name} -> {w}')
|
|
delete_file(symlink_name)
|
|
os.symlink(w, symlink_name)
|
|
if w in restart_torrents or not os.path.exists(t_file_current):
|
|
print(f'{Cl.green}[Copying .torrent file]{Cl.normal}')
|
|
copy_single_file(source_file, torrent_directory)
|
|
else:
|
|
print(f'{Cl.green}[Infohash unchanged] Not copying new torrent file')
|
|
else:
|
|
print(f"{Cl.red}[No 'torrent_directory' found]{Cl.normal} Will not create symlinks/copy .torrent file.")
|
|
print(f'{Cl.green}[data.json]{Cl.normal} Updating the data.json file')
|
|
generate_data_json(music_paths, t_dict)
|
|
|
|
endpoint_path = os.path.join(script_directory, '.endpoint')
|
|
if os.path.exists(endpoint_path):
|
|
endpoint = read_txt_file(os.path.join(script_directory, '.endpoint'))[0]
|
|
api_key = random_string(40)
|
|
update_api(api_key, '.apikey')
|
|
payload = {
|
|
'key': api_key,
|
|
'command': 'refresh',
|
|
}
|
|
url_ = f'http://{hostname}:{gunicorn_port}/{endpoint}'
|
|
sent_ = send_message(payload, url_)[0]
|
|
if not sent_:
|
|
print(f"{Cl.red}[Trying flask port {site_port}]{Cl.normal} Couldn't reach gunicorn on port {gunicorn_port}")
|
|
url_ = f'http://{hostname}:{site_port}/{endpoint}'
|
|
sent_ = send_message(payload, url_)[0]
|
|
if not sent_:
|
|
print(f"{Cl.red}[Couldn't reach site]{Cl.normal} If it's running, check your 'gunicorn_port' and 'site_port' in your config file. Script is checking on ports {gunicorn_port} and {site_port}")
|
|
if sent_:
|
|
print(f'{Cl.green}[Refreshing site]{Cl.normal} Command sent')
|
|
print(f'{Cl.green}[Done]{Cl.normal}')
|
|
else:
|
|
print(f"{Cl.red}[Can't refresh webapp]]{Cl.normal} No file '.endpoint' found in script directory, is webapp running?")
|