# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# meta pic: https://img.icons8.com/emoji/344/shield-emoji.png
# meta developer: @hikariatama
import asyncio
import io
import json
import logging
import time
from telethon.tl.types import Message
from .. import loader, utils
from ..inline.types import InlineCall
logger = logging.getLogger(__name__)
@loader.tds
class APIRatelimiterMod(loader.Module):
"""Helps userbot avoid spamming Telegram API"""
strings = {
"name": "APIRatelimiter",
"warning": (
"🚫 WARNING!\n\nYour account exceeded the limit of requests,"
" specified in config. In order to prevent Telegram API Flood, userbot has"
" been fully frozen for {} seconds. Further info is provided in"
" attached file. \n\nIt is recommended to get help in"
" {prefix}support
group!\n\nIf you think, that it is an"
" intended behavior, then wait until userbot gets unlocked and next time,"
" when you will be going to perform such an operation, use"
" {prefix}suspend_api_protect
<time in seconds>"
),
"args_invalid": "🚫 Invalid arguments",
"suspended_for": "✅ API Flood Protection is disabled for {} seconds",
"test": (
"⚠️ This action will expose your account to flooding Telegram API."
" In order to confirm, that you really know, what you are doing,"
" complete this simple test - find the emoji, differing from others"
),
"on": "✅ Protection enabled",
"off": "🚫 Protection disabled",
"u_sure": "⚠️ Are you sure?",
}
strings_ru = {
"warning": (
"🚫 ВНИМАНИЕ!\n\nАккаунт вышел за лимиты запросов, указанные в"
" конфиге. С целью предотвращения флуда Telegram API, юзербот был"
" полностью заморожен на {} секунд. Дополнительная информация"
" прикреплена в файле ниже. \n\nРекомендуется обратиться за помощью в"
" {prefix}support
группу!\n\nЕсли ты считаешь, что это"
" запланированное поведение юзербота, просто подожди, пока закончится"
" таймер и в следующий раз, когда запланируешь выполнять такую"
" ресурсозатратную операцию, используй"
" {prefix}suspend_api_protect
<время в секундах>"
),
"args_invalid": "🚫 Неверные аргументы",
"suspended_for": "✅ Защита API отключена на {} секунд",
"test": (
"⚠️ Это действие открывает юзерботу возможность флудить Telegram"
" API. Для того, чтобы убедиться, что ты действительно уверен в том,"
" что делаешь - реши простенький тест - найди отличающийся эмодзи."
),
"on": "✅ Защита включена",
"off": "🚫 Защита отключена",
"u_sure": "⚠️ Ты уверен?",
}
_ratelimiter = []
_suspend_until = 0
_lock = False
def __init__(self):
self.config = loader.ModuleConfig(
loader.ConfigValue(
"time_sample",
15,
lambda: "Time sample DO NOT TOUCH",
validator=loader.validators.Integer(minimum=1),
),
loader.ConfigValue(
"threshold",
100,
lambda: "Threshold DO NOT TOUCH",
validator=loader.validators.Integer(minimum=10),
),
loader.ConfigValue(
"local_floodwait",
30,
lambda: "Local FW DO NOT TOUCH",
validator=loader.validators.Integer(minimum=10, maximum=3600),
),
)
async def client_ready(self, *_):
asyncio.ensure_future(self._install_protection())
async def _install_protection(self):
await asyncio.sleep(30) # Restart lock
if hasattr(self._client._call, "_old_call_rewritten"):
raise loader.SelfUnload("Already installed")
old_call = self._client._call
async def new_call(
sender: "MTProtoSender", # type: ignore
request: "TLRequest", # type: ignore
ordered: bool = False,
flood_sleep_threshold: int = None,
):
if time.perf_counter() > self._suspend_until and not self.get(
"disable_protection",
True,
):
request_name = type(request).__name__
self._ratelimiter += [[request_name, time.perf_counter()]]
self._ratelimiter = list(
filter(
lambda x: time.perf_counter() - x[1]
< int(self.config["time_sample"]),
self._ratelimiter,
)
)
if (
len(self._ratelimiter) > int(self.config["threshold"])
and not self._lock
):
self._lock = True
report = io.BytesIO(
json.dumps(
self._ratelimiter,
indent=4,
).encode("utf-8")
)
report.name = "local_fw_report.json"
await self.inline.bot.send_document(
self.tg_id,
report,
caption=self.strings("warning").format(
self.config["local_floodwait"],
prefix=self.get_prefix(),
),
)
# It is intented to use time.sleep instead of asyncio.sleep
time.sleep(int(self.config["local_floodwait"]))
self._lock = False
return await old_call(sender, request, ordered, flood_sleep_threshold)
self._client._call = new_call
self._client._old_call_rewritten = old_call
self._client._call._hikka_overwritten = True
logger.debug("Successfully installed ratelimiter")
async def on_unload(self):
if hasattr(self._client, "_old_call_rewritten"):
self._client._call = self._client._old_call_rewritten
delattr(self._client, "_old_call_rewritten")
logger.debug("Successfully uninstalled ratelimiter")
async def suspend_api_protectcmd(self, message: Message):
"""