Heroku/hikka/modules/test.py

235 lines
8.5 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ ▄▀█ ▀█▀ ▄▀█ █▀▄▀█ ▄▀█
# █▀█ █ █ █ █▀█ █▀▄ █ ▄ █▀█ █ █▀█ █ ▀ █ █▀█
#
# © Copyright 2022
#
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU GPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# scope: inline
import time
import logging
from io import BytesIO
from .. import loader, utils, main
from typing import Union
from telethon.tl.types import Message
from telethon.errors.rpcerrorlist import ChatSendInlineForbiddenError
import aiogram
logger = logging.getLogger(__name__)
@loader.tds
class TestMod(loader.Module):
"""Perform operations based on userbot self-testing"""
strings = {
"name": "Tester",
"set_loglevel": "🚫 <b>Please specify verbosity as an integer or string</b>",
"no_logs": " <b>You don't have any logs at verbosity {}.</b>",
"logs_filename": "hikka-logs.txt",
"logs_caption": "🏩 <b>Hikka logs with verbosity </b><code>{}</code>\n\n👩‍🎤 <b>Hikka version: {}.{}.{}</b>\n⏱ <b>Uptime: {}</b>\n<b>{}</b>",
"suspend_invalid_time": "🚫 <b>Invalid time to suspend</b>",
"suspended": "🥶 <b>Bot suspended for</b> <code>{}</code> <b>seconds</b>",
"results_ping": "⏱ <b>Ping:</b> <code>{}</code> <b>ms</b>",
"confidential": "⚠️ <b>Log level </b><code>{}</code><b> may reveal your confidential info, be careful</b>",
"confidential_text": "⚠️ <b>Log level </b><code>{0}</code><b> may reveal your confidential info, be careful</b>\n<b>Type </b><code>.logs {0} force_insecure</code><b> to ignore this warning</b>",
"choose_loglevel": "💁‍♂️ <b>Choose log level</b>",
}
@staticmethod
async def dumpcmd(message: Message) -> None:
"""Use in reply to get a dump of a message"""
if not message.is_reply:
return
await utils.answer(
message,
"<code>"
+ utils.escape_html((await message.get_reply_message()).stringify())
+ "</code>",
)
@staticmethod
async def cancel(call: aiogram.types.CallbackQuery) -> None:
await call.delete()
async def logscmd(
self,
message: Union[Message, aiogram.types.CallbackQuery],
force: bool = False,
lvl: Union[int, None] = None,
) -> None:
"""<level> - Dumps logs. Loglevels below WARNING may contain personal info."""
if not isinstance(lvl, int):
args = utils.get_args_raw(message)
try:
try:
lvl = int(args.split()[0])
except ValueError:
lvl = getattr(logging, args.split()[0].upper(), None)
except IndexError:
lvl = None
if not isinstance(lvl, int):
if self.inline.init_complete:
await self.inline.form(
text=self.strings("choose_loglevel"),
reply_markup=[
[
{
"text": "🚨 Critical",
"callback": self.logscmd,
"args": (False, 50),
},
{
"text": "🚫 Error",
"callback": self.logscmd,
"args": (False, 40),
},
],
[
{
"text": "⚠️ Warning",
"callback": self.logscmd,
"args": (False, 30),
},
{
"text": " Info",
"callback": self.logscmd,
"args": (False, 20),
},
],
[
{
"text": "🧑‍💻 Debug",
"callback": self.logscmd,
"args": (False, 10),
},
{
"text": "👁 All",
"callback": self.logscmd,
"args": (False, 0),
},
],
[{"text": "🚫 Cancel", "callback": self.cancel}],
],
message=message,
)
else:
await utils.answer(message, self.strings("set_loglevel"))
return
logs = "\n\n".join(
[
("\n".join(handler.dumps(lvl)))
for handler in logging.getLogger().handlers
]
).encode("utf-16")
named_lvl = (
lvl if lvl not in logging._levelToName else logging._levelToName[lvl] # skipcq: PYL-W0212
)
if (
lvl < logging.WARNING
and not force
and (
not isinstance(message, Message)
or "force_insecure" not in message.raw_text.lower()
)
):
if self.inline.init_complete:
try:
cfg = {
"text": self.strings("confidential").format(named_lvl),
"reply_markup": [
[
{
"text": "📤 Send anyway",
"callback": self.logscmd,
"args": [True, lvl],
},
{"text": "🚫 Cancel", "callback": self.cancel},
]
],
}
if isinstance(message, Message):
await self.inline.form(**cfg, message=message)
else:
await message.edit(**cfg)
except ChatSendInlineForbiddenError:
await utils.answer(
message, self.strings("confidential_text").format(named_lvl)
)
else:
await utils.answer(
message, self.strings("confidential_text").format(named_lvl)
)
return
if len(logs) <= 2:
if isinstance(message, Message):
await utils.answer(message, self.strings("no_logs").format(named_lvl))
else:
await message.edit(self.strings("no_logs").format(named_lvl))
await message.unload()
return
logs = BytesIO(logs)
logs.name = self.strings("logs_filename")
other = (*main.__version__, utils.formatted_uptime(), utils.get_named_platform())
if isinstance(message, Message):
await message.delete()
await utils.answer(
message, logs, caption=self.strings("logs_caption").format(named_lvl, *other)
)
else:
await message.delete()
await self._client.send_file(
message.form["chat"],
logs,
caption=self.strings("logs_caption").format(named_lvl, *other),
)
@loader.owner
async def suspendcmd(self, message: Message) -> None:
"""<time> - Suspends the bot for N seconds"""
try:
time_sleep = float(utils.get_args_raw(message))
await utils.answer(
message, self.strings("suspended", message).format(str(time_sleep))
)
time.sleep(time_sleep)
except ValueError:
await utils.answer(message, self.strings("suspend_invalid_time", message))
async def pingcmd(self, message: Message) -> None:
"""Test your userbot ping"""
start = time.perf_counter_ns()
message = await utils.answer(message, "<code>🐻 Bear with us while ping is checking...</code>")
end = time.perf_counter_ns()
if isinstance(message, (list, tuple, set)):
message = message[0]
ms = (end - start) * 0.000001
await utils.answer(message, self.strings("results_ping").format(round(ms, 3)))
async def client_ready(self, client, db) -> None:
self._client = client