i2pnews/functions.py

356 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
import configparser
import os
import glob
import markdown
from feedgen.feed import FeedGenerator
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import bleach
from random import randrange
import random
import re
import requests
# from app import script_directory
# Tag whitelist handed to bleach.clean() by sanitize_input(); it is empty,
# so no tag is whitelisted.
allowed_tags = ''

# Absolute directory containing this module. The static assets (icons,
# avatars, feed logs) are looked up relative to it.
_module_file = os.path.abspath(__file__)
script_directory = os.path.dirname(_module_file)
def is_valid_path(path):
    """Tell whether *path* names an existing filesystem entry."""
    found = os.path.exists(path)
    return found
def config_load(config_path):
    """Read the proxy and RSS/bot settings from the INI file at *config_path*.

    Returns a 13-tuple, in this order: hostname, channel, primary,
    auth_users (list, split on ', '), news_bot, news_pass, news_port (int),
    news_rss_port (int), http_tunnel (int), news_email, gunicorn_port (int),
    bot_send, ah.

    Raises:
        KeyError: a required section or option is missing (this is also
            what happens when the file itself does not exist).
        ValueError: a numeric option does not hold an integer.

    A diagnostic is printed before the exception propagates. Previously the
    function fell through to ``return`` and died with an unrelated
    ``UnboundLocalError``, hiding the real cause.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() silently ignores a missing file; the lookups below
    # are what surface the problem.
    config.read(config_path)
    try:
        proxy = config['proxy']
        hostname = proxy['hostname']
        http_tunnel = int(proxy['http_tunnel'])
        rss = config['rss']
        channel = rss['channel']
        primary = rss['primary']
        auth_users = rss['auth_users'].split(', ')
        news_bot = rss['news_bot']
        news_pass = rss['news_pass']
        news_port = int(rss['news_irc_port'])
        news_rss_port = int(rss['news_rss_flask_port'])
        gunicorn_port = int(rss['gunicorn_port'])
        news_email = rss['news_email']
        bot_send = rss['bot_send']
        ah = rss['news_rss_ah']
    except (configparser.NoSectionError, configparser.NoOptionError, KeyError) as e:
        if os.path.exists(config_path):
            print(f"Error reading config file: {e}")
        else:
            print(f"The 'config.ini' file was not found: {e}")
        raise
    except ValueError as e:
        print(f"Cannot accept value: {e}")
        raise
    return (hostname, channel, primary, auth_users, news_bot, news_pass,
            news_port, news_rss_port, http_tunnel, news_email, gunicorn_port,
            bot_send, ah)
def load_settings_ini(config_path):
    """Load the multi-line ``s`` option of every section in an INI file.

    Returns a dict mapping each section name to the list of non-blank,
    stripped lines of its ``s`` option. An entry spelled ``none`` becomes
    ``''``, and an ``s`` option with no entries yields ``['']``.

    A section without an ``s`` option is reported and left out of the
    result. Previously the resulting ``KeyError`` was not caught by the
    inner handler, so it aborted the loop and silently dropped every
    section after the offending one.
    """
    config = configparser.ConfigParser()
    config.read(config_path)
    config_dict = {}
    for section in config.sections():
        raw_value = config[section].get('s')
        if raw_value is None:
            print(f"Error reading config file: section '{section}' has no 's' option")
            continue
        entries = [line.strip() for line in raw_value.split('\n') if line.strip()]
        if not entries:
            entries = ['']
        config_dict[section] = ['' if entry == 'none' else entry for entry in entries]
    return config_dict
def sanitize_input(input_str):
    """Clean *input_str* with bleach, then drop any remaining script spans.

    bleach.clean() is run with the module-level ``allowed_tags`` whitelist;
    afterwards every ``<script ...>...</script>`` span still present in the
    cleaned text is removed (case-insensitively).
    """
    script_span = re.compile(r'<script\b[^>]*>(.*?)</script>', flags=re.IGNORECASE)
    cleaned = bleach.clean(input_str, tags=allowed_tags)
    return script_span.sub('', cleaned)
def get_svg(absolute_location, filename):
    """Return the contents of an icon SVG with all newlines removed.

    The icon is read from ``<absolute_location>/static/icons/<filename>.svg``.
    An empty *filename* selects the ``guides`` icon, and the one hard-coded
    project page name below selects the ``development`` icon.
    """
    icon_name = 'guides' if filename == '' else filename
    if icon_name == 'projects/0002-09-21-2023-Federated Torrents':
        icon_name = 'development'
    icon_path = f'{absolute_location}/static/icons/{str(icon_name)}.svg'
    with open(icon_path, 'r') as svg_file:
        pieces = svg_file.read().split('\n')
    return ''.join(pieces)
def generate_header_links(page_link, page_title, script_directory):
    """Build the navbar HTML for one header entry.

    With a single element in *page_link* the element is read as
    ``href:icon`` and a plain link labelled *page_title* is returned.
    With several elements each one is read as ``label:href:icon`` and the
    links are wrapped in a dropdown whose button shows *page_title* and the
    icon named after the lower-cased title. Icons are loaded through
    get_svg() from *script_directory*.
    """
    if len(page_link) <= 1:
        fields = page_link[0].split(':')
        link_icon = get_svg(script_directory, fields[1])
        return f'''<a href="{fields[0]}"><div class="svg">{link_icon}</div>{page_title}</a>'''

    dropdown_items = []
    for entry in page_link:
        fields = entry.split(':')
        entry_icon = get_svg(script_directory, fields[2])
        dropdown_items.append(
            f'''<a href="{fields[1]}"><div class="svg">{entry_icon}</div>{fields[0]}</a>'''
        )
    # The button icon is loaded after the item icons, as before.
    button_icon = get_svg(script_directory, page_title.lower())
    items_html = ''.join(dropdown_items)
    return f'''<div class="dropdown"><button class="dropbtn"><div class="svg">{button_icon}</div>{page_title}<i class="fa fa-caret-down"></i></button><div class="dropdown-content">{items_html}</div></div>'''
def header_links_update(links_dict, saved):
    """Render the whole header navbar from *links_dict*.

    Every key is a header title and its value the link list passed to
    generate_header_links(). When *saved* is non-zero the ``Saved`` entry is
    titled ``Saved: <saved>``. The returned markup closes one more ``div``
    than it opens; the original output is reproduced as-is.
    """
    rendered = []
    for title, links in links_dict.items():
        if saved != 0 and title == 'Saved':
            label = f'{title}: {str(saved)}'
        else:
            label = title
        rendered.append(generate_header_links(links, label, script_directory))
    navbar_html = ''.join(rendered)
    return f'<div class="flex-container-button"><div class="navbar">{navbar_html}</div></div></div>'
def logger_list(path, rev):
    """List the ``.txt`` files in *path* with an HTML preview of each.

    Args:
        path: directory to scan (not recursive).
        rev: passed as ``reverse`` to the sort of the matched paths.

    Returns:
        A tuple ``(file_list, body_preview, file_name)`` of parallel lists:
        the matched paths, characters 3..250 of each file rendered through
        markdown followed by ``...`` (or an error text when the file cannot
        be found), and each file's base name without its extension.
    """
    file_list = sorted(glob.glob(os.path.join(path, '*.txt')), reverse=rev)
    body_preview = []
    file_name = []
    for item in file_list:
        # Strip the extension from the base name only. Splitting the whole
        # path at its first '.' gave a wrong name whenever a directory in
        # the path contained a dot.
        file_name.append(os.path.splitext(os.path.basename(item))[0])
        try:
            with open(item, 'r') as f:
                raw_md = f.read()
        except FileNotFoundError:
            body_preview.append('ERROR - no content found for this!')
        else:
            # The slice starts at 3 to skip the first three characters of
            # the rendered HTML.
            body_preview.append(f"{markdown.markdown(raw_md)[3:250]}...")
    return file_list, body_preview, file_name
def get_avatar_list(avatars_dir=None):
    """Return the names (file name without ``.svg``) of the avatar images.

    Args:
        avatars_dir: directory to scan. Defaults to the ``static/avatars/``
            directory under ``script_directory``.

    The directory and the resulting list are printed, as before. The order
    of the list is whatever glob returns.
    """
    if avatars_dir is None:
        print(script_directory)
        avatars_dir = f'{script_directory}/static/avatars/'
    print(avatars_dir)
    # Take the stem of the base name only. Splitting the full path at its
    # first '.' returned a wrong name when any parent directory contained a
    # dot.
    return_list = [
        os.path.splitext(os.path.basename(svg_path))[0]
        for svg_path in glob.glob(os.path.join(avatars_dir, '*.svg'))
    ]
    print(return_list)
    return return_list
def color_nick_replay(nick, avatar):
    """Wrap *avatar* and *nick* in the tag configured for the nick's relay.

    *nick* is read as ``name/relay``; the relay part is looked up in
    ``relay_list_color_dict`` and its value is used as the opening and
    closing tag fragment around ``"<avatar> <nick>"``.

    When the nick has no relay part or the relay is unknown, a default tag
    is used. Previously the fallback looked up the key ``'SIMP'``, which is
    not in the table (``KeyError``), and an unmatched relay left the result
    unassigned.
    """
    parts = nick.split('/')
    relay = parts[1] if len(parts) > 1 else ''
    tag = relay_list_color_dict.get(relay)
    if not tag:
        # NOTE(review): 'simp>' is presumably what the missing 'SIMP' key
        # used to map to - confirm.
        tag = relay_list_color_dict.get('SIMP', 'simp>')
    return f"<{tag}{avatar} {nick}</{tag}"
def get_avatar(absolute_location, x):
    """Return the avatar SVG named *x* as one line of markup.

    The file is read from ``<absolute_location>/static/avatars/<x>.svg`` and
    every newline is removed from its contents.
    """
    avatar_path = f'{absolute_location}/static/avatars/{str(x)}.svg'
    with open(avatar_path, 'r') as svg_file:
        pieces = svg_file.read().split('\n')
    return ''.join(pieces)
def binary_feed_convert(nick):
    """Return ``(hash(nick), avatar)`` with a randomly chosen avatar name.

    The avatar is drawn from the module-level ``avatars`` list on every
    call. The first element is Python's built-in ``hash`` of *nick*, which
    for strings is only stable within one interpreter run.
    """
    return hash(nick), random.choice(avatars)
def color_feed_nick(nick, avatar, topic):
    """Wrap *avatar* and the feed name in the tag configured for its category.

    *nick* is read as ``name:category``. The category selects a tag fragment
    from ``relay_list_color_dict``; a missing or unknown category falls back
    to the ``blog`` entry. *topic* is accepted but not used.

    Returns:
        ``(nick_color, nick_raw)``: the decorated markup and the bare name
        (the part of *nick* before the first ``:``).

    Previously a nick without a category raised ``UnboundLocalError``
    (``nick_raw`` was never assigned on the fallback path), as did an
    unknown category, and the fallback rendered the list repr instead of
    the name.
    """
    parts = nick.split(':')
    nick_raw = parts[0]
    category = parts[1] if len(parts) > 1 else ''
    tag = relay_list_color_dict.get(category) or relay_list_color_dict['blog']
    nick_color = f"<{tag}{avatar} {nick_raw}</{tag}"
    return nick_color, nick_raw
def get_feed_timestamp():
    """Return the current UTC time truncated to the minute, timezone-aware.

    Previously the local wall-clock time was formatted, re-parsed and then
    labelled as UTC, which produced a wrong instant on any host whose local
    zone is not UTC.
    """
    now_utc = datetime.now(ZoneInfo(key='UTC'))
    return now_utc.replace(second=0, microsecond=0)
def _parse_feed_line(line):
    """Extract ``(title, link)`` from one feed-log line, or ``None``.

    The first three space-separated tokens are dropped. The title is the
    text between the first and second ``@@@`` markers of the remainder, and
    the link is the last token that starts with ``::`` (marker removed).
    """
    tokens = line.split(' ')
    if len(tokens) <= 2:
        return None
    fields = tokens[3:]
    pieces = ' '.join(fields).split('@@@')
    if len(pieces) < 2:
        return None
    link = None
    for token in fields:
        if token[:2] == '::':
            link = token[2:]
    if link is None:
        return None
    return pieces[1], link


def generate_rss_feed():
    """Write one ``rss_<name>.xml`` file per feed log in ``static/rssfeed/``.

    Every ``*.txt`` log under ``<script_directory>/static/rssfeed/`` becomes
    a feed whose entries come from the first 15 lines of the log. The feed
    for ``all.txt`` is identified as ``feed``; the others use the log's
    base name. Output files are written to the current working directory.

    Changes from the previous version:
    - The log is opened with a context manager, so the handle is released
      even when parsing or feed generation raises.
    - Lines that yield no title or link are skipped. Before, the title/link
      of the preceding line was silently reused, or the function crashed on
      the first line.
    - Each entry uses its own link as id. Before, every entry shared the
      feed URL as id, and feedgen also uses the entry id as the RSS guid.
    - The feed strings that were rendered and discarded are no longer built.
    """
    feed_dir = os.path.join(script_directory, 'static/rssfeed/')
    log_paths = sorted(glob.glob(os.path.join(feed_dir, '*.txt')), reverse=True)
    for log_path in log_paths:
        file_name = os.path.splitext(os.path.basename(log_path))[0]
        feed_name = 'feed' if file_name == 'all' else file_name

        fg = FeedGenerator()
        fg.id(f'http://i2pnews.simp.i2p/{feed_name}')
        fg.title('I2P News')
        fg.author({'name': f'i2pnews - {file_name}', 'email': 'simp@mail.i2p'})
        fg.link(href='http://i2pnews.simp.i2p', rel='alternate')
        fg.subtitle('I2P RSS Feed Aggregator')
        fg.link(href=f'http://i2pnews.simp.i2p/{feed_name}', rel='self')

        with open(log_path, 'r') as log_file:
            head_lines = log_file.read().splitlines()[:15]

        for line in head_lines:
            parsed = _parse_feed_line(line)
            if parsed is None:
                continue
            title, link = parsed
            fe = fg.add_entry()
            fe.id(link)
            fe.title(title)
            fe.link(href=link)
            fe.updated(get_feed_timestamp())

        fg.rss_file(f'rss_{file_name}.xml')
def get_short_url(long_url):
    """Return the ``host`` or ``host:port`` part of *long_url*.

    An optional ``http://`` / ``https://`` prefix is skipped (any case).
    Returns ``''`` when no host can be found.

    The port is only taken from an explicit ``:<digits>`` suffix. The old
    pattern accepted any single character before the digits, so
    ``http://host/2023/post`` was reported as ``host:2023``, and its greedy
    scheme match jumped to the last ``://`` in the string.
    """
    pattern = r'(?:https?://)?(?P<host>[^:/ ]+)(?::(?P<port>[0-9]+))?'
    match = re.search(pattern, long_url, flags=re.IGNORECASE)
    if match is None:
        return ''
    host = match.group('host')
    port = match.group('port')
    return f'{host}:{port}' if port else host
def get_host(long_url):
    """Return the host part of *long_url*, without scheme, port or path.

    An optional ``http://`` / ``https://`` prefix is skipped (any case).
    Returns ``''`` when no host can be found. The scheme match is no longer
    greedy, so a second ``://`` later in the URL (for example inside a
    query string) does not shift the result to the wrong host.
    """
    pattern = r'(?:https?://)?(?P<host>[^:/ ]+)'
    match = re.search(pattern, long_url, flags=re.IGNORECASE)
    if match is None:
        return ''
    return match.group('host')
def format_msg(single_msg_raw, topic):
    """Render one feed-log line as the HTML snippet shown for a post.

    The line (its last character is dropped) is split on spaces: token 0 is
    the feed nick, tokens 1-2 a ``YYYY-MM-DD HH:MM:SS`` timestamp, and the
    rest the message. In the message a token starting with ``::`` is the
    post link (turned into a "Read More" anchor) and a token starting with
    ``@@@`` has that marker removed.

    Returns ``''`` when the timestamp is missing or malformed; otherwise
    ``"<nick markup> <time>DD-MM-YYYY HH:MM</time><br><message><br-or-empty>"``.

    Messages without a link or title token no longer raise
    ``UnboundLocalError``: ``read_more`` and ``br`` now start empty.
    """
    single_msg = single_msg_raw[:-1]
    tokens = single_msg.split(' ')
    try:
        # NOTE(review): the fixed +5h shift presumably converts the log's
        # local time for display - confirm.
        posted = datetime.strptime(f':{tokens[1]} {tokens[2]}', ':%Y-%m-%d %H:%M:%S') + timedelta(hours=5)
        time_label = posted.strftime('%d-%m-%Y %H:%M')
    except (IndexError, ValueError, OverflowError):
        return ''

    # Reuse the avatar already assigned to this nick. Assign the freshly
    # drawn one when there is none yet or the stored one cannot be read.
    hashed, drawn_avatar = binary_feed_convert(tokens[0])
    avatar = None
    cached_name = hashed_feeds.get(hashed)
    if cached_name:
        try:
            avatar = get_avatar(script_directory, cached_name)
        except (OSError, ValueError):
            avatar = None
    if avatar is None:
        hashed_feeds[hashed] = drawn_avatar
        avatar = get_avatar(script_directory, drawn_avatar)

    nick, nick_raw = color_feed_nick(tokens[0], avatar, topic)
    del tokens[:3]  # drop nick, date and time; only the message remains

    read_more = ''
    br = ''
    for i, token in enumerate(tokens):
        if token[:2] == '::':
            if token[:6].casefold() != '::http':
                # Link without a scheme: resolve it against the feed's host.
                post_url = f'http://{nick_raw}{token[2:]}'
            else:
                post_url = (single_msg.split('::')[1]).split(' @@@')[0]
                if get_host(post_url) == '127.0.0.1':
                    # Swap a localhost link for the feed's own host.
                    url_q = post_url.split(get_short_url(post_url))[1]
                    post_url = f'http://{nick_raw}{url_q}'
            tokens[i] = ''
            read_more = f'''<div contenteditable="false"><a href="{post_url}" target="_blank">Read More</a></div>'''
        elif token[:3] == '@@@':
            tokens[i] = token[3:]
            br = '<br>'
        elif '<a' in token:
            tokens[i] = f'''<div contenteditable="false"><a'''
            br = ''
        elif '</a>' in token:
            tokens[i] = f'''{token}</div>'''
            br = ''
        elif ('http://' in token or 'https://' in token) and 'href=' not in token:
            tokens[i] = f'''<div contenteditable="false"><a href="{token}" target="_blank">{token}</a></div>'''
            br = ''
        elif len(token) > 3:
            # NOTE(review): the original called str.replace('<', '&lt;') and
            # ('>', '&gt;') here but discarded the results, so nothing has
            # ever been escaped. Escaping is deliberately NOT switched on
            # here: it would break anchors whose text spans several tokens
            # (e.g. 'href="...">two'). HTML from feeds therefore still
            # passes through unescaped and must be sanitized downstream.
            pass
        elif token == '':
            br = ''
        else:
            br = '<br>'

    msg = f'''{' '.join(tokens)} {read_more}'''
    formatted = f"{nick} <time>{time_label}</time><br>{msg}{br}"
    return formatted
def send_message(broadcast_urls, payload, timeout=None):
    """POST *payload* as JSON to every URL in *broadcast_urls*.

    Args:
        broadcast_urls: iterable of endpoint URLs.
        payload: JSON-serialisable object sent as the request body.
        timeout: optional timeout passed to ``requests.post``; ``None``
            (the default) keeps the previous wait-indefinitely behaviour.

    Returns:
        ``(True, 'Sent')`` when every endpoint answered with status 200,
        otherwise ``(False, <description of the first failure>)``. An empty
        URL list also yields ``(False, ...)`` instead of ``None``.

    Previously every path returned inside the first loop iteration, so only
    the first URL was ever contacted. With a single URL the result is
    unchanged.
    """
    first_failure = None
    attempted = False
    for url in broadcast_urls:
        attempted = True
        try:
            response = requests.post(url, json=payload, timeout=timeout)
        except Exception as e:
            # Best-effort delivery: record the failure and keep going.
            outcome = f'Failed to send {e}'
        else:
            if response.status_code == 200:
                continue
            outcome = f'Failed to send message. Status code: {response.status_code}'
        if first_failure is None:
            first_failure = outcome
    if not attempted:
        return False, 'No broadcast URLs to send to'
    if first_failure is not None:
        return False, first_failure
    return True, 'Sent'
# Per-process cache filled by format_msg(): hash of a feed nick -> name of
# the avatar assigned to it.
hashed_feeds = {}

# Feed category (the part of a nick after ':') -> tag fragment, including
# the closing '>', that color_feed_nick() wraps around the nick.
relay_list_color_dict = dict(
    news='intr>',
    developer='kytv>',
    blog='irc2p>',
    forum='ilita>',
    torrents='simp>',
)

# Avatar names found on disk; loaded once when the module is imported.
avatars = get_avatar_list()
def is_url_ours(url):
    """Return True when *url* contains ``http://simp.i2p/info/`` anywhere."""
    marker = 'http://simp.i2p/info/'
    return url.find(marker) != -1