# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# scope: inline
import logging
import time
from typing import List, Union
from telethon.tl.types import Message, PeerUser, User
from telethon.utils import get_display_name
from telethon.hints import EntityLike
from .. import loader, security, utils, main
from ..inline.types import InlineCall, InlineMessage
from ..security import (
DEFAULT_PERMISSIONS,
EVERYONE,
GROUP_ADMIN,
GROUP_ADMIN_ADD_ADMINS,
GROUP_ADMIN_BAN_USERS,
GROUP_ADMIN_CHANGE_INFO,
GROUP_ADMIN_DELETE_MESSAGES,
GROUP_ADMIN_INVITE_USERS,
GROUP_ADMIN_PIN_MESSAGES,
GROUP_MEMBER,
GROUP_OWNER,
PM,
SUDO,
SUPPORT,
)
logger = logging.getLogger(__name__)
@loader.tds
class HikkaSecurityMod(loader.Module):
"""Control security settings"""
service_strings = {
"for": "for",
"forever": "forever",
"user": "user",
"chat": "chat",
"command": "command",
"module": "module",
"day": "day",
"days": "days",
"hour": "hour",
"hours": "hours",
"minute": "minute",
"minutes": "minutes",
"second": "second",
"seconds": "seconds",
}
service_strings_ru = {
"for": "на",
"forever": "навсегда",
"command": "команду",
"module": "модуль",
"chat": "чату",
"user": "пользователю",
"day": "день",
"days": "дня(-ей)",
"hour": "час",
"hours": "часа(-ов)",
"minute": "минута",
"minutes": "минут(-ы)",
"second": "секунда",
"seconds": "секунд(-ы)",
}
strings = {
"name": "HikkaSecurity",
"no_command": "🚫 Command {}
not found!",
"permissions": (
"🔐 Here you can configure permissions for {}{}
"
),
"close_menu": "🙈 Close this menu",
"global": (
"🔐 Here you can configure global bounding mask. If the permission is"
" excluded here, it is excluded everywhere!"
),
"owner": "😎 Owner",
"sudo": "🧐 Sudo",
"support": "🤓 Support",
"group_owner": "🧛♂️ Group owner",
"group_admin_add_admins": "🧑⚖️ Admin (add members)",
"group_admin_change_info": "🧑⚖️ Admin (change info)",
"group_admin_ban_users": "🧑⚖️ Admin (ban)",
"group_admin_delete_messages": "🧑⚖️ Admin (delete msgs)",
"group_admin_pin_messages": "🧑⚖️ Admin (pin)",
"group_admin_invite_users": "🧑⚖️ Admin (invite)",
"group_admin": "🧑⚖️ Admin (any)",
"group_member": "👥 In group",
"pm": "🤙 In PM",
"everyone": "🌍 Everyone (Inline)",
"owner_list": (
"😎 Users in group"
" owner
:\n\n{}"
),
"sudo_list": (
"🧐 Users in group"
" sudo
:\n\n{}"
),
"support_list": (
"🤓 Users in group"
" support
:\n\n{}"
),
"no_owner": (
"😎 There is no users in"
" group owner
"
),
"no_sudo": (
"🧐 There is no users in"
" group sudo
"
),
"no_support": (
"🤓 There is no users in"
" group support
"
),
"owner_added": (
'😎 {} added to group owner
'
),
"sudo_added": (
'🧐 {} added to group sudo
'
),
"support_added": (
'🤓 {} added to group support
'
),
"owner_removed": (
'😎 {} removed from group owner
'
),
"sudo_removed": (
'🧐 {} removed from group sudo
'
),
"support_removed": (
'🤓 {} removed from group support
'
),
"no_user": (
"🚫 Specify user to"
" permit"
),
"not_a_user": (
"🚫 Specified entity is"
" not a user"
),
"li": '⦿ {}',
"warning": (
"⚠️ Please, confirm, that you want to add {} to group {}
!\nThis'
" action may reveal personal info and grant full or partial access to"
" userbot to this user"
),
"cancel": "🚫 Cancel",
"confirm": "👑 Confirm",
"enable_nonick_btn": "🔰 Enable",
"self": (
"🚫 You can't"
" promote/demote yourself!"
),
"suggest_nonick": "🔰 Do you want to enable NoNick for this user?",
"user_nn": '🔰 NoNick for {} enabled',
"what": (
"🚫 You need to specify"
" the type of target as first argument (user
or"
" chat
)"
),
"no_target": (
"🚫 You didn't specify"
" the target of security rule"
),
"no_rule": (
"🚫 You didn't specify"
" the rule (module or command)"
),
"confirm_rule": (
"🔐 Please, confirm that you want to give {} {} a"
" permission to use {} {}
{}?"
),
"rule_added": (
"🔐 You gave {} {} a"
" permission to use {} {}
{}"
),
"confirm_btn": "👑 Confirm",
"cancel_btn": "🚫 Cancel",
"multiple_rules": (
"🔐 Unable to unambiguously determine the security rule. Please, choose"
" the one you meant:\n\n{}"
),
"rules": (
"🔐 Targeted security"
" rules:\n\n{}"
),
"no_rules": (
"🚫 No targeted security"
" rules"
),
"owner_target": (
"🚫 This user is owner"
" and can't be promoted by targeted security"
),
"rules_removed": (
"🔐 Targeted security"
' rules for {} removed'
),
**service_strings,
}
strings_ru = {
"no_command": (
"🚫 Команда"
" {}
не найдена!"
),
"permissions": (
"🔐 Здесь можно настроить разрешения для команды {}{}
"
),
"close_menu": "🙈 Закрыть это меню",
"global": (
"🔐 Здесь можно настроить глобальную исключающую маску. Если тумблер"
" выключен здесь, он выключен для всех команд"
),
"owner": "😎 Владелец",
"sudo": "🧐 Sudo",
"support": "🤓 Помощник",
"group_owner": "🧛♂️ Влад. группы",
"group_admin_add_admins": "🧑⚖️ Админ (добавлять участников)",
"group_admin_change_info": "🧑⚖️ Админ (изменять инфо)",
"group_admin_ban_users": "🧑⚖️ Админ (банить)",
"group_admin_delete_messages": "🧑⚖️ Админ (удалять сообщения)",
"group_admin_pin_messages": "🧑⚖️ Админ (закреплять)",
"group_admin_invite_users": "🧑⚖️ Админ (приглашать)",
"group_admin": "🧑⚖️ Админ (любой)",
"group_member": "👥 В группе",
"pm": "🤙 В лс",
"owner_list": (
"😎 Пользователи группы"
" owner
:\n\n{}"
),
"sudo_list": (
"🧐 Пользователи группы"
" sudo
:\n\n{}"
),
"support_list": (
"🤓 Пользователи группы"
" support
:\n\n{}"
),
"no_owner": (
"😎 Нет пользователей в"
" группе owner
"
),
"no_sudo": (
"🧐 Нет пользователей в"
" группе sudo
"
),
"no_support": (
"🤓 Нет пользователей в"
" группе support
"
),
"no_user": (
"🚫 Укажи, кому выдавать"
" права"
),
"not_a_user": (
"🚫 Указанная цель - не"
" пользователь"
),
"cancel": "🚫 Отмена",
"confirm": "👑 Подтвердить",
"self": (
"🚫 Нельзя управлять"
" своими правами!"
),
"warning": (
'⚠️ Ты действительно хочешь добавить {} в'
" группу {}
!\nЭто действие может передать частичный или"
" полный доступ к юзерботу этому пользователю!"
),
"suggest_nonick": (
"🔰 Хочешь ли ты включить NoNick для этого пользователя?"
),
"user_nn": '🔰 NoNick для {} включен',
"enable_nonick_btn": "🔰 Включить",
"owner_added": (
'😎 {} добавлен в группу owner
'
),
"sudo_added": (
'🧐 {} добавлен в группу sudo
'
),
"support_added": (
'🤓 {} добавлен в группу support
'
),
"owner_removed": (
'😎 {} удален из группы owner
'
),
"sudo_removed": (
'🧐 {} удален из группы sudo
'
),
"support_removed": (
'🤓 {} удален из группы support
'
),
"_cls_doc": "Управление настройками безопасности",
"what": (
"🚫 Вам нужно указать"
" тип цели первым аргументов (user
or"
" chat
)"
),
"no_target": (
"🚫 Не указана цель"
" правила безопасности"
),
"no_rule": (
"🚫 Не указано правило"
" безопасности (модуль или команда)"
),
"confirm_rule": (
"🔐 Пожалуйста, подтвердите что хотите выдать {} {}"
" право использовать {} {}
{}"
),
"multiple_rules": (
"🔐 Не получилось однозначно распознать правила безопасности. Выберите"
" то, которое имели ввиду:\n\n{}"
),
"rule_added": (
"🔐 Вы выдали {} {} право"
" использовать {} {}
{}"
),
"rules": (
"🔐 Таргетированные"
" правила безопасности:\n\n{}"
),
"no_rules": (
"🚫 Нет таргетированных"
" правил безопасности"
),
"owner_target": (
"🚫 Этот пользователь -"
" владелец, его права не могут управляться таргетированной"
" безопасностью"
),
"rules_removed": (
"🔐 Правила"
' таргетированной безопасности для {} удалены'
),
**service_strings_ru,
}
async def inline__switch_perm(
self,
call: InlineCall,
command: str,
group: str,
level: bool,
is_inline: bool,
):
cmd = (
self.allmodules.inline_handlers[command]
if is_inline
else self.allmodules.commands[command]
)
mask = self._db.get(security.__name__, "masks", {}).get(
f"{cmd.__module__}.{cmd.__name__}",
getattr(cmd, "security", security.DEFAULT_PERMISSIONS),
)
bit = security.BITMAP[group.upper()]
if level:
mask |= bit
else:
mask &= ~bit
masks = self._db.get(security.__name__, "masks", {})
masks[f"{cmd.__module__}.{cmd.__name__}"] = mask
self._db.set(security.__name__, "masks", masks)
if (
not self._db.get(security.__name__, "bounding_mask", DEFAULT_PERMISSIONS)
& bit
and level
):
await call.answer(
"Security value set but not applied. Consider enabling this value in"
f" .{'inlinesec' if is_inline else 'security'}",
show_alert=True,
)
else:
await call.answer("Security value set!")
await call.edit(
self.strings("permissions").format(
f"@{self.inline.bot_username} " if is_inline else self.get_prefix(),
command,
),
reply_markup=self._build_markup(cmd, is_inline),
)
async def inline__switch_perm_bm(
self,
call: InlineCall,
group: str,
level: bool,
is_inline: bool,
):
mask = self._db.get(security.__name__, "bounding_mask", DEFAULT_PERMISSIONS)
bit = security.BITMAP[group.upper()]
if level:
mask |= bit
else:
mask &= ~bit
self._db.set(security.__name__, "bounding_mask", mask)
await call.answer("Bounding mask value set!")
await call.edit(
self.strings("global"),
reply_markup=self._build_markup_global(is_inline),
)
def _build_markup(
self,
command: callable,
is_inline: bool = False,
) -> List[List[dict]]:
perms = self._get_current_perms(command, is_inline)
return (
utils.chunks(
[
{
"text": f"{'✅' if level else '🚫'} {self.strings[group]}",
"callback": self.inline__switch_perm,
"args": (
command.__name__.rsplit("_inline_handler", maxsplit=1)[0],
group,
not level,
is_inline,
),
}
for group, level in perms.items()
],
2,
)
+ [[{"text": self.strings("close_menu"), "action": "close"}]]
if is_inline
else utils.chunks(
[
{
"text": f"{'✅' if level else '🚫'} {self.strings[group]}",
"callback": self.inline__switch_perm,
"args": (
command.__name__.rsplit("cmd", maxsplit=1)[0],
group,
not level,
is_inline,
),
}
for group, level in perms.items()
],
2,
)
+ [
[
{
"text": self.strings("close_menu"),
"action": "close",
}
]
]
)
def _build_markup_global(self, is_inline: bool = False) -> List[List[dict]]:
perms = self._get_current_bm(is_inline)
return utils.chunks(
[
{
"text": f"{'✅' if level else '🚫'} {self.strings[group]}",
"callback": self.inline__switch_perm_bm,
"args": (group, not level, is_inline),
}
for group, level in perms.items()
],
2,
) + [[{"text": self.strings("close_menu"), "action": "close"}]]
def _get_current_bm(self, is_inline: bool = False) -> dict:
return self._perms_map(
self._db.get(security.__name__, "bounding_mask", DEFAULT_PERMISSIONS),
is_inline,
)
@staticmethod
def _perms_map(perms: int, is_inline: bool) -> dict:
return (
{
"sudo": bool(perms & SUDO),
"support": bool(perms & SUPPORT),
"everyone": bool(perms & EVERYONE),
}
if is_inline
else {
"sudo": bool(perms & SUDO),
"support": bool(perms & SUPPORT),
"group_owner": bool(perms & GROUP_OWNER),
"group_admin_add_admins": bool(perms & GROUP_ADMIN_ADD_ADMINS),
"group_admin_change_info": bool(perms & GROUP_ADMIN_CHANGE_INFO),
"group_admin_ban_users": bool(perms & GROUP_ADMIN_BAN_USERS),
"group_admin_delete_messages": bool(
perms & GROUP_ADMIN_DELETE_MESSAGES
),
"group_admin_pin_messages": bool(perms & GROUP_ADMIN_PIN_MESSAGES),
"group_admin_invite_users": bool(perms & GROUP_ADMIN_INVITE_USERS),
"group_admin": bool(perms & GROUP_ADMIN),
"group_member": bool(perms & GROUP_MEMBER),
"pm": bool(perms & PM),
"everyone": bool(perms & EVERYONE),
}
)
def _get_current_perms(
self,
command: callable,
is_inline: bool = False,
) -> dict:
config = self._db.get(security.__name__, "masks", {}).get(
f"{command.__module__}.{command.__name__}",
getattr(command, "security", self._client.dispatcher.security.default),
)
return self._perms_map(config, is_inline)
@loader.owner
@loader.command(ru_doc="[команда] - Настроить разрешения для команды")
async def security(self, message: Message):
"""[command] - Configure command's security settings"""
args = utils.get_args_raw(message).lower().strip()
if args and args not in self.allmodules.commands:
await utils.answer(message, self.strings("no_command").format(args))
return
if not args:
await self.inline.form(
self.strings("global"),
reply_markup=self._build_markup_global(),
message=message,
ttl=5 * 60,
)
return
cmd = self.allmodules.commands[args]
await self.inline.form(
self.strings("permissions").format(self.get_prefix(), args),
reply_markup=self._build_markup(cmd),
message=message,
ttl=5 * 60,
)
@loader.owner
@loader.command(ru_doc="[команда] - Настроить разрешения для инлайн команды")
async def inlinesec(self, message: Message):
"""[command] - Configure inline command's security settings"""
args = utils.get_args_raw(message).lower().strip()
if not args:
await self.inline.form(
self.strings("global"),
reply_markup=self._build_markup_global(True),
message=message,
ttl=5 * 60,
)
return
if args not in self.allmodules.inline_handlers:
await utils.answer(message, self.strings("no_command").format(args))
return
i_handler = self.allmodules.inline_handlers[args]
await self.inline.form(
self.strings("permissions").format(f"@{self.inline.bot_username} ", args),
reply_markup=self._build_markup(i_handler, True),
message=message,
ttl=5 * 60,
)
async def _resolve_user(self, message: Message):
reply = await message.get_reply_message()
args = utils.get_args_raw(message)
if not args and not reply:
await utils.answer(message, self.strings("no_user"))
return
user = None
if args:
try:
if str(args).isdigit():
args = int(args)
user = await self._client.get_entity(args)
except Exception:
pass
if user is None:
user = await self._client.get_entity(reply.sender_id)
if not isinstance(user, (User, PeerUser)):
await utils.answer(message, self.strings("not_a_user"))
return
if user.id == self.tg_id:
await utils.answer(message, self.strings("self"))
return
return user
async def _add_to_group(
self,
message: Union[Message, InlineCall], # noqa: F821
group: str,
confirmed: bool = False,
user: int = None,
):
if user is None:
user = await self._resolve_user(message)
if not user:
return
if isinstance(user, int):
user = await self._client.get_entity(user)
if not confirmed:
await self.inline.form(
self.strings("warning").format(
user.id,
utils.escape_html(get_display_name(user)),
group,
),
message=message,
ttl=10 * 60,
reply_markup=[
{
"text": self.strings("cancel"),
"action": "close",
},
{
"text": self.strings("confirm"),
"callback": self._add_to_group,
"args": (group, True, user.id),
},
],
)
return
if user.id not in getattr(self._client.dispatcher.security, group):
getattr(self._client.dispatcher.security, group).append(user.id)
m = (
self.strings(f"{group}_added").format(
user.id,
utils.escape_html(get_display_name(user)),
)
+ "\n\n"
+ self.strings("suggest_nonick")
)
await utils.answer(message, m)
await message.edit(
m,
reply_markup=[
{
"text": self.strings("cancel"),
"action": "close",
},
{
"text": self.strings("enable_nonick_btn"),
"callback": self._enable_nonick,
"args": (user,),
},
],
)
async def _enable_nonick(self, call: InlineCall, user: User):
self._db.set(
main.__name__,
"nonickusers",
list(set(self._db.get(main.__name__, "nonickusers", []) + [user.id])),
)
await call.edit(
self.strings("user_nn").format(
user.id,
utils.escape_html(get_display_name(user)),
)
)
await call.unload()
async def _remove_from_group(self, message: Message, group: str):
user = await self._resolve_user(message)
if not user:
return
if user.id in getattr(self._client.dispatcher.security, group):
getattr(self._client.dispatcher.security, group).remove(user.id)
m = self.strings(f"{group}_removed").format(
user.id,
utils.escape_html(get_display_name(user)),
)
await utils.answer(message, m)
async def _list_group(self, message: Message, group: str):
_resolved_users = []
for user in getattr(self._client.dispatcher.security, group) + (
[self.tg_id] if group == "owner" else []
):
try:
_resolved_users += [await self._client.get_entity(user)]
except Exception:
pass
if _resolved_users:
await utils.answer(
message,
self.strings(f"{group}_list").format(
"\n".join(
[
self.strings("li").format(
i.id, utils.escape_html(get_display_name(i))
)
for i in _resolved_users
]
)
),
)
else:
await utils.answer(message, self.strings(f"no_{group}"))
@loader.command(ru_doc="<пользователь> - Добавить пользователя в группу `sudo`")
async def sudoadd(self, message: Message):
""" - Add user to `sudo`"""
await self._add_to_group(message, "sudo")
@loader.command(ru_doc="<пользователь> - Добавить пользователя в группу `owner`")
async def owneradd(self, message: Message):
""" - Add user to `owner`"""
await self._add_to_group(message, "owner")
@loader.command(ru_doc="<пользователь> - Добавить пользователя в группу `support`")
async def supportadd(self, message: Message):
""" - Add user to `support`"""
await self._add_to_group(message, "support")
@loader.command(ru_doc="<пользователь> - Удалить пользователя из группы `sudo`")
async def sudorm(self, message: Message):
""" - Remove user from `sudo`"""
await self._remove_from_group(message, "sudo")
@loader.command(ru_doc="<пользователь> - Удалить пользователя из группы `owner`")
async def ownerrm(self, message: Message):
""" - Remove user from `owner`"""
await self._remove_from_group(message, "owner")
@loader.command(ru_doc="<пользователь> - Удалить пользователя из группы `support`")
async def supportrm(self, message: Message):
""" - Remove user from `support`"""
await self._remove_from_group(message, "support")
@loader.command(ru_doc="Показать список пользователей в группе `sudo`")
async def sudolist(self, message: Message):
"""List users in `sudo`"""
await self._list_group(message, "sudo")
@loader.command(ru_doc="Показать список пользователей в группе `owner`")
async def ownerlist(self, message: Message):
"""List users in `owner`"""
await self._list_group(message, "owner")
@loader.command(ru_doc="Показать список пользователей в группе `support`")
async def supportlist(self, message: Message):
"""List users in `support`"""
await self._list_group(message, "support")
def _lookup(self, needle: str) -> str:
return (
[]
if needle.lower().startswith(self.get_prefix())
else (
[f"module/{self.lookup(needle).__class__.__name__}"]
if self.lookup(needle)
else []
)
) + (
[f"command/{needle.lower().strip(self.get_prefix())}"]
if needle.lower().strip(self.get_prefix()) in self.allmodules.commands
else []
)
@staticmethod
def _extract_time(args: list) -> int:
suffixes = {
"d": 24 * 60 * 60,
"h": 60 * 60,
"m": 60,
"s": 1,
}
for suffix, quantifier in suffixes.items():
duration = next(
(
int(arg.rsplit(suffix, maxsplit=1)[0])
for arg in args
if arg.endswith(suffix)
and arg.rsplit(suffix, maxsplit=1)[0].isdigit()
),
None,
)
if duration is not None:
return duration * quantifier
return 0
def _convert_time(self, duration: int) -> str:
return (
(
f"{duration // (24 * 60 * 60)} "
+ self.strings(f"day{'s' if duration // (24 * 60 * 60) > 1 else ''}")
)
if duration >= 24 * 60 * 60
else (
(
f"{duration // (60 * 60)} "
+ self.strings(f"hour{'s' if duration // (60 * 60) > 1 else ''}")
)
if duration >= 60 * 60
else (
(
f"{duration // 60} "
+ self.strings(f"minute{'s' if duration // 60 > 1 else ''}")
)
if duration >= 60
else (
f"{duration} "
+ self.strings(f"second{'s' if duration > 1 else ''}")
)
)
)
)
async def _add_rule(
self,
call: InlineCall,
target_type: str,
target: EntityLike,
rule: str,
duration: int,
):
self._client.dispatcher.security.add_rule(
target_type,
target,
rule,
duration,
)
await call.edit(
self.strings("rule_added").format(
self.strings(target_type),
utils.get_entity_url(target),
utils.escape_html(get_display_name(target)),
self.strings(rule.split("/", maxsplit=1)[0]),
rule.split("/", maxsplit=1)[1],
(self.strings("for") + " " + self._convert_time(duration))
if duration
else self.strings("forever"),
)
)
async def _confirm(
self,
obj: Union[Message, InlineMessage],
target_type: str,
target: EntityLike,
rule: str,
duration: int,
):
await utils.answer(
obj,
self.strings("confirm_rule").format(
self.strings(target_type),
utils.get_entity_url(target),
utils.escape_html(get_display_name(target)),
self.strings(rule.split("/", maxsplit=1)[0]),
rule.split("/", maxsplit=1)[1],
(self.strings("for") + " " + self._convert_time(duration))
if duration
else self.strings("forever"),
),
reply_markup=[
{
"text": self.strings("confirm_btn"),
"callback": self._add_rule,
"args": (target_type, target, rule, duration),
},
{"text": self.strings("cancel_btn"), "action": "close"},
],
)
async def _tsec_chat(self, message: Message, args: list):
if len(args) == 1 and message.is_private:
await utils.answer(message, self.strings("no_target"))
return
if len(args) >= 2:
try:
if not args[1].isdigit() and not args[1].startswith("@"):
raise ValueError
target = await self._client.get_entity(
int(args[1]) if args[1].isdigit() else args[1]
)
except (ValueError, TypeError):
if not message.is_private:
target = await self._client.get_entity(message.peer_id)
else:
await utils.answer(message, self.strings("no_target"))
return
duration = self._extract_time(args)
possible_rules = utils.array_sum([self._lookup(arg) for arg in args])
if not possible_rules:
await utils.answer(message, self.strings("no_rule"))
return
if len(possible_rules) > 1:
def case(text: str) -> str:
return text.upper()[0] + text[1:]
await self.inline.form(
message=message,
text=self.strings("multiple_rules").format(
"\n".join(
f"🛡 {case(self.strings(i.split('/')[0]))} {i.split('/', maxsplit=1)[1]}
"
for i in possible_rules
)
),
reply_markup=utils.chunks(
[
{
"text": (
f"🛡 {case(self.strings(i.split('/')[0]))} {i.split('/', maxsplit=1)[1]}"
),
"callback": self._confirm,
"args": ("chat", target, i, duration),
}
for i in possible_rules
],
3,
),
)
return
await self._confirm(message, "chat", target, possible_rules[0], duration)
async def _tsec_user(self, message: Message, args: list):
if len(args) == 1 and not message.is_private and not message.is_reply:
await utils.answer(message, self.strings("no_target"))
return
if len(args) >= 2:
try:
if not args[1].isdigit() and not args[1].startswith("@"):
raise ValueError
target = await self._client.get_entity(
int(args[1]) if args[1].isdigit() else args[1]
)
except (ValueError, TypeError):
if message.is_private:
target = await self._client.get_entity(message.peer_id)
elif message.is_reply:
target = await self._client.get_entity(
(await message.get_reply_message()).sender_id
)
else:
await utils.answer(message, self.strings("no_target"))
return
if target.id in self._client.dispatcher.security.owner:
await utils.answer(message, self.strings("owner_target"))
return
duration = self._extract_time(args)
possible_rules = utils.array_sum([self._lookup(arg) for arg in args])
if not possible_rules:
await utils.answer(message, self.strings("no_rule"))
return
if len(possible_rules) > 1:
def case(text: str) -> str:
return text.upper()[0] + text[1:]
await self.inline.form(
message=message,
text=self.strings("multiple_rules").format(
"\n".join(
f"🛡 {case(self.strings(i.split('/')[0]))} {i.split('/', maxsplit=1)[1]}
"
for i in possible_rules
)
),
reply_markup=utils.chunks(
[
{
"text": (
f"🛡 {case(self.strings(i.split('/')[0]))} {i.split('/', maxsplit=1)[1]}"
),
"callback": self._confirm,
"args": ("user", target, i, duration),
}
for i in possible_rules
],
3,
),
)
return
await self._confirm(message, "user", target, possible_rules[0], duration)
@loader.command(
ru_doc='<"user"/"chat"> - Удалить правило таргетированной безопасности'
)
async def tsecrm(self, message: Message):
"""<"user"/"chat"> - Remove targeted security rule"""
if (
not self._client.dispatcher.security.tsec_chat
and not self._client.dispatcher.security.tsec_user
):
await utils.answer(message, self.strings("no_rules"))
return
args = utils.get_args_raw(message)
if not args or args not in {"user", "chat"}:
await utils.answer(message, self.strings("no_target"))
return
if args == "user":
if not message.is_private and not message.is_reply:
await utils.answer(message, self.strings("no_target"))
return
if message.is_private:
target = await self._client.get_entity(message.peer_id)
elif message.is_reply:
target = await self._client.get_entity(
(await message.get_reply_message()).sender_id
)
else:
await utils.answer(message, self.strings("no_target"))
return
if not self._client.dispatcher.security.remove_rules("user", target.id):
await utils.answer(message, self.strings("no_rules"))
return
await utils.answer(
message,
self.strings("rules_removed").format(
utils.get_entity_url(target),
utils.escape_html(get_display_name(target)),
),
)
return
if message.is_private:
await utils.answer(message, self.strings("no_target"))
return
target = await self._client.get_entity(message.peer_id)
if not self._client.dispatcher.security.remove_rules("chat", target.id):
await utils.answer(message, self.strings("no_rules"))
return
await utils.answer(
message,
self.strings("rules_removed").format(
utils.get_entity_url(target),
utils.escape_html(get_display_name(target)),
),
)
@loader.command(
ru_doc=(
'<"user"/"chat"> [цель - пользователь или чат] [правило - команда или'
" модуль] [время] - Настроить таргетированную безопасность"
)
)
async def tsec(self, message: Message):
"""<"user"/"chat"> [target user or chat] [rule (command/module)] [time] - Add new targeted security rule"""
args = utils.get_args(message)
if not args:
if (
not self._client.dispatcher.security.tsec_chat
and not self._client.dispatcher.security.tsec_user
):
await utils.answer(message, self.strings("no_rules"))
return
await utils.answer(
message,
self.strings("rules").format(
"\n".join(
[
"👥 {utils.escape_html(rule['entity_name'])}"
f" {self._convert_time(int(rule['expires'] - time.time()))} {self.strings('for')} {self.strings(rule['rule_type'])}"
f" {rule['rule']}
"
for rule in self._client.dispatcher.security.tsec_chat
]
+ [
"👤 {utils.escape_html(rule['entity_name'])}"
f" {self._convert_time(int(rule['expires'] - time.time()))} {self.strings('for')} {self.strings(rule['rule_type'])}"
f" {rule['rule']}
"
for rule in self._client.dispatcher.security.tsec_user
]
)
),
)
return
if args[0] not in {"user", "chat"}:
await utils.answer(message, self.strings("what"))
return
await getattr(self, f"_tsec_{args[0]}")(message, args)