mirror of https://github.com/coddrago/Heroku
Delete hikka/modules/hikka_settings.py
parent
04da3c0b65
commit
2ba1a7ea69
|
@ -1,927 +0,0 @@
|
|||
# ©️ Dan Gazizullin, 2021-2023
|
||||
# This file is a part of Hikka Userbot
|
||||
# 🌐 https://github.com/hikariatama/Hikka
|
||||
# You can redistribute it and/or modify it under the terms of the GNU AGPLv3
|
||||
# 🔑 https://www.gnu.org/licenses/agpl-3.0.html
|
||||
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
|
||||
import hikkatl
|
||||
from hikkatl.tl.functions.messages import (
|
||||
GetDialogFiltersRequest,
|
||||
UpdateDialogFilterRequest,
|
||||
)
|
||||
from hikkatl.tl.types import Message
|
||||
from hikkatl.utils import get_display_name
|
||||
|
||||
from .. import loader, log, main, utils
|
||||
from .._internal import fw_protect, restart
|
||||
from ..inline.types import InlineCall
|
||||
from ..web import core
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
ALL_INVOKES = [
|
||||
"flush_entity_cache",
|
||||
"flush_fulluser_cache",
|
||||
"flush_fullchannel_cache",
|
||||
"flush_perms_cache",
|
||||
"flush_loader_cache",
|
||||
"flush_cache",
|
||||
"reload_core",
|
||||
"inspect_cache",
|
||||
"inspect_modules",
|
||||
]
|
||||
|
||||
|
||||
@loader.tds
|
||||
class HikkaSettingsMod(loader.Module):
|
||||
"""Advanced settings for Heroku Userbot"""
|
||||
|
||||
strings = {"name": "HerokuSettings"}
|
||||
|
||||
def get_watchers(self) -> tuple:
|
||||
return [
|
||||
str(watcher.__self__.__class__.strings["name"])
|
||||
for watcher in self.allmodules.watchers
|
||||
if watcher.__self__.__class__.strings is not None
|
||||
], self._db.get(main.__name__, "disabled_watchers", {})
|
||||
|
||||
async def _uninstall(self, call: InlineCall):
|
||||
await call.edit(self.strings("uninstall"))
|
||||
|
||||
async with self._client.conversation("@BotFather") as conv:
|
||||
for msg in [
|
||||
"/deletebot",
|
||||
f"@{self.inline.bot_username}",
|
||||
"Yes, I am totally sure.",
|
||||
]:
|
||||
await fw_protect()
|
||||
m = await conv.send_message(msg)
|
||||
r = await conv.get_response()
|
||||
|
||||
logger.debug(">> %s", m.raw_text)
|
||||
logger.debug("<< %s", r.raw_text)
|
||||
|
||||
await fw_protect()
|
||||
|
||||
await m.delete()
|
||||
await r.delete()
|
||||
|
||||
async for dialog in self._client.iter_dialogs(
|
||||
None,
|
||||
ignore_migrated=True,
|
||||
):
|
||||
if (
|
||||
dialog.name
|
||||
in {
|
||||
"heroku-logs",
|
||||
"heroku-onload",
|
||||
"heroku-assets",
|
||||
"heroku-backups",
|
||||
"heroku-acc-switcher",
|
||||
"silent-tags",
|
||||
}
|
||||
and dialog.is_channel
|
||||
and (
|
||||
dialog.entity.participants_count == 1
|
||||
or dialog.entity.participants_count == 2
|
||||
and dialog.name in {"heroku-logs", "silent-tags"}
|
||||
)
|
||||
or (
|
||||
self._client.loader.inline.init_complete
|
||||
and dialog.entity.id == self._client.loader.inline.bot_id
|
||||
)
|
||||
):
|
||||
await fw_protect()
|
||||
await self._client.delete_dialog(dialog.entity)
|
||||
|
||||
await fw_protect()
|
||||
|
||||
folders = await self._client(GetDialogFiltersRequest())
|
||||
|
||||
if any(folder.title == "heroku" for folder in folders):
|
||||
folder_id = max(
|
||||
folders,
|
||||
key=lambda x: x.id,
|
||||
).id
|
||||
await fw_protect()
|
||||
await self._client(UpdateDialogFilterRequest(id=folder_id))
|
||||
|
||||
for handler in logging.getLogger().handlers:
|
||||
handler.setLevel(logging.CRITICAL)
|
||||
|
||||
await fw_protect()
|
||||
|
||||
await self._client.log_out()
|
||||
|
||||
restart()
|
||||
|
||||
async def _uninstall_confirm_step_2(self, call: InlineCall):
|
||||
await call.edit(
|
||||
self.strings("deauth_confirm_step2"),
|
||||
utils.chunks(
|
||||
list(
|
||||
sorted(
|
||||
[
|
||||
{
|
||||
"text": self.strings("deauth_yes"),
|
||||
"callback": self._uninstall,
|
||||
},
|
||||
*[
|
||||
{
|
||||
"text": self.strings(f"deauth_no_{i}"),
|
||||
"action": "close",
|
||||
}
|
||||
for i in range(1, 4)
|
||||
],
|
||||
],
|
||||
key=lambda _: random.random(),
|
||||
)
|
||||
),
|
||||
2,
|
||||
)
|
||||
+ [
|
||||
[
|
||||
{
|
||||
"text": self.strings("deauth_cancel"),
|
||||
"action": "close",
|
||||
}
|
||||
]
|
||||
],
|
||||
)
|
||||
|
||||
@loader.command()
|
||||
async def uninstall_heroku(self, message: Message):
|
||||
await self.inline.form(
|
||||
self.strings("deauth_confirm"),
|
||||
message,
|
||||
[
|
||||
{
|
||||
"text": self.strings("deauth_confirm_btn"),
|
||||
"callback": self._uninstall_confirm_step_2,
|
||||
},
|
||||
{"text": self.strings("deauth_cancel"), "action": "close"},
|
||||
],
|
||||
)
|
||||
|
||||
@loader.command()
|
||||
async def watchers(self, message: Message):
|
||||
watchers, disabled_watchers = self.get_watchers()
|
||||
watchers = [
|
||||
f"♻️ {watcher}"
|
||||
for watcher in watchers
|
||||
if watcher not in list(disabled_watchers.keys())
|
||||
]
|
||||
watchers += [f"💢 {k} {v}" for k, v in disabled_watchers.items()]
|
||||
await utils.answer(
|
||||
message, self.strings("watchers").format("\n".join(watchers))
|
||||
)
|
||||
|
||||
@loader.command()
|
||||
async def watcherbl(self, message: Message):
|
||||
if not (args := utils.get_args_raw(message)):
|
||||
await utils.answer(message, self.strings("args"))
|
||||
return
|
||||
|
||||
watchers, disabled_watchers = self.get_watchers()
|
||||
|
||||
if args.lower() not in map(lambda x: x.lower(), watchers):
|
||||
await utils.answer(message, self.strings("mod404").format(args))
|
||||
return
|
||||
|
||||
args = next((x.lower() == args.lower() for x in watchers), False)
|
||||
|
||||
current_bl = [
|
||||
v for k, v in disabled_watchers.items() if k.lower() == args.lower()
|
||||
]
|
||||
current_bl = current_bl[0] if current_bl else []
|
||||
|
||||
chat = utils.get_chat_id(message)
|
||||
if chat not in current_bl:
|
||||
if args in disabled_watchers:
|
||||
for k in disabled_watchers:
|
||||
if k.lower() == args.lower():
|
||||
disabled_watchers[k].append(chat)
|
||||
break
|
||||
else:
|
||||
disabled_watchers[args] = [chat]
|
||||
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("disabled").format(args) + " <b>in current chat</b>",
|
||||
)
|
||||
else:
|
||||
for k in disabled_watchers.copy():
|
||||
if k.lower() == args.lower():
|
||||
disabled_watchers[k].remove(chat)
|
||||
if not disabled_watchers[k]:
|
||||
del disabled_watchers[k]
|
||||
break
|
||||
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("enabled").format(args) + " <b>in current chat</b>",
|
||||
)
|
||||
|
||||
self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
|
||||
|
||||
@loader.command()
|
||||
async def watchercmd(self, message: Message):
|
||||
if not (args := utils.get_args_raw(message)):
|
||||
return await utils.answer(message, self.strings("args"))
|
||||
|
||||
chats, pm, out, incoming = False, False, False, False
|
||||
|
||||
if "-c" in args:
|
||||
args = args.replace("-c", "").replace(" ", " ").strip()
|
||||
chats = True
|
||||
|
||||
if "-p" in args:
|
||||
args = args.replace("-p", "").replace(" ", " ").strip()
|
||||
pm = True
|
||||
|
||||
if "-o" in args:
|
||||
args = args.replace("-o", "").replace(" ", " ").strip()
|
||||
out = True
|
||||
|
||||
if "-i" in args:
|
||||
args = args.replace("-i", "").replace(" ", " ").strip()
|
||||
incoming = True
|
||||
|
||||
if chats and pm:
|
||||
pm = False
|
||||
if out and incoming:
|
||||
incoming = False
|
||||
|
||||
watchers, disabled_watchers = self.get_watchers()
|
||||
|
||||
if args.lower() not in [watcher.lower() for watcher in watchers]:
|
||||
return await utils.answer(message, self.strings("mod404").format(args))
|
||||
|
||||
args = [watcher for watcher in watchers if watcher.lower() == args.lower()][0]
|
||||
|
||||
if chats or pm or out or incoming:
|
||||
disabled_watchers[args] = [
|
||||
*(["only_chats"] if chats else []),
|
||||
*(["only_pm"] if pm else []),
|
||||
*(["out"] if out else []),
|
||||
*(["in"] if incoming else []),
|
||||
]
|
||||
self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("enabled").format(args)
|
||||
+ f" (<code>{disabled_watchers[args]}</code>)",
|
||||
)
|
||||
return
|
||||
|
||||
if args in disabled_watchers and "*" in disabled_watchers[args]:
|
||||
await utils.answer(message, self.strings("enabled").format(args))
|
||||
del disabled_watchers[args]
|
||||
self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
|
||||
return
|
||||
|
||||
disabled_watchers[args] = ["*"]
|
||||
self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
|
||||
await utils.answer(message, self.strings("disabled").format(args))
|
||||
|
||||
@loader.command()
|
||||
async def nonickuser(self, message: Message):
|
||||
if not (reply := await message.get_reply_message()):
|
||||
await utils.answer(message, self.strings("reply_required"))
|
||||
return
|
||||
|
||||
u = reply.sender_id
|
||||
if not isinstance(u, int):
|
||||
u = u.user_id
|
||||
|
||||
nn = self._db.get(main.__name__, "nonickusers", [])
|
||||
if u not in nn:
|
||||
nn += [u]
|
||||
nn = list(set(nn)) # skipcq: PTC-W0018
|
||||
await utils.answer(message, self.strings("user_nn").format("on"))
|
||||
else:
|
||||
nn = list(set(nn) - {u})
|
||||
await utils.answer(message, self.strings("user_nn").format("off"))
|
||||
|
||||
self._db.set(main.__name__, "nonickusers", nn)
|
||||
|
||||
@loader.command()
|
||||
async def nonickchat(self, message: Message):
|
||||
if message.is_private:
|
||||
await utils.answer(message, self.strings("private_not_allowed"))
|
||||
return
|
||||
|
||||
chat = utils.get_chat_id(message)
|
||||
|
||||
nn = self._db.get(main.__name__, "nonickchats", [])
|
||||
if chat not in nn:
|
||||
nn += [chat]
|
||||
nn = list(set(nn)) # skipcq: PTC-W0018
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("cmd_nn").format(
|
||||
utils.escape_html((await message.get_chat()).title),
|
||||
"on",
|
||||
),
|
||||
)
|
||||
else:
|
||||
nn = list(set(nn) - {chat})
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("cmd_nn").format(
|
||||
utils.escape_html((await message.get_chat()).title),
|
||||
"off",
|
||||
),
|
||||
)
|
||||
|
||||
self._db.set(main.__name__, "nonickchats", nn)
|
||||
|
||||
@loader.command()
|
||||
async def nonickcmdcmd(self, message: Message):
|
||||
if not (args := utils.get_args_raw(message)):
|
||||
await utils.answer(message, self.strings("no_cmd"))
|
||||
return
|
||||
|
||||
if args not in self.allmodules.commands:
|
||||
await utils.answer(message, self.strings("cmd404"))
|
||||
return
|
||||
|
||||
nn = self._db.get(main.__name__, "nonickcmds", [])
|
||||
if args not in nn:
|
||||
nn += [args]
|
||||
nn = list(set(nn))
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("cmd_nn").format(
|
||||
utils.escape_html(self.get_prefix() + args),
|
||||
"on",
|
||||
),
|
||||
)
|
||||
else:
|
||||
nn = list(set(nn) - {args})
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("cmd_nn").format(
|
||||
utils.escape_html(self.get_prefix() + args),
|
||||
"off",
|
||||
),
|
||||
)
|
||||
|
||||
self._db.set(main.__name__, "nonickcmds", nn)
|
||||
|
||||
@loader.command()
|
||||
async def nonickcmds(self, message: Message):
|
||||
if not self._db.get(main.__name__, "nonickcmds", []):
|
||||
await utils.answer(message, self.strings("nothing"))
|
||||
return
|
||||
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("cmd_nn_list").format(
|
||||
"\n".join(
|
||||
[
|
||||
f"▫️ <code>{utils.escape_html(self.get_prefix() + cmd)}</code>"
|
||||
for cmd in self._db.get(main.__name__, "nonickcmds", [])
|
||||
]
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
@loader.command()
|
||||
async def nonickusers(self, message: Message):
|
||||
users = []
|
||||
for user_id in self._db.get(main.__name__, "nonickusers", []).copy():
|
||||
try:
|
||||
user = await self._client.get_entity(user_id)
|
||||
except Exception:
|
||||
self._db.set(
|
||||
main.__name__,
|
||||
"nonickusers",
|
||||
list(
|
||||
(
|
||||
set(self._db.get(main.__name__, "nonickusers", []))
|
||||
- {user_id}
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
logger.warning("User %s removed from nonickusers list", user_id)
|
||||
continue
|
||||
|
||||
users += [
|
||||
'▫️ <b><a href="tg://user?id={}">{}</a></b>'.format(
|
||||
user_id,
|
||||
utils.escape_html(get_display_name(user)),
|
||||
)
|
||||
]
|
||||
|
||||
if not users:
|
||||
await utils.answer(message, self.strings("nothing"))
|
||||
return
|
||||
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("user_nn_list").format("\n".join(users)),
|
||||
)
|
||||
|
||||
@loader.command()
|
||||
async def nonickchats(self, message: Message):
|
||||
chats = []
|
||||
for chat in self._db.get(main.__name__, "nonickchats", []):
|
||||
try:
|
||||
chat_entity = await self._client.get_entity(int(chat))
|
||||
except Exception:
|
||||
self._db.set(
|
||||
main.__name__,
|
||||
"nonickchats",
|
||||
list(
|
||||
(set(self._db.get(main.__name__, "nonickchats", [])) - {chat})
|
||||
),
|
||||
)
|
||||
|
||||
logger.warning("Chat %s removed from nonickchats list", chat)
|
||||
continue
|
||||
|
||||
chats += [
|
||||
'▫️ <b><a href="{}">{}</a></b>'.format(
|
||||
utils.get_entity_url(chat_entity),
|
||||
utils.escape_html(get_display_name(chat_entity)),
|
||||
)
|
||||
]
|
||||
|
||||
if not chats:
|
||||
await utils.answer(message, self.strings("nothing"))
|
||||
return
|
||||
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("user_nn_list").format("\n".join(chats)),
|
||||
)
|
||||
|
||||
async def inline__setting(self, call: InlineCall, key: str, state: bool = False):
|
||||
if callable(key):
|
||||
key()
|
||||
hikkatl.extensions.html.CUSTOM_EMOJIS = not main.get_config_key(
|
||||
"disable_custom_emojis"
|
||||
)
|
||||
else:
|
||||
self._db.set(main.__name__, key, state)
|
||||
|
||||
if key == "no_nickname" and state and self.get_prefix() == ".":
|
||||
await call.answer(
|
||||
self.strings("nonick_warning"),
|
||||
show_alert=True,
|
||||
)
|
||||
else:
|
||||
await call.answer("Configuration value saved!")
|
||||
|
||||
await call.edit(
|
||||
self.strings("inline_settings"),
|
||||
reply_markup=self._get_settings_markup(),
|
||||
)
|
||||
|
||||
async def inline__update(
|
||||
self,
|
||||
call: InlineCall,
|
||||
confirm_required: bool = False,
|
||||
):
|
||||
if confirm_required:
|
||||
await call.edit(
|
||||
self.strings("confirm_update"),
|
||||
reply_markup=[
|
||||
{"text": "🪂 Update", "callback": self.inline__update},
|
||||
{"text": "🚫 Cancel", "action": "close"},
|
||||
],
|
||||
)
|
||||
return
|
||||
|
||||
await call.answer("You userbot is being updated...", show_alert=True)
|
||||
await call.delete()
|
||||
await self.invoke("update", "-f", peer="me")
|
||||
|
||||
async def _remove_core_protection(self, call: InlineCall):
|
||||
self._db.set(main.__name__, "remove_core_protection", True)
|
||||
await call.edit(self.strings("core_protection_removed"))
|
||||
|
||||
@loader.command()
|
||||
async def remove_core_protection(self, message: Message):
|
||||
if self._db.get(main.__name__, "remove_core_protection", False):
|
||||
await utils.answer(message, self.strings("core_protection_already_removed"))
|
||||
return
|
||||
|
||||
await self.inline.form(
|
||||
message=message,
|
||||
text=self.strings("core_protection_confirm"),
|
||||
reply_markup=[
|
||||
{
|
||||
"text": self.strings("core_protection_btn"),
|
||||
"callback": self._remove_core_protection,
|
||||
},
|
||||
{
|
||||
"text": self.strings("btn_no"),
|
||||
"action": "close",
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
async def inline__restart(
|
||||
self,
|
||||
call: InlineCall,
|
||||
confirm_required: bool = False,
|
||||
):
|
||||
if confirm_required:
|
||||
await call.edit(
|
||||
self.strings("confirm_restart"),
|
||||
reply_markup=[
|
||||
{"text": "🔄 Restart", "callback": self.inline__restart},
|
||||
{"text": "🚫 Cancel", "action": "close"},
|
||||
],
|
||||
)
|
||||
return
|
||||
|
||||
await call.answer("You userbot is being restarted...", show_alert=True)
|
||||
await call.delete()
|
||||
await self.invoke("restart", "-f", peer="me")
|
||||
|
||||
def _get_settings_markup(self) -> list:
|
||||
return [
|
||||
[
|
||||
(
|
||||
{
|
||||
"text": "✅ NoNick",
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"no_nickname",
|
||||
False,
|
||||
),
|
||||
}
|
||||
if self._db.get(main.__name__, "no_nickname", False)
|
||||
else {
|
||||
"text": "🚫 NoNick",
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"no_nickname",
|
||||
True,
|
||||
),
|
||||
}
|
||||
),
|
||||
(
|
||||
{
|
||||
"text": "✅ Grep",
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"grep",
|
||||
False,
|
||||
),
|
||||
}
|
||||
if self._db.get(main.__name__, "grep", False)
|
||||
else {
|
||||
"text": "🚫 Grep",
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"grep",
|
||||
True,
|
||||
),
|
||||
}
|
||||
),
|
||||
(
|
||||
{
|
||||
"text": "✅ InlineLogs",
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"inlinelogs",
|
||||
False,
|
||||
),
|
||||
}
|
||||
if self._db.get(main.__name__, "inlinelogs", True)
|
||||
else {
|
||||
"text": "🚫 InlineLogs",
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"inlinelogs",
|
||||
True,
|
||||
),
|
||||
}
|
||||
),
|
||||
],
|
||||
[
|
||||
(
|
||||
{
|
||||
"text": self.strings("do_not_suggest_fs"),
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"disable_modules_fs",
|
||||
False,
|
||||
),
|
||||
}
|
||||
if self._db.get(main.__name__, "disable_modules_fs", False)
|
||||
else {
|
||||
"text": self.strings("suggest_fs"),
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"disable_modules_fs",
|
||||
True,
|
||||
),
|
||||
}
|
||||
)
|
||||
],
|
||||
[
|
||||
(
|
||||
{
|
||||
"text": self.strings("use_fs"),
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"permanent_modules_fs",
|
||||
False,
|
||||
),
|
||||
}
|
||||
if self._db.get(main.__name__, "permanent_modules_fs", False)
|
||||
else {
|
||||
"text": self.strings("do_not_use_fs"),
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"permanent_modules_fs",
|
||||
True,
|
||||
),
|
||||
}
|
||||
),
|
||||
],
|
||||
[
|
||||
(
|
||||
{
|
||||
"text": self.strings("suggest_subscribe"),
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"suggest_subscribe",
|
||||
False,
|
||||
),
|
||||
}
|
||||
if self._db.get(main.__name__, "suggest_subscribe", True)
|
||||
else {
|
||||
"text": self.strings("do_not_suggest_subscribe"),
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
"suggest_subscribe",
|
||||
True,
|
||||
),
|
||||
}
|
||||
),
|
||||
],
|
||||
[
|
||||
(
|
||||
{
|
||||
"text": self.strings("no_custom_emojis"),
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
lambda: main.save_config_key(
|
||||
"disable_custom_emojis", False
|
||||
),
|
||||
),
|
||||
}
|
||||
if main.get_config_key("disable_custom_emojis")
|
||||
else {
|
||||
"text": self.strings("custom_emojis"),
|
||||
"callback": self.inline__setting,
|
||||
"args": (
|
||||
lambda: main.save_config_key("disable_custom_emojis", True),
|
||||
),
|
||||
}
|
||||
),
|
||||
],
|
||||
[
|
||||
(
|
||||
{
|
||||
"text": self.strings("disable_debugger"),
|
||||
"callback": self.inline__setting,
|
||||
"args": (lambda: self._db.set(log.__name__, "debugger", False),),
|
||||
}
|
||||
if self._db.get(log.__name__, "debugger", False)
|
||||
else {
|
||||
"text": self.strings("enable_debugger"),
|
||||
"callback": self.inline__setting,
|
||||
"args": (lambda: self._db.set(log.__name__, "debugger", True),),
|
||||
}
|
||||
),
|
||||
],
|
||||
[
|
||||
{
|
||||
"text": self.strings("btn_restart"),
|
||||
"callback": self.inline__restart,
|
||||
"args": (True,),
|
||||
},
|
||||
{
|
||||
"text": self.strings("btn_update"),
|
||||
"callback": self.inline__update,
|
||||
"args": (True,),
|
||||
},
|
||||
],
|
||||
[{"text": self.strings("close_menu"), "action": "close"}],
|
||||
]
|
||||
|
||||
@loader.command()
|
||||
async def settings(self, message: Message):
|
||||
await self.inline.form(
|
||||
self.strings("inline_settings"),
|
||||
message=message,
|
||||
reply_markup=self._get_settings_markup(),
|
||||
)
|
||||
|
||||
@loader.command()
|
||||
async def weburl(self, message: Message, force: bool = False):
|
||||
if "LAVHOST" in os.environ:
|
||||
form = await self.inline.form(
|
||||
self.strings("lavhost_web"),
|
||||
message=message,
|
||||
reply_markup={
|
||||
"text": self.strings("web_btn"),
|
||||
"url": await main.hikka.web.get_url(proxy_pass=False),
|
||||
},
|
||||
gif="https://t.me/hikari_assets/28",
|
||||
)
|
||||
return
|
||||
|
||||
if (
|
||||
not force
|
||||
and not message.is_private
|
||||
and "force_insecure" not in message.raw_text.lower()
|
||||
):
|
||||
try:
|
||||
if not await self.inline.form(
|
||||
self.strings("privacy_leak_nowarn").format(self._client.tg_id),
|
||||
message=message,
|
||||
reply_markup=[
|
||||
{
|
||||
"text": self.strings("btn_yes"),
|
||||
"callback": self.weburl,
|
||||
"args": (True,),
|
||||
},
|
||||
{"text": self.strings("btn_no"), "action": "close"},
|
||||
],
|
||||
gif="https://i.gifer.com/embedded/download/Z5tS.gif",
|
||||
):
|
||||
raise Exception
|
||||
except Exception:
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("privacy_leak").format(
|
||||
self._client.tg_id,
|
||||
utils.escape_html(self.get_prefix()),
|
||||
),
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
if not main.hikka.web:
|
||||
main.hikka.web = core.Web(
|
||||
data_root=main.BASE_DIR,
|
||||
api_token=main.hikka.api_token,
|
||||
proxy=main.hikka.proxy,
|
||||
connection=main.hikka.conn,
|
||||
)
|
||||
await main.hikka.web.add_loader(self._client, self.allmodules, self._db)
|
||||
await main.hikka.web.start_if_ready(
|
||||
len(self.allclients),
|
||||
main.hikka.arguments.port,
|
||||
proxy_pass=main.hikka.arguments.proxy_pass,
|
||||
)
|
||||
|
||||
if force:
|
||||
form = message
|
||||
await form.edit(
|
||||
self.strings("opening_tunnel"),
|
||||
reply_markup={"text": "🕔 Wait...", "data": "empty"},
|
||||
gif=(
|
||||
"https://i.gifer.com/origin/e4/e43e1b221fd960003dc27d2f2f1b8ce1.gif"
|
||||
),
|
||||
)
|
||||
else:
|
||||
form = await self.inline.form(
|
||||
self.strings("opening_tunnel"),
|
||||
message=message,
|
||||
reply_markup={"text": "🕔 Wait...", "data": "empty"},
|
||||
gif=(
|
||||
"https://i.gifer.com/origin/e4/e43e1b221fd960003dc27d2f2f1b8ce1.gif"
|
||||
),
|
||||
)
|
||||
|
||||
url = await main.hikka.web.get_url(proxy_pass=True)
|
||||
|
||||
await form.edit(
|
||||
self.strings("tunnel_opened"),
|
||||
reply_markup={"text": self.strings("web_btn"), "url": url},
|
||||
gif="https://t.me/hikari_assets/48",
|
||||
)
|
||||
|
||||
def _get_all_IDM(self, module: str):
|
||||
return {
|
||||
getattr(getattr(self.lookup(module), name), "name", name): getattr(
|
||||
self.lookup(module), name
|
||||
)
|
||||
for name in dir(self.lookup(module))
|
||||
if getattr(getattr(self.lookup(module), name), "is_debug_method", False)
|
||||
}
|
||||
|
||||
@loader.command()
|
||||
async def invokecmd(self, message: Message):
|
||||
if not (args := utils.get_args_raw(message)) or len(args.split()) < 2:
|
||||
await utils.answer(message, self.strings("no_args"))
|
||||
return
|
||||
|
||||
module = args.split()[0]
|
||||
method = args.split(maxsplit=1)[1]
|
||||
|
||||
if module != "core" and not self.lookup(module):
|
||||
await utils.answer(message, self.strings("module404").format(module))
|
||||
return
|
||||
|
||||
if (
|
||||
module == "core"
|
||||
and method not in ALL_INVOKES
|
||||
or module != "core"
|
||||
and method not in self._get_all_IDM(module)
|
||||
):
|
||||
await utils.answer(message, self.strings("invoke404").format(method))
|
||||
return
|
||||
|
||||
message = await utils.answer(
|
||||
message, self.strings("invoking").format(method, module)
|
||||
)
|
||||
result = ""
|
||||
|
||||
if module == "core":
|
||||
if method == "flush_entity_cache":
|
||||
result = (
|
||||
f"Dropped {len(self._client._hikka_entity_cache)} cache records"
|
||||
)
|
||||
self._client._hikka_entity_cache = {}
|
||||
elif method == "flush_fulluser_cache":
|
||||
result = (
|
||||
f"Dropped {len(self._client._hikka_fulluser_cache)} cache records"
|
||||
)
|
||||
self._client._hikka_fulluser_cache = {}
|
||||
elif method == "flush_fullchannel_cache":
|
||||
result = (
|
||||
f"Dropped {len(self._client._hikka_fullchannel_cache)} cache"
|
||||
" records"
|
||||
)
|
||||
self._client._hikka_fullchannel_cache = {}
|
||||
elif method == "flush_perms_cache":
|
||||
result = f"Dropped {len(self._client._hikka_perms_cache)} cache records"
|
||||
self._client._hikka_perms_cache = {}
|
||||
elif method == "flush_loader_cache":
|
||||
result = (
|
||||
f"Dropped {await self.lookup('loader').flush_cache()} cache records"
|
||||
)
|
||||
elif method == "flush_cache":
|
||||
count = self.lookup("loader").flush_cache()
|
||||
result = (
|
||||
f"Dropped {len(self._client._hikka_entity_cache)} entity cache"
|
||||
" records\nDropped"
|
||||
f" {len(self._client._hikka_fulluser_cache)} fulluser cache"
|
||||
" records\nDropped"
|
||||
f" {len(self._client._hikka_fullchannel_cache)} fullchannel cache"
|
||||
" records\nDropped"
|
||||
f" {count} loader links cache records"
|
||||
)
|
||||
self._client._hikka_entity_cache = {}
|
||||
self._client._hikka_fulluser_cache = {}
|
||||
self._client._hikka_fullchannel_cache = {}
|
||||
self._client.hikka_me = await self._client.get_me()
|
||||
elif method == "reload_core":
|
||||
core_quantity = await self.lookup("loader").reload_core()
|
||||
result = f"Reloaded {core_quantity} core modules"
|
||||
elif method == "inspect_cache":
|
||||
result = (
|
||||
"Entity cache:"
|
||||
f" {len(self._client._hikka_entity_cache)} records\nFulluser cache:"
|
||||
f" {len(self._client._hikka_fulluser_cache)} records\nFullchannel"
|
||||
" cache:"
|
||||
f" {len(self._client._hikka_fullchannel_cache)} records\nLoader"
|
||||
f" links cache: {self.lookup('loader').inspect_cache()} records"
|
||||
)
|
||||
elif method == "inspect_modules":
|
||||
result = (
|
||||
"Loaded modules: {}\nLoaded core modules: {}\nLoaded user"
|
||||
" modules: {}"
|
||||
).format(
|
||||
len(self.allmodules.modules),
|
||||
sum(
|
||||
module.__origin__.startswith("<core")
|
||||
for module in self.allmodules.modules
|
||||
),
|
||||
sum(
|
||||
not module.__origin__.startswith("<core")
|
||||
for module in self.allmodules.modules
|
||||
),
|
||||
)
|
||||
else:
|
||||
result = await self._get_all_IDM(module)[method](message)
|
||||
|
||||
await utils.answer(
|
||||
message,
|
||||
self.strings("invoke").format(method, utils.escape_html(result)),
|
||||
)
|
Loading…
Reference in New Issue