# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ # █▀█ █ █ █ █▀█ █▀▄ █ # © Copyright 2022 # https://t.me/hikariatama # # 🔒 Licensed under the GNU AGPLv3 # 🌐 https://www.gnu.org/licenses/agpl-3.0.html import asyncio import io import json import logging import time from telethon.tl.types import Message from telethon.tl import functions from telethon.tl.tlobject import TLRequest from .. import loader, utils from ..inline.types import InlineCall logger = logging.getLogger(__name__) GROUPS = [ "auth", "account", "users", "contacts", "messages", "updates", "photos", "upload", "help", "channels", "bots", "payments", "stickers", "phone", "langpack", "folders", "stats", ] def decapitalize(string: str) -> str: return string[0].lower() + string[1:] CONSTRUCTORS = { decapitalize( method.__class__.__name__.rsplit("Request", 1)[0] ): method.CONSTRUCTOR_ID for method in utils.array_sum( [ [ method for method in dir(getattr(functions, group)) if isinstance(method, TLRequest) ] for group in GROUPS ] ) } @loader.tds class APIRatelimiterMod(loader.Module): """Helps userbot avoid spamming Telegram API""" strings = { "name": "APILimiter", "warning": ( "☣️" " WARNING!\n\nYour account exceeded the limit of requests, specified" " in config. In order to prevent Telegram API Flood, userbot has been" " fully frozen for {} seconds. Further info is provided in attached" " file. \n\nIt is recommended to get help in {prefix}support" " group!\n\nIf you think, that it is an intended behavior, then wait until" " userbot gets unlocked and next time, when you will be going to perform" " such an operation, use {prefix}suspend_api_protect <time" " in seconds>" ), "args_invalid": ( "☣️ Invalid arguments" ), "suspended_for": ( "👌 API Flood Protection" " is disabled for {} seconds" ), "test": ( "☣️ This action will" " expose your account to flooding Telegram API. In order to confirm," " that you really know, what you are doing, complete this simple test -" " find the emoji, differing from others" ), "on": ( "👌 Protection enabled" ), "off": ( "👌 Protection" " disabled" ), "u_sure": ( "☣️ Are you sure?" ), } strings_ru = { "warning": ( "☣️" " ВНИМАНИЕ!\n\nАккаунт вышел за лимиты запросов, указанные в" " конфиге. С целью предотвращения флуда Telegram API, юзербот был" " полностью заморожен на {} секунд. Дополнительная информация" " прикреплена в файле ниже. \n\nРекомендуется обратиться за помощью в" " {prefix}support группу!\n\nЕсли ты считаешь, что это" " запланированное поведение юзербота, просто подожди, пока закончится" " таймер и в следующий раз, когда запланируешь выполнять такую" " ресурсозатратную операцию, используй" " {prefix}suspend_api_protect <время в секундах>" ), "args_invalid": ( "☣️ Неверные" " аргументы" ), "suspended_for": ( "👌 Защита API отключена" " на {} секунд" ), "test": ( "☣️ Это действие" " открывает юзерботу возможность флудить Telegram API. Для того," " чтобы убедиться, что ты действительно уверен в том, что делаешь - реши" " простенький тест - найди отличающийся эмодзи." ), "on": "👌 Защита включена", "off": ( "👌 Защита отключена" ), "u_sure": "☣️ Ты уверен?", } _ratelimiter = [] _suspend_until = 0 _lock = False def __init__(self): self.config = loader.ModuleConfig( loader.ConfigValue( "time_sample", 15, lambda: "Time sample through which the bot will count requests", validator=loader.validators.Integer(minimum=1), ), loader.ConfigValue( "threshold", 100, lambda: "Threshold of requests to trigger protection", validator=loader.validators.Integer(minimum=10), ), loader.ConfigValue( "local_floodwait", 30, lambda: "Freeze userbot for this amount of time, if request limit exceeds", validator=loader.validators.Integer(minimum=10, maximum=3600), ), loader.ConfigValue( "forbidden_methods", ["joinChannel", "importChatInvite"], lambda: "Forbid specified methods from being executed throughout external modules", validator=loader.validators.MultiChoice( [ "sendReaction", "joinChannel", "importChatInvite", ] ), on_change=lambda: self._client.forbid_constructors( map( lambda x: CONSTRUCTORS[x], self.config["forbidden_constructors"] ) ), ), ) async def client_ready(self): asyncio.ensure_future(self._install_protection()) async def _install_protection(self): await asyncio.sleep(30) # Restart lock if hasattr(self._client._call, "_old_call_rewritten"): raise loader.SelfUnload("Already installed") old_call = self._client._call async def new_call( sender: "MTProtoSender", # type: ignore request: "TLRequest", # type: ignore ordered: bool = False, flood_sleep_threshold: int = None, ): if time.perf_counter() > self._suspend_until and not self.get( "disable_protection", True, ): request_name = type(request).__name__ self._ratelimiter += [[request_name, time.perf_counter()]] self._ratelimiter = list( filter( lambda x: time.perf_counter() - x[1] < int(self.config["time_sample"]), self._ratelimiter, ) ) if ( len(self._ratelimiter) > int(self.config["threshold"]) and not self._lock ): self._lock = True report = io.BytesIO( json.dumps( self._ratelimiter, indent=4, ).encode("utf-8") ) report.name = "local_fw_report.json" await self.inline.bot.send_document( self.tg_id, report, caption=self.strings("warning").format( self.config["local_floodwait"], prefix=self.get_prefix(), ), ) # It is intented to use time.sleep instead of asyncio.sleep time.sleep(int(self.config["local_floodwait"])) self._lock = False return await old_call(sender, request, ordered, flood_sleep_threshold) self._client._call = new_call self._client._old_call_rewritten = old_call self._client._call._hikka_overwritten = True logger.debug("Successfully installed ratelimiter") async def on_unload(self): if hasattr(self._client, "_old_call_rewritten"): self._client._call = self._client._old_call_rewritten delattr(self._client, "_old_call_rewritten") logger.debug("Successfully uninstalled ratelimiter") @loader.command(ru_doc="<время в секундах> - Заморозить защиту API на N секунд") async def suspend_api_protect(self, message: Message): """