diff --git a/hikka/modules/api_protection.py b/hikka/modules/api_protection.py new file mode 100644 index 0000000..8038a8b --- /dev/null +++ b/hikka/modules/api_protection.py @@ -0,0 +1,228 @@ +# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ ▄▀█ ▀█▀ ▄▀█ █▀▄▀█ ▄▀█ +# █▀█ █ █ █ █▀█ █▀▄ █ ▄ █▀█ █ █▀█ █ ▀ █ █▀█ +# +# © Copyright 2022 +# +# https://t.me/hikariatama +# +# 🔒 Licensed under the GNU GPLv3 +# 🌐 https://www.gnu.org/licenses/agpl-3.0.html + +# meta pic: https://img.icons8.com/emoji/344/shield-emoji.png +# meta developer: @hikariatama + +from .. import loader, utils +from ..inline.types import InlineCall +from telethon.tl.types import Message +import logging +import time +import io +import json +import random + +logger = logging.getLogger(__name__) + + +@loader.tds +class APIRatelimiterMod(loader.Module): + """Helps userbot avoid spamming Telegram API""" + + strings = { + "name": "APIRatelimiter", + "warning": ( + "🚫 WARNING!\n\n" + "Your account exceeded the limit of requests, " + "specified in config. In order to prevent " + "Telegram API Flood, userbot has been fully frozen " + "for {} seconds. Further info is provided in attached file. \n\n" + "It is recommended to get help in {prefix}support group!\n\n" + "If you think, that it is an intended behavior, then wait until userbot gets unlocked " + "and next time, when you will be going to perform such an operation, use " + "{prefix}suspend_api_protect <time in seconds>" + ), + "args_invalid": "🚫 Invalid arguments", + "suspended_for": "✅ API Flood Protection is disabled for {} seconds", + "test": "⚠️ This action will expose your account to flooding Telegram API. In order to confirm, that you really know, what you are doing, complete this simple test - find the emoji, differing from others", + "saved": "🚫 Protection disabled", + "u_sure": "⚠️ Are you sure?", + } + + strings_ru = { + "warning": ( + "🚫 ВНИМАНИЕ!\n\n" + "Аккаунт вышел за лимиты запросов, указанные в конфиге. " + "С целью предотвращения флуда Telegram API, юзербот был полностью заморожен " + "на {} секунд. Дополнительная информация прикреплена в файле ниже. \n\n" + "Рекомендуется обратиться за помощью в {prefix}support группу!\n\n" + "Если ты считаешь, что это запланированное поведение юзербота, просто подожди, пока закончится таймер " + "и в следующий раз, когда запланируешь выполнять такую ресурсозатратную операцию, используй " + "{prefix}suspend_api_protect <время в секундах>" + ), + "args_invalid": "🚫 Неверные аргументы", + "suspended_for": "✅ Защита API отключена на {} секунд", + "test": "⚠️ Это действие открывает юзерботу возможность флудить Telegram API. Для того, чтобы убедиться, что ты действительно уверен в том, что делаешь - реши простенький тест - найди отличающийся эмодзи.", + "saved": "🚫 Защита отключена", + "u_sure": "⚠️ Ты уверен?", + } + + _ratelimiter = [] + _suspend_until = 0 + + def __init__(self): + self.config = loader.ModuleConfig( + "time_sample", + 15, + lambda: "Time sample DO NOT TOUCH", + "threshold", + 50, + lambda: "Threshold DO NOT TOUCH", + "local_floodwait", + 30, + lambda: "Local FloodWait time DO NOT TOUCH", + ) + + async def client_ready(self, client, db): + if hasattr(client._call, "_old_call_rewritten"): + raise loader.SelfUnload("Already installed") + + self._me = (await client.get_me()).id + old_call = client._call + + async def new_call( + sender: "MTProtoSender", # noqa: F821 + request: "TLRequest", # noqa: F821 + ordered: bool = False, + flood_sleep_threshold: int = None, + ): + if time.perf_counter() > self._suspend_until and not self.get( + "disable_protection" + ): + request_name = type(request).__name__ + self._ratelimiter += [[request_name, time.perf_counter()]] + + self._ratelimiter = list( + filter( + lambda x: time.perf_counter() - x[1] + < int(self.config["time_sample"]), + self._ratelimiter, + ) + ) + + if len(self._ratelimiter) > int(self.config["threshold"]): + report = io.BytesIO( + json.dumps( + self._ratelimiter, + indent=4, + ).encode("utf-8") + ) + report.name = "local_fw_report.json" + + await self.inline.bot.send_document( + self._me, + report, + caption=self.strings("warning").format( + self.config["local_floodwait"], + prefix=self.get_prefix(), + ), + parse_mode="HTML", + ) + + # It is intented to use time.sleep instead of asyncio.sleep + time.sleep(int(self.config["local_floodwait"])) + + return await old_call(sender, request, ordered, flood_sleep_threshold) + + client._call = new_call + client._old_call_rewritten = old_call + client._call._hikka_overwritten = True + logger.debug("Successfully installed ratelimiter") + + async def on_unload(self): + if hasattr(self._client, "_old_call_rewritten"): + self._client._call = self._client._old_call_rewritten + delattr(self._client, "_old_call_rewritten") + logger.debug("Successfully uninstalled ratelimiter") + + async def suspend_api_protectcmd(self, message: Message): + """