From 2ba1a7ea695baa899ad7d64453171aac6b01551f Mon Sep 17 00:00:00 2001
From: Who? <155328415+coddrago@users.noreply.github.com>
Date: Thu, 26 Dec 2024 10:07:31 +0700
Subject: [PATCH] Delete hikka/modules/hikka_settings.py
---
hikka/modules/hikka_settings.py | 927 --------------------------------
1 file changed, 927 deletions(-)
delete mode 100644 hikka/modules/hikka_settings.py
diff --git a/hikka/modules/hikka_settings.py b/hikka/modules/hikka_settings.py
deleted file mode 100644
index 9565359..0000000
--- a/hikka/modules/hikka_settings.py
+++ /dev/null
@@ -1,927 +0,0 @@
-# ÂŠī¸ Dan Gazizullin, 2021-2023
-# This file is a part of Hikka Userbot
-# đ https://github.com/hikariatama/Hikka
-# You can redistribute it and/or modify it under the terms of the GNU AGPLv3
-# đ https://www.gnu.org/licenses/agpl-3.0.html
-
-import logging
-import os
-import random
-
-import hikkatl
-from hikkatl.tl.functions.messages import (
- GetDialogFiltersRequest,
- UpdateDialogFilterRequest,
-)
-from hikkatl.tl.types import Message
-from hikkatl.utils import get_display_name
-
-from .. import loader, log, main, utils
-from .._internal import fw_protect, restart
-from ..inline.types import InlineCall
-from ..web import core
-
-logger = logging.getLogger(__name__)
-
-ALL_INVOKES = [
- "flush_entity_cache",
- "flush_fulluser_cache",
- "flush_fullchannel_cache",
- "flush_perms_cache",
- "flush_loader_cache",
- "flush_cache",
- "reload_core",
- "inspect_cache",
- "inspect_modules",
-]
-
-
-@loader.tds
-class HikkaSettingsMod(loader.Module):
- """Advanced settings for Heroku Userbot"""
-
- strings = {"name": "HerokuSettings"}
-
- def get_watchers(self) -> tuple:
- return [
- str(watcher.__self__.__class__.strings["name"])
- for watcher in self.allmodules.watchers
- if watcher.__self__.__class__.strings is not None
- ], self._db.get(main.__name__, "disabled_watchers", {})
-
- async def _uninstall(self, call: InlineCall):
- await call.edit(self.strings("uninstall"))
-
- async with self._client.conversation("@BotFather") as conv:
- for msg in [
- "/deletebot",
- f"@{self.inline.bot_username}",
- "Yes, I am totally sure.",
- ]:
- await fw_protect()
- m = await conv.send_message(msg)
- r = await conv.get_response()
-
- logger.debug(">> %s", m.raw_text)
- logger.debug("<< %s", r.raw_text)
-
- await fw_protect()
-
- await m.delete()
- await r.delete()
-
- async for dialog in self._client.iter_dialogs(
- None,
- ignore_migrated=True,
- ):
- if (
- dialog.name
- in {
- "heroku-logs",
- "heroku-onload",
- "heroku-assets",
- "heroku-backups",
- "heroku-acc-switcher",
- "silent-tags",
- }
- and dialog.is_channel
- and (
- dialog.entity.participants_count == 1
- or dialog.entity.participants_count == 2
- and dialog.name in {"heroku-logs", "silent-tags"}
- )
- or (
- self._client.loader.inline.init_complete
- and dialog.entity.id == self._client.loader.inline.bot_id
- )
- ):
- await fw_protect()
- await self._client.delete_dialog(dialog.entity)
-
- await fw_protect()
-
- folders = await self._client(GetDialogFiltersRequest())
-
- if any(folder.title == "heroku" for folder in folders):
- folder_id = max(
- folders,
- key=lambda x: x.id,
- ).id
- await fw_protect()
- await self._client(UpdateDialogFilterRequest(id=folder_id))
-
- for handler in logging.getLogger().handlers:
- handler.setLevel(logging.CRITICAL)
-
- await fw_protect()
-
- await self._client.log_out()
-
- restart()
-
- async def _uninstall_confirm_step_2(self, call: InlineCall):
- await call.edit(
- self.strings("deauth_confirm_step2"),
- utils.chunks(
- list(
- sorted(
- [
- {
- "text": self.strings("deauth_yes"),
- "callback": self._uninstall,
- },
- *[
- {
- "text": self.strings(f"deauth_no_{i}"),
- "action": "close",
- }
- for i in range(1, 4)
- ],
- ],
- key=lambda _: random.random(),
- )
- ),
- 2,
- )
- + [
- [
- {
- "text": self.strings("deauth_cancel"),
- "action": "close",
- }
- ]
- ],
- )
-
- @loader.command()
- async def uninstall_heroku(self, message: Message):
- await self.inline.form(
- self.strings("deauth_confirm"),
- message,
- [
- {
- "text": self.strings("deauth_confirm_btn"),
- "callback": self._uninstall_confirm_step_2,
- },
- {"text": self.strings("deauth_cancel"), "action": "close"},
- ],
- )
-
- @loader.command()
- async def watchers(self, message: Message):
- watchers, disabled_watchers = self.get_watchers()
- watchers = [
- f"âģī¸ {watcher}"
- for watcher in watchers
- if watcher not in list(disabled_watchers.keys())
- ]
- watchers += [f"đĸ {k} {v}" for k, v in disabled_watchers.items()]
- await utils.answer(
- message, self.strings("watchers").format("\n".join(watchers))
- )
-
- @loader.command()
- async def watcherbl(self, message: Message):
- if not (args := utils.get_args_raw(message)):
- await utils.answer(message, self.strings("args"))
- return
-
- watchers, disabled_watchers = self.get_watchers()
-
- if args.lower() not in map(lambda x: x.lower(), watchers):
- await utils.answer(message, self.strings("mod404").format(args))
- return
-
- args = next((x.lower() == args.lower() for x in watchers), False)
-
- current_bl = [
- v for k, v in disabled_watchers.items() if k.lower() == args.lower()
- ]
- current_bl = current_bl[0] if current_bl else []
-
- chat = utils.get_chat_id(message)
- if chat not in current_bl:
- if args in disabled_watchers:
- for k in disabled_watchers:
- if k.lower() == args.lower():
- disabled_watchers[k].append(chat)
- break
- else:
- disabled_watchers[args] = [chat]
-
- await utils.answer(
- message,
- self.strings("disabled").format(args) + " in current chat",
- )
- else:
- for k in disabled_watchers.copy():
- if k.lower() == args.lower():
- disabled_watchers[k].remove(chat)
- if not disabled_watchers[k]:
- del disabled_watchers[k]
- break
-
- await utils.answer(
- message,
- self.strings("enabled").format(args) + " in current chat",
- )
-
- self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
-
- @loader.command()
- async def watchercmd(self, message: Message):
- if not (args := utils.get_args_raw(message)):
- return await utils.answer(message, self.strings("args"))
-
- chats, pm, out, incoming = False, False, False, False
-
- if "-c" in args:
- args = args.replace("-c", "").replace(" ", " ").strip()
- chats = True
-
- if "-p" in args:
- args = args.replace("-p", "").replace(" ", " ").strip()
- pm = True
-
- if "-o" in args:
- args = args.replace("-o", "").replace(" ", " ").strip()
- out = True
-
- if "-i" in args:
- args = args.replace("-i", "").replace(" ", " ").strip()
- incoming = True
-
- if chats and pm:
- pm = False
- if out and incoming:
- incoming = False
-
- watchers, disabled_watchers = self.get_watchers()
-
- if args.lower() not in [watcher.lower() for watcher in watchers]:
- return await utils.answer(message, self.strings("mod404").format(args))
-
- args = [watcher for watcher in watchers if watcher.lower() == args.lower()][0]
-
- if chats or pm or out or incoming:
- disabled_watchers[args] = [
- *(["only_chats"] if chats else []),
- *(["only_pm"] if pm else []),
- *(["out"] if out else []),
- *(["in"] if incoming else []),
- ]
- self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
- await utils.answer(
- message,
- self.strings("enabled").format(args)
- + f" ({disabled_watchers[args]}
)",
- )
- return
-
- if args in disabled_watchers and "*" in disabled_watchers[args]:
- await utils.answer(message, self.strings("enabled").format(args))
- del disabled_watchers[args]
- self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
- return
-
- disabled_watchers[args] = ["*"]
- self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
- await utils.answer(message, self.strings("disabled").format(args))
-
- @loader.command()
- async def nonickuser(self, message: Message):
- if not (reply := await message.get_reply_message()):
- await utils.answer(message, self.strings("reply_required"))
- return
-
- u = reply.sender_id
- if not isinstance(u, int):
- u = u.user_id
-
- nn = self._db.get(main.__name__, "nonickusers", [])
- if u not in nn:
- nn += [u]
- nn = list(set(nn)) # skipcq: PTC-W0018
- await utils.answer(message, self.strings("user_nn").format("on"))
- else:
- nn = list(set(nn) - {u})
- await utils.answer(message, self.strings("user_nn").format("off"))
-
- self._db.set(main.__name__, "nonickusers", nn)
-
- @loader.command()
- async def nonickchat(self, message: Message):
- if message.is_private:
- await utils.answer(message, self.strings("private_not_allowed"))
- return
-
- chat = utils.get_chat_id(message)
-
- nn = self._db.get(main.__name__, "nonickchats", [])
- if chat not in nn:
- nn += [chat]
- nn = list(set(nn)) # skipcq: PTC-W0018
- await utils.answer(
- message,
- self.strings("cmd_nn").format(
- utils.escape_html((await message.get_chat()).title),
- "on",
- ),
- )
- else:
- nn = list(set(nn) - {chat})
- await utils.answer(
- message,
- self.strings("cmd_nn").format(
- utils.escape_html((await message.get_chat()).title),
- "off",
- ),
- )
-
- self._db.set(main.__name__, "nonickchats", nn)
-
- @loader.command()
- async def nonickcmdcmd(self, message: Message):
- if not (args := utils.get_args_raw(message)):
- await utils.answer(message, self.strings("no_cmd"))
- return
-
- if args not in self.allmodules.commands:
- await utils.answer(message, self.strings("cmd404"))
- return
-
- nn = self._db.get(main.__name__, "nonickcmds", [])
- if args not in nn:
- nn += [args]
- nn = list(set(nn))
- await utils.answer(
- message,
- self.strings("cmd_nn").format(
- utils.escape_html(self.get_prefix() + args),
- "on",
- ),
- )
- else:
- nn = list(set(nn) - {args})
- await utils.answer(
- message,
- self.strings("cmd_nn").format(
- utils.escape_html(self.get_prefix() + args),
- "off",
- ),
- )
-
- self._db.set(main.__name__, "nonickcmds", nn)
-
- @loader.command()
- async def nonickcmds(self, message: Message):
- if not self._db.get(main.__name__, "nonickcmds", []):
- await utils.answer(message, self.strings("nothing"))
- return
-
- await utils.answer(
- message,
- self.strings("cmd_nn_list").format(
- "\n".join(
- [
- f"âĢī¸ {utils.escape_html(self.get_prefix() + cmd)}
"
- for cmd in self._db.get(main.__name__, "nonickcmds", [])
- ]
- )
- ),
- )
-
- @loader.command()
- async def nonickusers(self, message: Message):
- users = []
- for user_id in self._db.get(main.__name__, "nonickusers", []).copy():
- try:
- user = await self._client.get_entity(user_id)
- except Exception:
- self._db.set(
- main.__name__,
- "nonickusers",
- list(
- (
- set(self._db.get(main.__name__, "nonickusers", []))
- - {user_id}
- )
- ),
- )
-
- logger.warning("User %s removed from nonickusers list", user_id)
- continue
-
- users += [
- 'âĢī¸ {}'.format(
- user_id,
- utils.escape_html(get_display_name(user)),
- )
- ]
-
- if not users:
- await utils.answer(message, self.strings("nothing"))
- return
-
- await utils.answer(
- message,
- self.strings("user_nn_list").format("\n".join(users)),
- )
-
- @loader.command()
- async def nonickchats(self, message: Message):
- chats = []
- for chat in self._db.get(main.__name__, "nonickchats", []):
- try:
- chat_entity = await self._client.get_entity(int(chat))
- except Exception:
- self._db.set(
- main.__name__,
- "nonickchats",
- list(
- (set(self._db.get(main.__name__, "nonickchats", [])) - {chat})
- ),
- )
-
- logger.warning("Chat %s removed from nonickchats list", chat)
- continue
-
- chats += [
- 'âĢī¸ {}'.format(
- utils.get_entity_url(chat_entity),
- utils.escape_html(get_display_name(chat_entity)),
- )
- ]
-
- if not chats:
- await utils.answer(message, self.strings("nothing"))
- return
-
- await utils.answer(
- message,
- self.strings("user_nn_list").format("\n".join(chats)),
- )
-
- async def inline__setting(self, call: InlineCall, key: str, state: bool = False):
- if callable(key):
- key()
- hikkatl.extensions.html.CUSTOM_EMOJIS = not main.get_config_key(
- "disable_custom_emojis"
- )
- else:
- self._db.set(main.__name__, key, state)
-
- if key == "no_nickname" and state and self.get_prefix() == ".":
- await call.answer(
- self.strings("nonick_warning"),
- show_alert=True,
- )
- else:
- await call.answer("Configuration value saved!")
-
- await call.edit(
- self.strings("inline_settings"),
- reply_markup=self._get_settings_markup(),
- )
-
- async def inline__update(
- self,
- call: InlineCall,
- confirm_required: bool = False,
- ):
- if confirm_required:
- await call.edit(
- self.strings("confirm_update"),
- reply_markup=[
- {"text": "đĒ Update", "callback": self.inline__update},
- {"text": "đĢ Cancel", "action": "close"},
- ],
- )
- return
-
- await call.answer("You userbot is being updated...", show_alert=True)
- await call.delete()
- await self.invoke("update", "-f", peer="me")
-
- async def _remove_core_protection(self, call: InlineCall):
- self._db.set(main.__name__, "remove_core_protection", True)
- await call.edit(self.strings("core_protection_removed"))
-
- @loader.command()
- async def remove_core_protection(self, message: Message):
- if self._db.get(main.__name__, "remove_core_protection", False):
- await utils.answer(message, self.strings("core_protection_already_removed"))
- return
-
- await self.inline.form(
- message=message,
- text=self.strings("core_protection_confirm"),
- reply_markup=[
- {
- "text": self.strings("core_protection_btn"),
- "callback": self._remove_core_protection,
- },
- {
- "text": self.strings("btn_no"),
- "action": "close",
- },
- ],
- )
-
- async def inline__restart(
- self,
- call: InlineCall,
- confirm_required: bool = False,
- ):
- if confirm_required:
- await call.edit(
- self.strings("confirm_restart"),
- reply_markup=[
- {"text": "đ Restart", "callback": self.inline__restart},
- {"text": "đĢ Cancel", "action": "close"},
- ],
- )
- return
-
- await call.answer("You userbot is being restarted...", show_alert=True)
- await call.delete()
- await self.invoke("restart", "-f", peer="me")
-
- def _get_settings_markup(self) -> list:
- return [
- [
- (
- {
- "text": "â
NoNick",
- "callback": self.inline__setting,
- "args": (
- "no_nickname",
- False,
- ),
- }
- if self._db.get(main.__name__, "no_nickname", False)
- else {
- "text": "đĢ NoNick",
- "callback": self.inline__setting,
- "args": (
- "no_nickname",
- True,
- ),
- }
- ),
- (
- {
- "text": "â
Grep",
- "callback": self.inline__setting,
- "args": (
- "grep",
- False,
- ),
- }
- if self._db.get(main.__name__, "grep", False)
- else {
- "text": "đĢ Grep",
- "callback": self.inline__setting,
- "args": (
- "grep",
- True,
- ),
- }
- ),
- (
- {
- "text": "â
InlineLogs",
- "callback": self.inline__setting,
- "args": (
- "inlinelogs",
- False,
- ),
- }
- if self._db.get(main.__name__, "inlinelogs", True)
- else {
- "text": "đĢ InlineLogs",
- "callback": self.inline__setting,
- "args": (
- "inlinelogs",
- True,
- ),
- }
- ),
- ],
- [
- (
- {
- "text": self.strings("do_not_suggest_fs"),
- "callback": self.inline__setting,
- "args": (
- "disable_modules_fs",
- False,
- ),
- }
- if self._db.get(main.__name__, "disable_modules_fs", False)
- else {
- "text": self.strings("suggest_fs"),
- "callback": self.inline__setting,
- "args": (
- "disable_modules_fs",
- True,
- ),
- }
- )
- ],
- [
- (
- {
- "text": self.strings("use_fs"),
- "callback": self.inline__setting,
- "args": (
- "permanent_modules_fs",
- False,
- ),
- }
- if self._db.get(main.__name__, "permanent_modules_fs", False)
- else {
- "text": self.strings("do_not_use_fs"),
- "callback": self.inline__setting,
- "args": (
- "permanent_modules_fs",
- True,
- ),
- }
- ),
- ],
- [
- (
- {
- "text": self.strings("suggest_subscribe"),
- "callback": self.inline__setting,
- "args": (
- "suggest_subscribe",
- False,
- ),
- }
- if self._db.get(main.__name__, "suggest_subscribe", True)
- else {
- "text": self.strings("do_not_suggest_subscribe"),
- "callback": self.inline__setting,
- "args": (
- "suggest_subscribe",
- True,
- ),
- }
- ),
- ],
- [
- (
- {
- "text": self.strings("no_custom_emojis"),
- "callback": self.inline__setting,
- "args": (
- lambda: main.save_config_key(
- "disable_custom_emojis", False
- ),
- ),
- }
- if main.get_config_key("disable_custom_emojis")
- else {
- "text": self.strings("custom_emojis"),
- "callback": self.inline__setting,
- "args": (
- lambda: main.save_config_key("disable_custom_emojis", True),
- ),
- }
- ),
- ],
- [
- (
- {
- "text": self.strings("disable_debugger"),
- "callback": self.inline__setting,
- "args": (lambda: self._db.set(log.__name__, "debugger", False),),
- }
- if self._db.get(log.__name__, "debugger", False)
- else {
- "text": self.strings("enable_debugger"),
- "callback": self.inline__setting,
- "args": (lambda: self._db.set(log.__name__, "debugger", True),),
- }
- ),
- ],
- [
- {
- "text": self.strings("btn_restart"),
- "callback": self.inline__restart,
- "args": (True,),
- },
- {
- "text": self.strings("btn_update"),
- "callback": self.inline__update,
- "args": (True,),
- },
- ],
- [{"text": self.strings("close_menu"), "action": "close"}],
- ]
-
- @loader.command()
- async def settings(self, message: Message):
- await self.inline.form(
- self.strings("inline_settings"),
- message=message,
- reply_markup=self._get_settings_markup(),
- )
-
- @loader.command()
- async def weburl(self, message: Message, force: bool = False):
- if "LAVHOST" in os.environ:
- form = await self.inline.form(
- self.strings("lavhost_web"),
- message=message,
- reply_markup={
- "text": self.strings("web_btn"),
- "url": await main.hikka.web.get_url(proxy_pass=False),
- },
- gif="https://t.me/hikari_assets/28",
- )
- return
-
- if (
- not force
- and not message.is_private
- and "force_insecure" not in message.raw_text.lower()
- ):
- try:
- if not await self.inline.form(
- self.strings("privacy_leak_nowarn").format(self._client.tg_id),
- message=message,
- reply_markup=[
- {
- "text": self.strings("btn_yes"),
- "callback": self.weburl,
- "args": (True,),
- },
- {"text": self.strings("btn_no"), "action": "close"},
- ],
- gif="https://i.gifer.com/embedded/download/Z5tS.gif",
- ):
- raise Exception
- except Exception:
- await utils.answer(
- message,
- self.strings("privacy_leak").format(
- self._client.tg_id,
- utils.escape_html(self.get_prefix()),
- ),
- )
-
- return
-
- if not main.hikka.web:
- main.hikka.web = core.Web(
- data_root=main.BASE_DIR,
- api_token=main.hikka.api_token,
- proxy=main.hikka.proxy,
- connection=main.hikka.conn,
- )
- await main.hikka.web.add_loader(self._client, self.allmodules, self._db)
- await main.hikka.web.start_if_ready(
- len(self.allclients),
- main.hikka.arguments.port,
- proxy_pass=main.hikka.arguments.proxy_pass,
- )
-
- if force:
- form = message
- await form.edit(
- self.strings("opening_tunnel"),
- reply_markup={"text": "đ Wait...", "data": "empty"},
- gif=(
- "https://i.gifer.com/origin/e4/e43e1b221fd960003dc27d2f2f1b8ce1.gif"
- ),
- )
- else:
- form = await self.inline.form(
- self.strings("opening_tunnel"),
- message=message,
- reply_markup={"text": "đ Wait...", "data": "empty"},
- gif=(
- "https://i.gifer.com/origin/e4/e43e1b221fd960003dc27d2f2f1b8ce1.gif"
- ),
- )
-
- url = await main.hikka.web.get_url(proxy_pass=True)
-
- await form.edit(
- self.strings("tunnel_opened"),
- reply_markup={"text": self.strings("web_btn"), "url": url},
- gif="https://t.me/hikari_assets/48",
- )
-
- def _get_all_IDM(self, module: str):
- return {
- getattr(getattr(self.lookup(module), name), "name", name): getattr(
- self.lookup(module), name
- )
- for name in dir(self.lookup(module))
- if getattr(getattr(self.lookup(module), name), "is_debug_method", False)
- }
-
- @loader.command()
- async def invokecmd(self, message: Message):
- if not (args := utils.get_args_raw(message)) or len(args.split()) < 2:
- await utils.answer(message, self.strings("no_args"))
- return
-
- module = args.split()[0]
- method = args.split(maxsplit=1)[1]
-
- if module != "core" and not self.lookup(module):
- await utils.answer(message, self.strings("module404").format(module))
- return
-
- if (
- module == "core"
- and method not in ALL_INVOKES
- or module != "core"
- and method not in self._get_all_IDM(module)
- ):
- await utils.answer(message, self.strings("invoke404").format(method))
- return
-
- message = await utils.answer(
- message, self.strings("invoking").format(method, module)
- )
- result = ""
-
- if module == "core":
- if method == "flush_entity_cache":
- result = (
- f"Dropped {len(self._client._hikka_entity_cache)} cache records"
- )
- self._client._hikka_entity_cache = {}
- elif method == "flush_fulluser_cache":
- result = (
- f"Dropped {len(self._client._hikka_fulluser_cache)} cache records"
- )
- self._client._hikka_fulluser_cache = {}
- elif method == "flush_fullchannel_cache":
- result = (
- f"Dropped {len(self._client._hikka_fullchannel_cache)} cache"
- " records"
- )
- self._client._hikka_fullchannel_cache = {}
- elif method == "flush_perms_cache":
- result = f"Dropped {len(self._client._hikka_perms_cache)} cache records"
- self._client._hikka_perms_cache = {}
- elif method == "flush_loader_cache":
- result = (
- f"Dropped {await self.lookup('loader').flush_cache()} cache records"
- )
- elif method == "flush_cache":
- count = self.lookup("loader").flush_cache()
- result = (
- f"Dropped {len(self._client._hikka_entity_cache)} entity cache"
- " records\nDropped"
- f" {len(self._client._hikka_fulluser_cache)} fulluser cache"
- " records\nDropped"
- f" {len(self._client._hikka_fullchannel_cache)} fullchannel cache"
- " records\nDropped"
- f" {count} loader links cache records"
- )
- self._client._hikka_entity_cache = {}
- self._client._hikka_fulluser_cache = {}
- self._client._hikka_fullchannel_cache = {}
- self._client.hikka_me = await self._client.get_me()
- elif method == "reload_core":
- core_quantity = await self.lookup("loader").reload_core()
- result = f"Reloaded {core_quantity} core modules"
- elif method == "inspect_cache":
- result = (
- "Entity cache:"
- f" {len(self._client._hikka_entity_cache)} records\nFulluser cache:"
- f" {len(self._client._hikka_fulluser_cache)} records\nFullchannel"
- " cache:"
- f" {len(self._client._hikka_fullchannel_cache)} records\nLoader"
- f" links cache: {self.lookup('loader').inspect_cache()} records"
- )
- elif method == "inspect_modules":
- result = (
- "Loaded modules: {}\nLoaded core modules: {}\nLoaded user"
- " modules: {}"
- ).format(
- len(self.allmodules.modules),
- sum(
- module.__origin__.startswith("