"""Loads and registers modules""" # Friendly Telegram (telegram userbot) # Copyright (C) 2018-2021 The Authors # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ ▄▀█ ▀█▀ ▄▀█ █▀▄▀█ ▄▀█ # █▀█ █ █ █ █▀█ █▀▄ █ ▄ █▀█ █ █▀█ █ ▀ █ █▀█ # # © Copyright 2022 # # https://t.me/hikariatama # # 🔒 Licensed under the GNU GPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html # scope: inline import asyncio import importlib import inspect import logging import os import re import sys import urllib import uuid from importlib.machinery import ModuleSpec import telethon from telethon.tl.types import Message from typing import Union, List from aiogram.types import CallbackQuery from collections import ChainMap import requests from .. import loader, utils, main from ..compat import geek from ..inline.types import InlineMessage logger = logging.getLogger(__name__) VALID_URL = r"[-[\]_.~:/?#@!$&'()*+,;%<=>a-zA-Z0-9]+" VALID_PIP_PACKAGES = re.compile( r"^\s*# ?requires:(?: ?)((?:{url} )*(?:{url}))\s*$".format(url=VALID_URL), re.MULTILINE, ) USER_INSTALL = "PIP_TARGET" not in os.environ and "VIRTUAL_ENV" not in os.environ GIT_REGEX = re.compile( r"^https?://github\.com((?:/[a-z0-9-]+){2})(?:/tree/([a-z0-9-]+)((?:/[a-z0-9-]+)*))?/?$", flags=re.IGNORECASE, ) def unescape_percent(text): i = 0 ln = len(text) is_handling_percent = False out = "" while i < ln: char = text[i] if char == "%" and not is_handling_percent: is_handling_percent = True i += 1 continue if char == "d" and is_handling_percent: out += "." is_handling_percent = False i += 1 continue out += char is_handling_percent = False i += 1 return out def get_git_api(url): m = GIT_REGEX.search(url) if m is None: return None branch = m.group(2) path_ = m.group(3) api_url = "https://api.github.com/repos{}/contents".format(m.group(1)) if path_ is not None and len(path_) > 0: api_url += path_ if branch: api_url += f"?ref={branch}" return api_url @loader.tds class LoaderMod(loader.Module): """Loads modules""" strings = { "name": "Loader", "repo_config_doc": "Fully qualified URL to a module repo", "avail_header": "📲 Official modules from repo", "select_preset": "⚠️ Please select a preset", "no_preset": "🚫 Preset not found", "preset_loaded": "✅ Preset loaded", "no_module": "🚫 Module not available in repo.", "no_file": "🚫 File not found", "provide_module": "⚠️ Provide a module to load", "bad_unicode": "🚫 Invalid Unicode formatting in module", "load_failed": "🚫 Loading failed. See logs for details", "loaded": "🪁 Module {}{} loaded.{}", "no_class": "What class needs to be unloaded?", "unloaded": "🔥 Module unloaded.", "not_unloaded": "🚫 Module not unloaded.", "requirements_failed": "🚫 Requirements installation failed", "requirements_installing": "🔄 Installing requirements...", "requirements_restart": "🔄 Requirements installed, but a restart is required", "all_modules_deleted": "✅ All modules deleted", "single_cmd": "\n📍 {}{} 👉🏻 ", "undoc_cmd": "👁‍🗨 No docs", "ihandler": "\n🎹 Inline: {} 👉🏻 ", "undoc_ihandler": "👁‍🗨 No docs", "inline_init_failed": ( "🚫 This module requires Hikka inline feature and " "initialization of InlineManager failed\n" "Please, remove one of your old bots from @BotFather and " "restart userbot to load this module" ), "version_incompatible": "🚫 This module requires Hikka {}+\nPlease, update with .update", "ffmpeg_required": "🚫 This module requires FFMPEG, which is not installed", "developer": "\n\n🧑‍💻 Developer: {}", "module_fs": "💿 Would you like to save this module to filesystem, so it won't get unloaded after restart?", "save": "💿 Save", "no_save": "🚫 Don't save", "save_for_all": "💽 Always save to fs", "never_save": "🚫 Never save to fs", "will_save_fs": "💽 Now all modules, loaded with .loadmod will be saved to filesystem", "add_repo_config_doc": "Additional repos to load from, separated with «|»", } def __init__(self): super().__init__() self.config = loader.ModuleConfig( "MODULES_REPO", "https://mods.hikariatama.ru/", lambda: self.strings("repo_config_doc"), "ADDITIONAL_REPOS", # Currenly the trusted developers are specified ( "https://github.com/hikariatama/host/raw/master/|" "https://github.com/MoriSummerz/ftg-mods/raw/main/|" "https://gitlab.com/CakesTwix/friendly-userbot-modules/-/raw/master/" ), lambda: self.strings("add_repo_config_doc"), ) def _update_modules_in_db(self) -> None: self._db.set( __name__, "loaded_modules", { module.__class__.__name__: module.__origin__ for module in self.allmodules.modules if module.__origin__.startswith("http") }, ) @loader.owner async def dlmodcmd(self, message: Message) -> None: """Downloads and installs a module from the official module repo""" if args := utils.get_args(message): args = args[0] if urllib.parse.urlparse(args[0]).netloc else args[0].lower() await self.download_and_install(args, message) self._update_modules_in_db() else: await self.inline.list( message, [ self.strings("avail_header") + f"\n☁️ {repo.strip('/')}\n\n" + "\n".join( [ " ".join(chunk).strip(" | ") for chunk in utils.chunks( [ f"{i} | " for i in sorted( [ utils.escape_html( i.split("/")[-1].split(".")[0] ) for i in mods.values() ] ) ], 5, ) ] ) for repo, mods in (await self.get_repo_list("full")).items() ], ) @loader.owner async def dlpresetcmd(self, message: Message) -> None: """Set modules preset""" args = utils.get_args(message) if not args: await utils.answer(message, self.strings("select_preset")) return try: await self.get_repo_list(args[0]) except requests.exceptions.HTTPError as e: if e.response.status_code == 404: await utils.answer(message, self.strings("no_preset")) return raise self._db.set(__name__, "chosen_preset", args[0]) self._db.set(__name__, "loaded_modules", {}) await utils.answer(message, self.strings("preset_loaded")) await self.allmodules.commands["restart"](await message.reply("_")) async def _get_modules_to_load(self): possible_mods = ( await self.get_repo_list(self._db.get(__name__, "chosen_preset", None)) ).values() todo = dict(ChainMap(*possible_mods)) todo.update(**self._db.get(__name__, "loaded_modules", {})) return todo async def _get_repo(self, repo: str, preset: str) -> str: res = await utils.run_sync( requests.get, f'{repo.strip("/")}/{preset}.txt', ) if not str(res.status_code).startswith("2"): logger.debug(f"Can't load {repo=}, {preset=}, {res.status_code=}") return "" return res.text async def get_repo_list(self, preset=None): if preset is None or preset == "none": preset = "minimal" return { repo: { f"Preset_mod_{repo_id}_{i}": f'{repo.strip("/")}/{link}.py' for i, link in enumerate( set( filter( lambda x: x, (await self._get_repo(repo, preset)).split("\n"), ) ) ) } for repo_id, repo in enumerate( [self.config["MODULES_REPO"]] + self.config["ADDITIONAL_REPOS"].split("|") ) if repo.startswith("http") } async def download_and_install(self, module_name, message=None): try: if urllib.parse.urlparse(module_name).netloc: url = module_name else: links = list( dict( ChainMap(*list((await self.get_repo_list("full")).values())) ).values() ) try: url = next( link for link in links if link.endswith(f"{module_name}.py") ) except Exception: if message is not None: await utils.answer(message, self.strings("no_module")) return False r = await utils.run_sync(requests.get, url) if r.status_code == 404: if message is not None: await utils.answer(message, self.strings("no_module")) return False r.raise_for_status() return await self.load_module( r.content.decode("utf-8"), message, module_name, url, ) except Exception: logger.exception(f"Failed to load {module_name}") async def _inline__load( self, call: CallbackQuery, doc: str, path_: Union[str, None], mode: str, ) -> None: save = False if mode == "all_yes": self._db.set(main.__name__, "permanent_modules_fs", True) self._db.set(main.__name__, "disable_modules_fs", False) await call.answer(self.strings("will_save_fs")) save = True elif mode == "all_no": self._db.set(main.__name__, "disable_modules_fs", True) self._db.set(main.__name__, "permanent_modules_fs", False) elif mode == "once": save = True if path_ is not None: await self.load_module(doc, call, origin=path_, save_fs=save) else: await self.load_module(doc, call, save_fs=save) @loader.owner async def loadmodcmd(self, message: Message) -> None: """Loads the module file""" msg = message if message.file else (await message.get_reply_message()) if msg is None or msg.media is None: if args := utils.get_args(message): try: path_ = args[0] with open(path_, "rb") as f: doc = f.read() except FileNotFoundError: await utils.answer(message, self.strings("no_file")) return else: await utils.answer(message, self.strings("provide_module")) return else: path_ = None doc = await msg.download_media(bytes) logger.debug("Loading external module...") try: doc = doc.decode("utf-8") except UnicodeDecodeError: await utils.answer(message, self.strings("bad_unicode")) return if not self._db.get( main.__name__, "disable_modules_fs", False, ) and not self._db.get(main.__name__, "permanent_modules_fs", False): if message.file: await message.edit("") message = await message.respond("🌘") if await self.inline.form( self.strings("module_fs"), message=message, reply_markup=[ [ { "text": self.strings("save"), "callback": self._inline__load, "args": (doc, path_, "once"), }, { "text": self.strings("no_save"), "callback": self._inline__load, "args": (doc, path_, "no"), }, ], [ { "text": self.strings("save_for_all"), "callback": self._inline__load, "args": (doc, path_, "all_yes"), } ], [ { "text": self.strings("never_save"), "callback": self._inline__load, "args": (doc, path_, "all_no"), } ], ], ): return if path_ is not None: await self.load_module( doc, message, origin=path_, save_fs=self._db.get(main.__name__, "permanent_modules_fs", False) and not self._db.get(main.__name__, "disable_modules_fs", False), ) else: await self.load_module( doc, message, save_fs=self._db.get(main.__name__, "permanent_modules_fs", False) and not self._db.get(main.__name__, "disable_modules_fs", False), ) async def load_module( self, doc: str, message: Message, name: Union[str, None] = None, origin: str = "", did_requirements: bool = False, save_fs: bool = False, ) -> None: if any( line.replace(" ", "") == "#scope:ffmpeg" for line in doc.splitlines() ) and os.system("ffmpeg -version 1>/dev/null 2>/dev/null"): if isinstance(message, Message): await utils.answer(message, self.strings("ffmpeg_required")) return if ( any(line.replace(" ", "") == "#scope:inline" for line in doc.splitlines()) and not self.inline.init_complete ): if isinstance(message, Message): await utils.answer(message, self.strings("inline_init_failed")) return if re.search(r"# ?scope: ?hikka_min", doc): ver = re.search( r"# ?scope: ?hikka_min ([0-9]+\.[0-9]+\.[0-9]+)", doc, ).group(1) ver_ = tuple(map(int, ver.split("."))) if main.__version__ < ver_: if isinstance(message, Message): await utils.answer( message, self.strings("version_incompatible").format(ver), ) return developer = re.search(r"# ?meta developer: ?(.+)", doc) developer = developer.group(1) if developer else False developer = self.strings("developer").format(developer) if developer else "" if name is None: uid = "__extmod_" + str(uuid.uuid4()) else: if name.startswith(self.config["MODULES_REPO"]): name = name.split("/")[-1].split(".py")[0] uid = name.replace("%", "%%").replace(".", "%d") module_name = "hikka.modules." + uid doc = geek.compat(doc) try: try: spec = ModuleSpec( module_name, loader.StringLoader(doc, origin), origin=origin, ) instance = self.allmodules.register_module( spec, module_name, origin, save_fs=save_fs, ) except ImportError as e: logger.info( "Module loading failed, attemping dependency installation", exc_info=True, ) # Let's try to reinstall dependencies try: requirements = list( filter( lambda x: x and x[0] not in ("-", "_", "."), map( str.strip, VALID_PIP_PACKAGES.search(doc)[1].split(" "), ), ) ) except TypeError: logger.warning("No valid pip packages specified in code, attemping installation from error") # fmt: skip requirements = [e.name] logger.debug(f"Installing requirements: {requirements}") if not requirements: raise Exception("Nothing to install") from e if did_requirements: if message is not None: await utils.answer( message, self.strings("requirements_restart"), ) return if message is not None: await utils.answer( message, self.strings("requirements_installing"), ) pip = await asyncio.create_subprocess_exec( sys.executable, "-m", "pip", "install", "--upgrade", "-q", "--disable-pip-version-check", "--no-warn-script-location", *["--user"] if USER_INSTALL else [], *requirements, ) rc = await pip.wait() if rc != 0: if message is not None: await utils.answer( message, self.strings("requirements_failed"), ) return importlib.invalidate_caches() return await self.load_module( doc, message, name, origin, True, save_fs, ) # Try again except loader.LoadError as e: self.allmodules.modules.remove(instance) if message: await utils.answer(message, f"🚫 {utils.escape_html(str(e))}") return except BaseException as e: logger.exception(f"Loading external module failed due to {e}") if message is not None: await utils.answer(message, self.strings("load_failed")) return instance.inline = self.inline instance.animate = self._animate for method in dir(instance): if isinstance(getattr(instance, method), loader.InfiniteLoop): setattr(getattr(instance, method), "module_instance", instance) logger.debug(f"Added {instance=} to {method=}") if hasattr(instance, "__version__") and isinstance(instance.__version__, tuple): version = f" (v{'.'.join(list(map(str, list(instance.__version__))))})" else: version = "" try: try: self.allmodules.send_config_one(instance, self._db, self.translator) await self.allmodules.send_ready_one( instance, self._client, self._db, self.allclients, no_self_unload=True, from_dlmod=bool(message), ) except loader.LoadError as e: self.allmodules.modules.remove(instance) if message: await utils.answer(message, f"🚫 {utils.escape_html(str(e))}") return except loader.SelfUnload as e: logging.debug(f"Unloading {instance}, because it raised `SelfUnload`") self.allmodules.modules.remove(instance) if message: await utils.answer(message, f"🚫 {utils.escape_html(str(e))}") return except Exception as e: logger.exception(f"Module threw because {e}") if message is not None: await utils.answer(message, self.strings("load_failed")) return if message is not None: try: modname = instance.strings("name") except KeyError: modname = getattr(instance, "name", "ERROR") modhelp = "" if instance.__doc__: modhelp += ( f"\nℹ️ {utils.escape_html(inspect.getdoc(instance))}\n" ) if any( line.replace(" ", "") == "#scope:disable_onload_docs" for line in doc.splitlines() ): await utils.answer( message, self.strings("loaded").format( modname.strip(), version, modhelp, ) + developer, ) return for _name, fun in sorted( instance.commands.items(), key=lambda x: x[0], ): modhelp += self.strings("single_cmd").format(self.get_prefix(), _name) if fun.__doc__: modhelp += utils.escape_html(inspect.getdoc(fun)) else: modhelp += self.strings("undoc_cmd") if self.inline.init_complete: if hasattr(instance, "inline_handlers"): for _name, fun in sorted( instance.inline_handlers.items(), key=lambda x: x[0], ): modhelp += self.strings("ihandler").format( f"@{self.inline.bot_username} {_name}" ) if fun.__doc__: modhelp += utils.escape_html( "\n".join( [ line.strip() for line in inspect.getdoc(fun).splitlines() if not line.strip().startswith("@") ] ) ) else: modhelp += self.strings("undoc_ihandler") try: await utils.answer( message, self.strings("loaded").format( modname.strip(), version, modhelp, ) + developer, ) except telethon.errors.rpcerrorlist.MediaCaptionTooLongError: await message.reply( self.strings("loaded").format( modname.strip(), version, modhelp, ) + developer ) return @loader.owner async def unloadmodcmd(self, message: Message) -> None: """Unload module by class name""" args = utils.get_args_raw(message) if not args: await utils.answer(message, self.strings("no_class")) return worked = self.allmodules.unload_module(args) self._db.set( __name__, "loaded_modules", { mod: link for mod, link in self._db.get(__name__, "loaded_modules", {}).items() if mod not in worked }, ) await utils.answer( message, self.strings("unloaded" if worked else "not_unloaded"), ) async def _animate( self, message: Union[Message, InlineMessage], frames: List[str], interval: Union[float, int], *, inline: bool = False, ) -> None: """ Animate message :param message: Message to animate :param frames: A List of strings which are the frames of animation :param interval: Animation delay :param inline: Whether to use inline bot for animation :returns message: Please, note that if you set `inline=True`, first frame will be shown with an empty button due to the limitations of Telegram API """ if interval < 0.1: logger.warning("Resetting animation interval to 0.1s, because it may get you in floodwaits bro") # fmt: skip interval = 0.1 for frame in frames: if isinstance(message, Message): if inline: message = await self.inline.form( message=message, text=frame, reply_markup={"text": "\u0020\u2800", "data": "empty"}, ) else: message = await utils.answer(message, frame) elif isinstance(message, InlineMessage) and inline: await message.edit(frame) await asyncio.sleep(interval) return message @loader.owner async def clearmodulescmd(self, message: Message) -> None: """Delete all installed modules""" self._db.set(__name__, "loaded_modules", {}) for file in os.scandir(loader.LOADED_MODULES_DIR): os.remove(file) self._db.set(__name__, "chosen_preset", "none") await utils.answer(message, self.strings("all_modules_deleted")) await self.allmodules.commands["restart"](await message.reply("_")) async def _update_modules(self): todo = await self._get_modules_to_load() for mod in todo.values(): await self.download_and_install(mod) self._update_modules_in_db() async def client_ready(self, client, db): self._db = db self._client = client # Legacy db migration if isinstance(self._db.get(__name__, "loaded_modules", {}), list): self._db.set( __name__, "loaded_modules", { f"Loaded_module_{i}": link for i, link in enumerate( self._db.get(__name__, "loaded_modules", {}) ) }, ) await self._update_modules()