# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ # █▀█ █ █ █ █▀█ █▀▄ █ # © Copyright 2022 # https://t.me/hikariatama # # 🔒 Licensed under the GNU AGPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html # scope: inline import inspect import logging import os import random import time from io import BytesIO from typing import Union from telethon.tl.functions.channels import EditAdminRequest, InviteToChannelRequest from telethon.tl.types import ChatAdminRights, Message from .. import loader, main, utils from ..inline.types import InlineCall logger = logging.getLogger(__name__) if "DYNO" not in os.environ: DEBUG_MODS_DIR = os.path.join(utils.get_base_dir(), "debug_modules") if not os.path.isdir(DEBUG_MODS_DIR): os.mkdir(DEBUG_MODS_DIR, mode=0o755) for mod in os.scandir(DEBUG_MODS_DIR): os.remove(mod.path) @loader.tds class TestMod(loader.Module): """Perform operations based on userbot self-testing""" _memory = {} strings = { "name": "Tester", "set_loglevel": "🚫 Please specify verbosity as an integer or string", "no_logs": "ℹ️ You don't have any logs at verbosity {}.", "logs_filename": "hikka-logs.txt", "logs_caption": ( "🌌 Hikka logs with" " verbosity {}\n\n👋 Hikka version:" " {}.{}.{}{}\n" " Uptime: {}\n{}\n\n{} NoNick\n{} Grep\n{}" " InlineLogs" ), "suspend_invalid_time": ( "💀 Invalid time to" " suspend" ), "suspended": ( "🥶 Bot suspended" " for {} seconds" ), "results_ping": ( " Telegram ping:" " {} ms\n😎 Uptime: {}" ), "ping_hint": ( "💡 Telegram ping mostly" " depends on Telegram servers latency and other external factors and has" " nothing to do with the parameters of server on which userbot is" " installed" ), "confidential": ( "⚠️ Log level {} may reveal your confidential info," " be careful" ), "confidential_text": ( "⚠️ Log level {0} may reveal your confidential info," " be careful\nType .logs {0} force_insecure to" " ignore this warning" ), "choose_loglevel": "💁‍♂️ Choose log level", "bad_module": "🚫 Module not found", "debugging_enabled": ( "🧑‍💻 Debugging mode enabled for module {0}\nGo to" " directory named `debug_modules`, edit file named `{0}.py` and see changes" " in real time" ), "debugging_disabled": "✅ Debugging disabled", "heroku_debug": "🚫 Debugging is not available on Heroku", } strings_ru = { "set_loglevel": "🚫 Укажи уровень логов числом или строкой", "no_logs": "ℹ️ У тебя нет логов уровня {}.", "logs_filename": "hikka-logs.txt", "logs_caption": ( "🌌 Логи Hikka уровня" " {}\n\n👋" " Версия Hikka: {}.{}.{}{}\nUptime:" " {}\n{}\n\n{} NoNick\n{} Grep\n{}" " InlineLogs" ), "bad_module": "🚫 Модуль не найден", "debugging_enabled": ( "🧑‍💻 Режим разработчика включен для модуля" " {0}\nОтправляйся в директорию `debug_modules`," " изменяй файл `{0}.py`, и смотри изменения в режиме реального времени" ), "debugging_disabled": "✅ Режим разработчика выключен", "suspend_invalid_time": ( "💀 Неверное время" " заморозки" ), "suspended": ( "🥶 Бот заморожен на" " {} секунд" ), "results_ping": ( " Скорость отклика" " Telegram: {} ms\n😎 Прошло с последней" " перезагрузки: {}" ), "ping_hint": ( "💡 Скорость отклика" " Telegram в большей степени зависит от загруженности серверов Telegram и" " других внешних факторов и никак не связана с параметрами сервера, на" " который установлен юзербот" ), "confidential": ( "⚠️ Уровень логов {} может содержать личную" " информацию, будь осторожен" ), "confidential_text": ( "⚠️ Уровень логов {0} может содержать личную" " информацию, будь осторожен\nНапиши .logs {0}" " force_insecure, чтобы отправить логи игнорируя" " предупреждение" ), "choose_loglevel": "💁‍♂️ Выбери уровень логов", "_cmd_doc_dump": "Показать информацию о сообщении", "_cmd_doc_logs": ( "<уровень> - Отправляет лог-файл. Уровни ниже WARNING могут содержать" " личную инфомрацию." ), "_cmd_doc_suspend": "<время> - Заморозить бота на некоторое время", "_cmd_doc_ping": "Проверяет скорость отклика юзербота", "_cls_doc": "Операции, связанные с самотестированием", "heroku_debug": "🚫 Режим разработчика не доступен на Heroku", } def __init__(self): self.config = loader.ModuleConfig( loader.ConfigValue( "force_send_all", False, "⚠️ Do not touch, if you don't know what it does!\nBy default, Hikka" " will try to determine, which client caused logs. E.g. there is a" " module TestModule installed on Client1 and TestModule2 on Client2. By" " default, Client2 will get logs from TestModule2, and Client1 will get" " logs from TestModule. If this option is enabled, Hikka will send all" " logs to Client1 and Client2, even if it is not the one that caused" " the log.", validator=loader.validators.Boolean(), on_change=self._pass_config_to_logger, ), loader.ConfigValue( "tglog_level", "INFO", "⚠️ Do not touch, if you don't know what it does!\n" "Minimal loglevel for records to be sent in Telegram.", validator=loader.validators.Choice( ["INFO", "WARNING", "ERROR", "CRITICAL"] ), on_change=self._pass_config_to_logger, ), ) def _pass_config_to_logger(self): logging.getLogger().handlers[0].force_send_all = self.config["force_send_all"] logging.getLogger().handlers[0].tg_level = self.config["tglog_level"] @loader.command(ru_doc="Ответь на сообщение, чтобы показать его дамп") async def dump(self, message: Message): """Use in reply to get a dump of a message""" if not message.is_reply: return await utils.answer( message, "" + utils.escape_html((await message.get_reply_message()).stringify()) + "", ) @loader.loop(interval=1) async def watchdog(self): if not os.path.isdir(DEBUG_MODS_DIR): return try: for module in os.scandir(DEBUG_MODS_DIR): last_modified = os.stat(module.path).st_mtime cls_ = module.path.split("/")[-1].split(".py")[0] if cls_ not in self._memory: self._memory[cls_] = last_modified continue if self._memory[cls_] == last_modified: continue self._memory[cls_] = last_modified logger.debug(f"Reloading debug module {cls_}") with open(module.path, "r") as f: try: await next( module for module in self.allmodules.modules if module.__class__.__name__ == "LoaderMod" ).load_module( f.read(), None, save_fs=False, ) except Exception: logger.exception("Failed to reload module in watchdog") except Exception: logger.exception("Failed debugging watchdog") return @loader.command( ru_doc=( "[модуль] - Для разработчиков: открыть модуль в режиме дебага и применять" " изменения из него в режиме реального времени" ) ) async def debugmod(self, message: Message): """[module] - For developers: Open module for debugging You will be able to track changes in real-time""" if "DYNO" in os.environ: await utils.answer(message, self.strings("heroku_debug")) return args = utils.get_args_raw(message) instance = None for module in self.allmodules.modules: if ( module.__class__.__name__.lower() == args.lower() or module.strings["name"].lower() == args.lower() ): if os.path.isfile( os.path.join( DEBUG_MODS_DIR, f"{module.__class__.__name__}.py", ) ): os.remove( os.path.join( DEBUG_MODS_DIR, f"{module.__class__.__name__}.py", ) ) try: delattr(module, "hikka_debug") except AttributeError: pass await utils.answer(message, self.strings("debugging_disabled")) return module.hikka_debug = True instance = module break if not instance: await utils.answer(message, self.strings("bad_module")) return with open( os.path.join( DEBUG_MODS_DIR, f"{instance.__class__.__name__}.py", ), "wb", ) as f: f.write(inspect.getmodule(instance).__loader__.data) await utils.answer( message, self.strings("debugging_enabled").format(instance.__class__.__name__), ) @loader.command(ru_doc="<уровень> - Показать логи") async def logs( self, message: Union[Message, InlineCall], force: bool = False, lvl: Union[int, None] = None, ): """ - Dump logs""" if not isinstance(lvl, int): args = utils.get_args_raw(message) try: try: lvl = int(args.split()[0]) except ValueError: lvl = getattr(logging, args.split()[0].upper(), None) except IndexError: lvl = None if not isinstance(lvl, int): try: if not self.inline.init_complete or not await self.inline.form( text=self.strings("choose_loglevel"), reply_markup=[ [ { "text": "🚨 Critical", "callback": self.logs, "args": (False, 50), }, { "text": "🚫 Error", "callback": self.logs, "args": (False, 40), }, ], [ { "text": "⚠️ Warning", "callback": self.logs, "args": (False, 30), }, { "text": "ℹ️ Info", "callback": self.logs, "args": (False, 20), }, ], [ { "text": "🧑‍💻 Debug", "callback": self.logs, "args": (False, 10), }, { "text": "👁 All", "callback": self.logs, "args": (False, 0), }, ], [{"text": "🚫 Cancel", "action": "close"}], ], message=message, ): raise except Exception: await utils.answer(message, self.strings("set_loglevel")) return logs = "\n\n".join( [ "\n".join( handler.dumps(lvl, client_id=self._client.tg_id) if "client_id" in inspect.signature(handler.dumps).parameters else handler.dumps(lvl) ) for handler in logging.getLogger().handlers ] ) named_lvl = ( lvl if lvl not in logging._levelToName else logging._levelToName[lvl] # skipcq: PYL-W0212 ) if ( lvl < logging.WARNING and not force and ( not isinstance(message, Message) or "force_insecure" not in message.raw_text.lower() ) ): try: if not self.inline.init_complete: raise cfg = { "text": self.strings("confidential").format(named_lvl), "reply_markup": [ { "text": "📤 Send anyway", "callback": self.logs, "args": [True, lvl], }, {"text": "🚫 Cancel", "action": "close"}, ], } if isinstance(message, Message): if not await self.inline.form(**cfg, message=message): raise else: await message.edit(**cfg) except Exception: await utils.answer( message, self.strings("confidential_text").format(named_lvl), ) return if len(logs) <= 2: if isinstance(message, Message): await utils.answer(message, self.strings("no_logs").format(named_lvl)) else: await message.edit(self.strings("no_logs").format(named_lvl)) await message.unload() return if btoken := self._db.get("hikka.inline", "bot_token", False): logs = logs.replace( btoken, f'{btoken.split(":")[0]}:***************************', ) if hikka_token := self._db.get("HikkaDL", "token", False): logs = logs.replace( hikka_token, f'{hikka_token.split("_")[0]}_********************************', ) if hikka_token := self._db.get("Kirito", "token", False): logs = logs.replace( hikka_token, f'{hikka_token.split("_")[0]}_********************************', ) if os.environ.get("DATABASE_URL"): logs = logs.replace( os.environ.get("DATABASE_URL"), "postgre://**************************", ) if os.environ.get("REDIS_URL"): logs = logs.replace( os.environ.get("REDIS_URL"), "postgre://**************************", ) if os.environ.get("hikka_session"): logs = logs.replace( os.environ.get("hikka_session"), "StringSession(**************************)", ) logs = BytesIO(logs.encode("utf-16")) logs.name = self.strings("logs_filename") ghash = utils.get_git_hash() other = ( *main.__version__, " ({ghash[:8]})' if ghash else "", utils.formatted_uptime(), utils.get_named_platform(), "✅" if self._db.get(main.__name__, "no_nickname", False) else "🚫", "✅" if self._db.get(main.__name__, "grep", False) else "🚫", "✅" if self._db.get(main.__name__, "inlinelogs", False) else "🚫", ) if getattr(message, "out", True): await message.delete() if isinstance(message, Message): await utils.answer( message, logs, caption=self.strings("logs_caption").format(named_lvl, *other), ) else: await self._client.send_file( message.form["chat"], logs, caption=self.strings("logs_caption").format(named_lvl, *other), ) @loader.owner @loader.command(ru_doc="<время> - Заморозить бота на N секунд") async def suspend(self, message: Message): """