Heroku/hikka/modules/hikka_settings.py

641 lines
25 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ ▄▀█ ▀█▀ ▄▀█ █▀▄▀█ ▄▀█
# █▀█ █ █ █ █▀█ █▀▄ █ ▄ █▀█ █ █▀█ █ ▀ █ █▀█
#
# © Copyright 2022
#
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# scope: inline
import logging
import atexit
import functools
import random
import sys
import os
from telethon.tl.types import Message
from .. import loader, main, utils
from ..inline.types import InlineCall
logger = logging.getLogger(__name__)
def restart(*argv):
    """Replace the current process with a fresh interpreter running the bot.

    The bot package is launched via ``python -m <relative base dir>`` and any
    positional ``argv`` values are forwarded to it unchanged. This call does
    not return on success because the process image is replaced.
    """
    interpreter = sys.executable
    package_path = os.path.relpath(utils.get_base_dir())
    # argv[0] of the new process is, by convention, the interpreter itself.
    os.execv(interpreter, [interpreter, "-m", package_path, *argv])
@loader.tds
class HikkaSettingsMod(loader.Module):
    """Advanced settings for Hikka Userbot"""

    # English (default) user-facing strings, looked up via ``self.strings(key)``.
    strings = {
        "name": "HikkaSettings",
        "watchers": "👀 <b>Watchers:</b>\n\n<b>{}</b>",
        "mod404": "🚫 <b>Watcher {} not found</b>",
        "disabled": "👀 <b>Watcher {} is now <u>disabled</u></b>",
        "enabled": "👀 <b>Watcher {} is now <u>enabled</u></b>",
        "args": "🚫 <b>You need to specify watcher name</b>",
        "user_nn": "🔰 <b>NoNick for this user is now {}</b>",
        "no_cmd": "🔰 <b>Please, specify command to toggle NoNick for</b>",
        "cmd_nn": "🔰 <b>NoNick for </b><code>{}</code><b> is now {}</b>",
        "cmd404": "🔰 <b>Command not found</b>",
        "inline_settings": "⚙️ <b>Here you can configure your Hikka settings</b>",
        "confirm_update": "🧭 <b>Please, confirm that you want to update. Your userbot will be restarted</b>",
        "confirm_restart": "🔄 <b>Please, confirm that you want to restart</b>",
        "suggest_fs": "✅ Suggest FS for modules",
        "do_not_suggest_fs": "🚫 Suggest FS for modules",
        "use_fs": "✅ Always use FS for modules",
        "do_not_use_fs": "🚫 Always use FS for modules",
        "btn_restart": "🔄 Restart",
        "btn_update": "🧭 Update",
        "close_menu": "😌 Close menu",
        "download_btn": "✅ Download via button",
        "no_download_btn": "🚫 Download via button",
        "private_not_allowed": "🚫 <b>This command must be executed in chat</b>",
        "nonick_warning": (
            "Warning! You enabled NoNick with default prefix! "
            "You may get muted in Hikka chats. Change prefix or "
            "disable NoNick!"
        ),
        "reply_required": "🚫 <b>Reply to a message of user, which needs to be added to NoNick</b>",
        "deauth_confirm": "⚠️ <b>This action will fully remove Hikka from this account and can't be reverted!</b>",
        "deauth_confirm_step2": "⚠️ <b>Are you really sure you want to delete Hikka?</b>",
        "deauth_yes": "I'm sure",
        "deauth_no_1": "I'm not sure",
        "deauth_no_2": "I'm uncertain",
        "deauth_no_3": "I'm struggling to answer",
        "deauth_cancel": "🚫 Cancel",
        "deauth_confirm_btn": "😢 Delete",
        # Key must match the ``self.strings("uninstall")`` lookup in _uninstall.
        "uninstall": "😢 <b>Uninstalling Hikka...</b>",
        "uninstalled": "😢 <b>Hikka uninstalled. Web interface is still active, you can add another account</b>",
        "logs_cleared": "🗑 <b>Logs cleared</b>",
    }

    # Russian translations; ``_cmd_doc_*`` / ``_cls_doc`` entries translate the
    # command and class docstrings.
    strings_ru = {
        "watchers": "👀 <b>Смотрители:</b>\n\n<b>{}</b>",
        "mod404": "🚫 <b>Смотритель {} не найден</b>",
        "disabled": "👀 <b>Смотритель {} теперь <u>выключен</u></b>",
        "enabled": "👀 <b>Смотритель {} теперь <u>включен</u></b>",
        "args": "🚫 <b>Укажи имя смотрителя</b>",
        "user_nn": "🔰 <b>Состояние NoNick для этого пользователя: {}</b>",
        "no_cmd": "🔰 <b>Укажи команду, для которой надо включить\\выключить NoNick</b>",
        "cmd_nn": "🔰 <b>Состояние NoNick для </b><code>{}</code><b>: {}</b>",
        "cmd404": "🔰 <b>Команда не найдена</b>",
        "inline_settings": "⚙️ <b>Здесь можно управлять настройками Hikka</b>",
        "confirm_update": "🧭 <b>Подтвердите обновление. Юзербот будет перезагружен</b>",
        "confirm_restart": "🔄 <b>Подтвердите перезагрузку</b>",
        "suggest_fs": "✅ Предлагать сохранение модулей",
        "do_not_suggest_fs": "🚫 Предлагать сохранение модулей",
        "use_fs": "✅ Всегда сохранять модули",
        "do_not_use_fs": "🚫 Всегда сохранять модули",
        "btn_restart": "🔄 Перезагрузка",
        "btn_update": "🧭 Обновление",
        "close_menu": "😌 Закрыть меню",
        "download_btn": "✅ Скачивать кнопкой",
        "no_download_btn": "🚫 Скачивать кнопкой",
        "private_not_allowed": "🚫 <b>Эту команду нужно выполнять в чате</b>",
        "_cmd_doc_watchers": "Показать список смотрителей",
        "_cmd_doc_watcherbl": "<модуль> - Включить\\выключить смотритель в чате",
        "_cmd_doc_watcher": "<модуль> - Управление глобальными правилами смотрителя\nАргументы:\n[-c - только в чатах]\n[-p - только в лс]\n[-o - только исходящие]\n[-i - только входящие]",
        "_cmd_doc_nonickuser": "Разрешить пользователю выполнять какую-то команду без ника",
        "_cmd_doc_nonickcmd": "Разрешить выполнять определенную команду без ника",
        "_cls_doc": "Дополнительные настройки Hikka",
        "nonick_warning": (
            "Внимание! Ты включил NoNick со стандартным префиксом! "
            "Тебя могут замьютить в чатах Hikka. Измени префикс или "
            "отключи глобальный NoNick!"
        ),
        "reply_required": "🚫 <b>Ответь на сообщение пользователя, для которого нужно включить NoNick</b>",
        "deauth_confirm": "⚠️ <b>Это действие полностью удалит Hikka с этого аккаунта! Его нельзя отменить</b>",
        "deauth_confirm_step2": "⚠️ <b>Ты точно уверен, что хочешь удалить Hikka?</b>",
        "deauth_yes": "Я уверен",
        "deauth_no_1": "Я не уверен",
        "deauth_no_2": "Не точно",
        "deauth_no_3": "Нет",
        "deauth_cancel": "🚫 Отмена",
        "deauth_confirm_btn": "😢 Удалить",
        "uninstall": "😢 <b>Удаляю Hikka...</b>",
        "uninstalled": "😢 <b>Hikka удалена. Веб-интерфейс все еще активен, можно добавить другие аккаунты!</b>",
        "logs_cleared": "🗑 <b>Логи очищены</b>",
    }

    def get_watchers(self) -> tuple:
        """Return ``(watcher_names, disabled_watchers)``.

        ``watcher_names`` is a list with the ``strings["name"]`` of the class
        that owns each entry of ``self.allmodules.watchers`` (owners whose
        ``strings`` is ``None`` are skipped). ``disabled_watchers`` is the
        mapping stored in the database under ``"disabled_watchers"``, or an
        empty dict when nothing is stored.
        """
        names = [
            str(watcher.__self__.__class__.strings["name"])
            for watcher in self.allmodules.watchers
            if watcher.__self__.__class__.strings is not None
        ]
        return names, self._db.get(main.__name__, "disabled_watchers", {})

    async def client_ready(self, client, db):
        """Remember the client and database handles used by the commands."""
        self._db = db
        self._client = client

    async def _uninstall(self, call: InlineCall):
        """Log the account out and restart the process (final uninstall step)."""
        await call.edit(self.strings("uninstall"))

        # Raise every root handler to CRITICAL so that the logout below does
        # not produce lower-severity log records.
        for handler in logging.getLogger().handlers:
            handler.setLevel(logging.CRITICAL)

        await self._client.log_out()
        await call.edit(self.strings("uninstalled"))

        if "LAVHOST" in os.environ:
            os.system("lavhost restart")
            return

        # Re-exec with the original CLI arguments once the interpreter exits.
        atexit.register(functools.partial(restart, *sys.argv[1:]))
        sys.exit(0)

    async def _uninstall_confirm_step_2(self, call: InlineCall):
        """Show the second confirmation with the answers in random order."""
        answers = [
            {"text": self.strings("deauth_yes"), "callback": self._uninstall},
            {"text": self.strings("deauth_no_1"), "callback": self.inline__close},
            {"text": self.strings("deauth_no_2"), "callback": self.inline__close},
            {"text": self.strings("deauth_no_3"), "callback": self.inline__close},
        ]
        # Shuffle so the destructive button never sits in a predictable place.
        random.shuffle(answers)

        cancel_row = [
            {"text": self.strings("deauth_cancel"), "callback": self.inline__close}
        ]
        await call.edit(
            self.strings("deauth_confirm_step2"),
            utils.chunks(answers, 2) + [cancel_row],
        )

    async def uninstall_hikkacmd(self, message: Message):
        """Uninstall Hikka"""
        await self.inline.form(
            self.strings("deauth_confirm"),
            message,
            [
                {
                    "text": self.strings("deauth_confirm_btn"),
                    "callback": self._uninstall_confirm_step_2,
                },
                {"text": self.strings("deauth_cancel"), "callback": self.inline__close},
            ],
        )

    async def clearlogscmd(self, message: Message):
        """Clear logs"""
        # NOTE(review): the attributes are assigned on every root handler,
        # presumably targeting the project's buffering handler - confirm.
        for handler in logging.getLogger().handlers:
            handler.buffer = []
            handler.handledbuffer = []
            handler.tg_buff = ""

        await utils.answer(message, self.strings("logs_cleared"))

    async def watcherscmd(self, message: Message):
        """List current watchers"""
        watchers, disabled_watchers = self.get_watchers()
        lines = [
            f"♻️ {watcher}" for watcher in watchers if watcher not in disabled_watchers
        ]
        lines += [f"💢 {k} {v}" for k, v in disabled_watchers.items()]
        await utils.answer(message, self.strings("watchers").format("\n".join(lines)))

    async def watcherblcmd(self, message: Message):
        """<module> - Toggle watcher in current chat"""
        args = utils.get_args_raw(message)
        if not args:
            return await utils.answer(message, self.strings("args"))

        watchers, disabled_watchers = self.get_watchers()
        wanted = args.lower()

        # Resolve the user input to the watcher's canonical spelling.
        name = next((w for w in watchers if w.lower() == wanted), None)
        if name is None:
            return await utils.answer(message, self.strings("mod404").format(args))

        # Reuse an already stored key even if its case differs, so that the
        # same watcher never ends up with two entries.
        key = next((k for k in disabled_watchers if k.lower() == wanted), name)
        blacklist = disabled_watchers.get(key, [])
        chat = utils.get_chat_id(message)

        if chat not in blacklist:
            disabled_watchers.setdefault(key, []).append(chat)
            status = "disabled"
        else:
            blacklist.remove(chat)
            if not blacklist:
                del disabled_watchers[key]
            status = "enabled"

        await utils.answer(
            message,
            self.strings(status).format(name) + " <b>in current chat</b>",
        )
        self._db.set(main.__name__, "disabled_watchers", disabled_watchers)

    async def watchercmd(self, message: Message):
        """<module> - Toggle global watcher rules
        Args:
        [-c - only in chats]
        [-p - only in pm]
        [-o - only out]
        [-i - only incoming]"""
        args = utils.get_args_raw(message)
        if not args:
            return await utils.answer(message, self.strings("args"))

        # Parse flags as whole whitespace-separated tokens; substring matching
        # would corrupt watcher names that merely contain e.g. "-c".
        tokens = args.split()
        flags = {token for token in tokens if token in ("-c", "-p", "-o", "-i")}
        args = " ".join(token for token in tokens if token not in flags)
        if not args:
            return await utils.answer(message, self.strings("args"))

        # "-c" wins over "-p" and "-o" wins over "-i" when both are given.
        chats = "-c" in flags
        pm = "-p" in flags and not chats
        out = "-o" in flags
        incoming = "-i" in flags and not out

        watchers, disabled_watchers = self.get_watchers()
        name = next((w for w in watchers if w.lower() == args.lower()), None)
        if name is None:
            return await utils.answer(message, self.strings("mod404").format(args))

        rules = [
            tag
            for active, tag in (
                (chats, "only_chats"),
                (pm, "only_pm"),
                (out, "out"),
                (incoming, "in"),
            )
            if active
        ]

        if rules:
            disabled_watchers[name] = rules
            self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
            await utils.answer(
                message,
                self.strings("enabled").format(name) + f" (<code>{rules}</code>)",
            )
            return

        # No flags: flip the global "*" (disabled everywhere) marker.
        if name in disabled_watchers and "*" in disabled_watchers[name]:
            del disabled_watchers[name]
            self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
            await utils.answer(message, self.strings("enabled").format(name))
            return

        disabled_watchers[name] = ["*"]
        self._db.set(main.__name__, "disabled_watchers", disabled_watchers)
        await utils.answer(message, self.strings("disabled").format(name))

    def _toggle_nonick(self, key: str, item) -> bool:
        """Toggle ``item`` in the database list ``key``.

        Returns ``True`` when ``item`` is present after the call (it was
        added) and ``False`` when it was removed. The stored list is
        de-duplicated as a side effect.
        """
        members = set(self._db.get(main.__name__, key, []))
        added = item not in members
        if added:
            members.add(item)
        else:
            members.discard(item)
        self._db.set(main.__name__, key, list(members))
        return added

    async def nonickusercmd(self, message: Message):
        """Allow no nickname for certain user"""
        reply = await message.get_reply_message()
        if not reply:
            await utils.answer(message, self.strings("reply_required"))
            return

        u = reply.sender_id
        if not isinstance(u, int):
            u = u.user_id

        state = "on" if self._toggle_nonick("nonickusers", u) else "off"
        await utils.answer(message, self.strings("user_nn").format(state))

    async def nonickchatcmd(self, message: Message):
        """Allow no nickname in certain chat"""
        if message.is_private:
            await utils.answer(message, self.strings("private_not_allowed"))
            return

        chat = utils.get_chat_id(message)
        state = "on" if self._toggle_nonick("nonickchats", chat) else "off"
        title = utils.escape_html((await message.get_chat()).title)
        await utils.answer(message, self.strings("cmd_nn").format(title, state))

    async def nonickcmdcmd(self, message: Message):
        """Allow certain command to be executed without nickname"""
        args = utils.get_args_raw(message)
        if not args:
            return await utils.answer(message, self.strings("no_cmd"))

        if args not in self.allmodules.commands:
            return await utils.answer(message, self.strings("cmd404"))

        state = "on" if self._toggle_nonick("nonickcmds", args) else "off"
        prefix = self._db.get(main.__name__, "command_prefix", ".")
        await utils.answer(
            message,
            self.strings("cmd_nn").format(prefix + args, state),
        )

    async def inline__setting(self, call: InlineCall, key: str, state: bool):
        """Persist ``key = state`` and redraw the settings menu."""
        self._db.set(main.__name__, key, state)

        # Warn when NoNick is switched on while the prefix is still ".".
        nonick_on_default_prefix = (
            key == "no_nickname"
            and state
            and self._db.get(main.__name__, "command_prefix", ".") == "."
        )
        if nonick_on_default_prefix:
            await call.answer(self.strings("nonick_warning"), show_alert=True)
        else:
            await call.answer("Configuration value saved!")

        await call.edit(
            self.strings("inline_settings"),
            reply_markup=self._get_settings_markup(),
        )

    async def inline__close(self, call: InlineCall):
        """Delete the inline form."""
        await call.delete()

    async def inline__update(
        self,
        call: InlineCall,
        confirm_required: bool = False,
    ):
        """Run the ``update`` command, optionally asking for confirmation first."""
        if confirm_required:
            await call.edit(
                self.strings("confirm_update"),
                reply_markup=[
                    {"text": "🪂 Update", "callback": self.inline__update},
                    {"text": "🚫 Cancel", "callback": self.inline__close},
                ],
            )
            return

        await call.answer("Your userbot is being updated...", show_alert=True)
        await call.delete()
        # Send a real command message to "me" and hand it to the handler.
        m = await self._client.send_message("me", f"{self.get_prefix()}update --force")
        await self.allmodules.commands["update"](m)

    async def inline__restart(
        self,
        call: InlineCall,
        confirm_required: bool = False,
    ):
        """Run the ``restart`` command, optionally asking for confirmation first."""
        if confirm_required:
            await call.edit(
                self.strings("confirm_restart"),
                reply_markup=[
                    {"text": "🔄 Restart", "callback": self.inline__restart},
                    {"text": "🚫 Cancel", "callback": self.inline__close},
                ],
            )
            return

        await call.answer("Your userbot is being restarted...", show_alert=True)
        await call.delete()
        # Send a real command message to "me" and hand it to the handler.
        m = await self._client.send_message("me", f"{self.get_prefix()}restart --force")
        await self.allmodules.commands["restart"](m)

    def _toggle_button(
        self,
        key: str,
        default: bool,
        text_on: str,
        text_off: str,
        inverted: bool = False,
    ) -> dict:
        """Build one settings button that flips the boolean stored under ``key``.

        ``text_on`` is shown while the feature is active. For ``inverted``
        settings the feature counts as active while the stored value is falsy.
        Pressing the button always stores the negation of the current value.
        """
        value = bool(self._db.get(main.__name__, key, default))
        active = value != inverted
        return {
            "text": text_on if active else text_off,
            "callback": self.inline__setting,
            "args": (key, not value),
        }

    def _get_settings_markup(self) -> list:
        """Return the rows of buttons for the inline settings menu."""
        return [
            [
                self._toggle_button("no_nickname", False, "✅ NoNick", "🚫 NoNick"),
                self._toggle_button("grep", False, "✅ Grep", "🚫 Grep"),
                self._toggle_button(
                    "inlinelogs", True, "✅ InlineLogs", "🚫 InlineLogs"
                ),
            ],
            [
                # Stored as a "disable" flag, hence the inverted display.
                self._toggle_button(
                    "disable_modules_fs",
                    False,
                    self.strings("suggest_fs"),
                    self.strings("do_not_suggest_fs"),
                    inverted=True,
                ),
            ],
            [
                self._toggle_button(
                    "permanent_modules_fs",
                    False,
                    self.strings("use_fs"),
                    self.strings("do_not_use_fs"),
                ),
            ],
            [
                self._toggle_button(
                    "use_dl_btn",
                    True,
                    self.strings("download_btn"),
                    self.strings("no_download_btn"),
                ),
            ],
            [
                {
                    "text": self.strings("btn_restart"),
                    "callback": self.inline__restart,
                    "args": (True,),
                },
                {
                    "text": self.strings("btn_update"),
                    "callback": self.inline__update,
                    "args": (True,),
                },
            ],
            [{"text": self.strings("close_menu"), "callback": self.inline__close}],
        ]

    @loader.owner
    async def settingscmd(self, message: Message):
        """Show settings menu"""
        await self.inline.form(
            self.strings("inline_settings"),
            message=message,
            reply_markup=self._get_settings_markup(),
        )