diff --git a/hikka/modules/hikka_security.py b/hikka/modules/hikka_security.py deleted file mode 100644 index d6109c5..0000000 --- a/hikka/modules/hikka_security.py +++ /dev/null @@ -1,1254 +0,0 @@ -# ÂŠī¸ Dan Gazizullin, 2021-2023 -# This file is a part of Hikka Userbot -# 🌐 https://github.com/hikariatama/Hikka -# You can redistribute it and/or modify it under the terms of the GNU AGPLv3 -# 🔑 https://www.gnu.org/licenses/agpl-3.0.html - -import contextlib -import datetime -import time -import typing - -from hikkatl.hints import EntityLike -from hikkatl.tl.types import Message, PeerUser, User -from hikkatl.utils import get_display_name - -from .. import loader, main, security, utils -from ..inline.types import InlineCall, InlineMessage -from ..security import ( - DEFAULT_PERMISSIONS, - EVERYONE, - GROUP_ADMIN, - GROUP_ADMIN_ADD_ADMINS, - GROUP_ADMIN_BAN_USERS, - GROUP_ADMIN_CHANGE_INFO, - GROUP_ADMIN_DELETE_MESSAGES, - GROUP_ADMIN_INVITE_USERS, - GROUP_ADMIN_PIN_MESSAGES, - GROUP_MEMBER, - GROUP_OWNER, - PM, - SecurityGroup, -) - - -@loader.tds -class HikkaSecurityMod(loader.Module): - """Control security settings""" - - strings = {"name": "HerokuSecurity"} - - async def client_ready(self): - self._sgroups: typing.Iterable[str, SecurityGroup] = self.pointer( - "sgroups", {}, item_type=SecurityGroup - ) - self._reload_sgroups() - - def _reload_sgroups(self): - self._client.dispatcher.security.apply_sgroups(self._sgroups.todict()) - - async def inline__switch_perm( - self, - call: InlineCall, - command: str, - group: str, - level: bool, - is_inline: bool, - ): - cmd = ( - self.allmodules.inline_handlers[command] - if is_inline - else self.allmodules.commands[command] - ) - - mask = self._db.get(security.__name__, "masks", {}).get( - f"{cmd.__module__}.{cmd.__name__}", - getattr(cmd, "security", security.DEFAULT_PERMISSIONS), - ) - - bit = security.BITMAP[group.upper()] - - if level: - mask |= bit - else: - mask &= ~bit - - masks = self._db.get(security.__name__, "masks", {}) - masks[f"{cmd.__module__}.{cmd.__name__}"] = mask - self._db.set(security.__name__, "masks", masks) - - if ( - not self._db.get(security.__name__, "bounding_mask", DEFAULT_PERMISSIONS) - & bit - and level - ): - await call.answer( - ( - "Security value set but not applied. Consider enabling this value" - f" in .{'inlinesec' if is_inline else 'security'}" - ), - show_alert=True, - ) - else: - await call.answer("Security value set!") - - await call.edit( - self.strings("permissions").format( - utils.escape_html( - f"@{self.inline.bot_username} " if is_inline else self.get_prefix() - ), - command, - ), - reply_markup=self._build_markup(cmd, is_inline), - ) - - async def inline__switch_perm_bm( - self, - call: InlineCall, - group: str, - level: bool, - is_inline: bool, - ): - mask = self._db.get(security.__name__, "bounding_mask", DEFAULT_PERMISSIONS) - bit = security.BITMAP[group.upper()] - - if level: - mask |= bit - else: - mask &= ~bit - - self._db.set(security.__name__, "bounding_mask", mask) - - await call.answer("Bounding mask value set!") - await call.edit( - self.strings("global"), - reply_markup=self._build_markup_global(is_inline), - ) - - def _build_markup( - self, - command: callable, - is_inline: bool = False, - ) -> typing.List[typing.List[dict]]: - perms = self._get_current_perms(command, is_inline) - return ( - utils.chunks( - [ - { - "text": f"{'✅' if level else 'đŸšĢ'} {self.strings[group]}", - "callback": self.inline__switch_perm, - "args": ( - command.__name__.rsplit("_inline_handler", maxsplit=1)[0], - group, - not level, - is_inline, - ), - } - for group, level in perms.items() - ], - 2, - ) - + [[{"text": self.strings("close_menu"), "action": "close"}]] - if is_inline - else utils.chunks( - [ - { - "text": f"{'✅' if level else 'đŸšĢ'} {self.strings[group]}", - "callback": self.inline__switch_perm, - "args": ( - command.__name__.rsplit("cmd", maxsplit=1)[0], - group, - not level, - is_inline, - ), - } - for group, level in perms.items() - ], - 2, - ) - + [ - [ - { - "text": self.strings("close_menu"), - "action": "close", - } - ] - ] - ) - - def _build_markup_global( - self, - is_inline: bool = False, - ) -> typing.List[typing.List[dict]]: - return utils.chunks( - [ - { - "text": f"{'✅' if level else 'đŸšĢ'} {self.strings[group]}", - "callback": self.inline__switch_perm_bm, - "args": (group, not level, is_inline), - } - for group, level in self._get_current_bm(is_inline).items() - ], - 2, - ) + [[{"text": self.strings("close_menu"), "action": "close"}]] - - def _get_current_bm(self, is_inline: bool = False) -> dict: - return self._perms_map( - self._db.get(security.__name__, "bounding_mask", DEFAULT_PERMISSIONS), - is_inline, - ) - - @staticmethod - def _perms_map(perms: int, is_inline: bool) -> dict: - return ( - {"everyone": bool(perms & EVERYONE)} - if is_inline - else { - "group_owner": bool(perms & GROUP_OWNER), - "group_admin_add_admins": bool(perms & GROUP_ADMIN_ADD_ADMINS), - "group_admin_change_info": bool(perms & GROUP_ADMIN_CHANGE_INFO), - "group_admin_ban_users": bool(perms & GROUP_ADMIN_BAN_USERS), - "group_admin_delete_messages": bool( - perms & GROUP_ADMIN_DELETE_MESSAGES - ), - "group_admin_pin_messages": bool(perms & GROUP_ADMIN_PIN_MESSAGES), - "group_admin_invite_users": bool(perms & GROUP_ADMIN_INVITE_USERS), - "group_admin": bool(perms & GROUP_ADMIN), - "group_member": bool(perms & GROUP_MEMBER), - "pm": bool(perms & PM), - "everyone": bool(perms & EVERYONE), - } - ) - - def _get_current_perms( - self, - command: callable, - is_inline: bool = False, - ) -> dict: - return self._perms_map( - self._db.get(security.__name__, "masks", {}).get( - f"{command.__module__}.{command.__name__}", - getattr(command, "security", self._client.dispatcher.security.default), - ), - is_inline, - ) - - @loader.command() - async def newsgroup(self, message: Message): - if not (args := utils.get_args_raw(message)): - await utils.answer(message, self.strings("no_args")) - return - - if not all([i.isalnum() for i in args]): - await utils.answer(message, self.strings("invalid_name")) - return - - if args in self._sgroups: - await utils.answer( - message, self.strings("sgroup_already_exists").format(args) - ) - return - - self._sgroups[args] = SecurityGroup(args, [], []) - self._reload_sgroups() - - await utils.answer(message, self.strings("created_sgroup").format(args)) - - @loader.command() - async def sgroups(self, message: Message): - await utils.answer( - message, - self.strings("sgroups_list").format( - "\n".join( - self.strings("sgroup_li").format( - group.name, len(group.users), len(group.permissions) - ) - for group in self._sgroups.values() - ) - ), - ) - - @loader.command() - async def sgroup(self, message: Message): - if not (args := utils.get_args_raw(message)): - await utils.answer(message, self.strings("no_args")) - return - - if not (group := self._sgroups.get(args)): - await utils.answer(message, self.strings("sgroup_not_found").format(args)) - return - - await utils.answer( - message, - self.strings("sgroup_info").format( - group.name, - ( - self.strings("users_list").format( - "\n".join( - [ - self.strings("li").format( - utils.get_entity_url( - await self._client.get_entity(user, exp=0) - ), - utils.escape_html( - get_display_name( - await self._client.get_entity(user, exp=0) - ) - ), - ) - for user in group.users - ] - ) - ) - if group.users - else self.strings("no_users") - ), - ( - self.strings("permissions_list").format( - "\n".join( - "â–Ģī¸" - " {} {} {}".format( - self.strings(rule["rule_type"]), - rule["rule"], - ( - ( - self.strings("until") - + " " - + self._convert_time_abs(rule["expires"]) - ) - if rule["expires"] - else self.strings("forever") - ), - ) - for rule in group.permissions - ) - ) - if group.permissions - else self.strings("no_permissions") - ), - ), - ) - - @loader.command() - async def delsgroup(self, message: Message): - if not (args := utils.get_args_raw(message)): - await utils.answer(message, self.strings("no_args")) - return - - if not self._sgroups.get(args): - await utils.answer(message, self.strings("sgroup_not_found").format(args)) - return - - del self._sgroups[args] - self._reload_sgroups() - - await utils.answer(message, self.strings("deleted_sgroup").format(args)) - - @loader.command() - async def sgroupadd(self, message: Message): - if not (args := utils.get_args_raw(message)): - await utils.answer(message, self.strings("no_args")) - return - - if len(args.split()) >= 2: - group, user = args.split() - - if user.isdigit(): - user = int(user) - - try: - user = await self._client.get_entity(user, exp=0) - except ValueError: - await utils.answer(message, self.strings("no_args")) - return - else: - if not message.is_reply: - await utils.answer(message, self.strings("no_args")) - return - - group, user = args, await (await message.get_reply_message()).get_sender() - - if not (group := self._sgroups.get(group)): - await utils.answer(message, self.strings("sgroup_not_found").format(group)) - return - - if user.id in group.users: - await utils.answer( - message, - self.strings("user_already_in_sgroup").format( - utils.escape_html(get_display_name(user)), - group.name, - ), - ) - return - - group.users.append(user.id) - self._sgroups[group.name] = group - self._reload_sgroups() - - await utils.answer( - message, - self.strings("user_added_to_sgroup").format( - utils.escape_html(get_display_name(user)), - group.name, - ), - ) - - @loader.command() - async def sgroupdel(self, message: Message): - if not (args := utils.get_args_raw(message)): - await utils.answer(message, self.strings("no_args")) - return - - if len(args.split()) >= 2: - group, user = args.split() - - if user.isdigit(): - user = int(user) - - try: - user = await self._client.get_entity(user, exp=0) - except ValueError: - await utils.answer(message, self.strings("no_args")) - return - else: - if not message.is_reply: - await utils.answer(message, self.strings("no_args")) - return - - group, user = args, await (await message.get_reply_message()).get_sender() - - if not (group := self._sgroups.get(group)): - await utils.answer(message, self.strings("sgroup_not_found").format(group)) - return - - if user.id not in group.users: - await utils.answer( - message, - self.strings("user_not_in_sgroup").format( - utils.escape_html(get_display_name(user)), - group.name, - ), - ) - return - - group.users.remove(user.id) - self._sgroups[group.name] = group - self._reload_sgroups() - - await utils.answer( - message, - self.strings("user_removed_from_sgroup").format( - utils.escape_html(get_display_name(user)), - group.name, - ), - ) - - @loader.command() - async def security(self, message: Message): - if ( - args := utils.get_args_raw(message).lower().strip() - ) and args not in self.allmodules.commands: - await utils.answer(message, self.strings("no_command").format(args)) - return - - if not args: - await self.inline.form( - self.strings("global"), - reply_markup=self._build_markup_global(), - message=message, - ttl=5 * 60, - ) - return - - await self.inline.form( - self.strings("permissions").format(self.get_prefix(), args), - reply_markup=self._build_markup(self.allmodules.commands[args]), - message=message, - ttl=5 * 60, - ) - - @loader.command() - async def inlinesec(self, message: Message): - if not (args := utils.get_args_raw(message).lower().strip()): - await self.inline.form( - self.strings("global"), - reply_markup=self._build_markup_global(True), - message=message, - ttl=5 * 60, - ) - return - - if args not in self.allmodules.inline_handlers: - await utils.answer(message, self.strings("no_command").format(args)) - return - - await self.inline.form( - self.strings("permissions").format(f"@{self.inline.bot_username} ", args), - reply_markup=self._build_markup( - self.allmodules.inline_handlers[args], - True, - ), - message=message, - ttl=5 * 60, - ) - - async def _resolve_user(self, message: Message): - if not (args := utils.get_args_raw(message)) and not ( - reply := await message.get_reply_message() - ): - await utils.answer(message, self.strings("no_user")) - return - - user = None - - if args: - with contextlib.suppress(Exception): - if str(args).isdigit(): - args = int(args) - - user = await self._client.get_entity(args, exp=0) - - if user is None: - try: - user = await self._client.get_entity(reply.sender_id, exp=0) - except ValueError: - user = await reply.get_sender() - - if not isinstance(user, (User, PeerUser)): - await utils.answer(message, self.strings("not_a_user")) - return - - if user.id == self.tg_id: - await utils.answer(message, self.strings("self")) - return - - return user - - async def _add_to_group( - self, - message: typing.Union[Message, InlineCall], - group: str, - confirmed: bool = False, - user: int = None, - ): - if user is None and not (user := await self._resolve_user(message)): - return - - if isinstance(user, int): - user = await self._client.get_entity(user, exp=0) - - if not confirmed: - await self.inline.form( - self.strings("warning").format( - user.id, - utils.escape_html(get_display_name(user)), - group, - ), - message=message, - ttl=10 * 60, - reply_markup=[ - { - "text": self.strings("cancel"), - "action": "close", - }, - { - "text": self.strings("confirm"), - "callback": self._add_to_group, - "args": (group, True, user.id), - }, - ], - ) - return - - if user.id not in getattr(self._client.dispatcher.security, group): - getattr(self._client.dispatcher.security, group).append(user.id) - - await message.edit( - ( - self.strings(f"{group}_added").format( - user.id, - utils.escape_html(get_display_name(user)), - ) - + "\n\n" - + self.strings("suggest_nonick") - ), - reply_markup=[ - { - "text": self.strings("cancel"), - "action": "close", - }, - { - "text": self.strings("enable_nonick_btn"), - "callback": self._enable_nonick, - "args": (user,), - }, - ], - ) - - async def _enable_nonick(self, call: InlineCall, user: User): - self._db.set( - main.__name__, - "nonickusers", - list(set(self._db.get(main.__name__, "nonickusers", []) + [user.id])), - ) - - await call.edit( - self.strings("user_nn").format( - user.id, - utils.escape_html(get_display_name(user)), - ) - ) - - await call.unload() - - @loader.command() - async def owneradd(self, message: Message): - await self._add_to_group(message, "owner") - - @loader.command() - async def ownerrm(self, message: Message): - if not (user := await self._resolve_user(message)): - return - - if user.id in self._client.dispatcher.security.owner: - self._client.dispatcher.security.owner.remove(user.id) - - await utils.answer( - message, - self.strings("owner_removed").format( - user.id, - utils.escape_html(get_display_name(user)), - ), - ) - - @loader.command() - async def ownerlist(self, message: Message): - _resolved_users = [] - for user in set(self._client.dispatcher.security.owner + [self.tg_id]): - with contextlib.suppress(Exception): - _resolved_users += [await self._client.get_entity(user, exp=0)] - - if not _resolved_users: - await utils.answer(message, self.strings("no_owner")) - return - - await utils.answer( - message, - self.strings("owner_list").format( - "\n".join( - [ - self.strings("li").format( - i.id, utils.escape_html(get_display_name(i)) - ) - for i in _resolved_users - ] - ) - ), - ) - - def _lookup(self, needle: str) -> str: - return ( - ( - [] - if needle.lower().startswith(self.get_prefix()) - else ( - [f"module/{self.lookup(needle).__class__.__name__}"] - if self.lookup(needle) - else [] - ) - ) - + ( - [f"command/{needle.lower().strip(self.get_prefix())}"] - if needle.lower().strip(self.get_prefix()) in self.allmodules.commands - else [] - ) - + ( - [f"inline/{needle.lower().strip('@')}"] - if needle.lower().strip("@") in self.allmodules.inline_handlers - else [] - ) - ) - - @staticmethod - def _extract_time(args: list) -> int: - for suffix, quantifier in [ - ("d", 24 * 60 * 60), - ("h", 60 * 60), - ("m", 60), - ("s", 1), - ]: - duration = next( - ( - int(arg.rsplit(suffix, maxsplit=1)[0]) - for arg in args - if arg.endswith(suffix) - and arg.rsplit(suffix, maxsplit=1)[0].isdigit() - ), - None, - ) - if duration is not None: - return duration * quantifier - - return 0 - - def _convert_time_abs(self, timestamp: int) -> str: - return ( - self.strings("forever") - if not timestamp - else datetime.datetime.fromtimestamp(timestamp).strftime( - "%Y-%m-%d %H:%M:%S" - ) - ) - - def _convert_time(self, duration: int) -> str: - return ( - self.strings("forever") - if not duration or duration < 0 - else ( - ( - f"{duration // (24 * 60 * 60)} " - + self.strings( - f"day{'s' if duration // (24 * 60 * 60) > 1 else ''}" - ) - ) - if duration >= 24 * 60 * 60 - else ( - ( - f"{duration // (60 * 60)} " - + self.strings( - f"hour{'s' if duration // (60 * 60) > 1 else ''}" - ) - ) - if duration >= 60 * 60 - else ( - ( - f"{duration // 60} " - + self.strings(f"minute{'s' if duration // 60 > 1 else ''}") - ) - if duration >= 60 - else ( - f"{duration} " - + self.strings(f"second{'s' if duration > 1 else ''}") - ) - ) - ) - ) - ) - - async def _add_rule( - self, - call: InlineCall, - target_type: str, - target: typing.Union[EntityLike, str], - rule: str, - duration: int, - ): - if rule.startswith("inline") and target_type == "chat": - await call.edit(self.strings("chat_inline")) - return - - if target_type == "sgroup": - group = self._sgroups[target] - group.permissions.append( - { - "target": target, - "rule_type": rule.split("/")[0], - "rule": rule.split("/", maxsplit=1)[1], - "expires": int(time.time() + duration) if duration else 0, - "entity_name": group.name, - "entity_url": "", - } - ) - self._reload_sgroups() - else: - self._client.dispatcher.security.add_rule( - target_type, - target, - rule, - duration, - ) - - await call.edit( - self.strings("rule_added").format( - self.strings(target_type), - utils.get_entity_url(target) if not isinstance(target, str) else "", - utils.escape_html( - get_display_name(target) if not isinstance(target, str) else target - ), - self.strings(rule.split("/", maxsplit=1)[0]), - ( - ( - f"@{self.inline.bot_username} " - if rule.split("/", maxsplit=1)[0] == "inline" - else "" - ) - + rule.split("/", maxsplit=1)[1] - ), - ( - (self.strings("for") + " " + self._convert_time(duration)) - if duration - else self.strings("forever") - ), - ) - ) - - async def _confirm( - self, - obj: typing.Union[Message, InlineMessage], - target_type: str, - target: typing.Union[EntityLike, str], - rule: str, - duration: int, - ): - await utils.answer( - obj, - self.strings("confirm_rule").format( - self.strings(target_type), - utils.get_entity_url(target) if not isinstance(target, str) else "", - utils.escape_html( - get_display_name(target) if not isinstance(target, str) else target - ), - self.strings(rule.split("/", maxsplit=1)[0]), - ( - ( - f"@{self.inline.bot_username} " - if rule.split("/", maxsplit=1)[0] == "inline" - else "" - ) - + rule.split("/", maxsplit=1)[1] - ), - ( - (self.strings("for") + " " + self._convert_time(duration)) - if duration - else self.strings("forever") - ), - ), - reply_markup=[ - { - "text": self.strings("confirm_btn"), - "callback": self._add_rule, - "args": (target_type, target, rule, duration), - }, - {"text": self.strings("cancel_btn"), "action": "close"}, - ], - ) - - async def _tsec_chat(self, message: Message, args: list): - if len(args) == 1 and message.is_private: - await utils.answer(message, self.strings("no_target")) - return - - if len(args) >= 2: - try: - if not args[1].isdigit() and not args[1].startswith("@"): - raise ValueError - - target = await self._client.get_entity( - int(args[1]) if args[1].isdigit() else args[1], - exp=0, - ) - except (ValueError, TypeError): - if not message.is_private: - target = await self._client.get_entity(message.peer_id, exp=0) - else: - await utils.answer(message, self.strings("no_target")) - return - - if not ( - possible_rules := utils.array_sum([self._lookup(arg) for arg in args[1:]]) - ): - await utils.answer(message, self.strings("no_rule")) - return - - duration = self._extract_time(args) - - if len(possible_rules) > 1: - await self.inline.form( - message=message, - text=self.strings("multiple_rules").format( - "\n".join( - "🛡 {} {}".format( - self.strings(rule.split("/")[0]).capitalize(), - rule.split("/", maxsplit=1)[1], - ) - for rule in possible_rules - ) - ), - reply_markup=utils.chunks( - [ - { - "text": "🛡 {} {}".format( - self.strings(rule.split("/")[0]).capitalize(), - rule.split("/", maxsplit=1)[1], - ), - "callback": self._confirm, - "args": ("chat", target, rule, duration), - } - for rule in possible_rules - ], - 3, - ), - ) - return - - await self._confirm(message, "chat", target, possible_rules[0], duration) - - async def _tsec_sgroup(self, message: Message, args: list): - if len(args) <= 1: - await utils.answer(message, self.strings("no_target")) - return - - if (target := args[1]) not in self._sgroups: - await utils.answer(message, self.strings("sgroup_not_found").format(target)) - return - - if not ( - possible_rules := utils.array_sum([self._lookup(arg) for arg in args[1:]]) - ): - await utils.answer(message, self.strings("no_rule")) - return - - duration = self._extract_time(args) - - if len(possible_rules) > 1: - await self.inline.form( - message=message, - text=self.strings("multiple_rules").format( - "\n".join( - "🛡 {} {}".format( - self.strings(rule.split("/")[0]).capitalize(), - rule.split("/", maxsplit=1)[1], - ) - for rule in possible_rules - ) - ), - reply_markup=utils.chunks( - [ - { - "text": "🛡 {} {}".format( - self.strings(rule.split("/")[0]).capitalize(), - rule.split("/", maxsplit=1)[1], - ), - "callback": self._confirm, - "args": ("sgroup", target, rule, duration), - } - for rule in possible_rules - ], - 3, - ), - ) - return - - await self._confirm(message, "sgroup", target, possible_rules[0], duration) - - async def _tsec_user(self, message: Message, args: list): - if len(args) == 1 and not message.is_private and not message.is_reply: - await utils.answer(message, self.strings("no_target")) - return - - if len(args) >= 2: - try: - if not args[1].isdigit() and not args[1].startswith("@"): - raise ValueError - - target = await self._client.get_entity( - int(args[1]) if args[1].isdigit() else args[1], - exp=0, - ) - except (ValueError, TypeError): - if message.is_private: - target = await self._client.get_entity(message.peer_id, exp=0) - elif message.is_reply: - target = await self._client.get_entity( - (await message.get_reply_message()).sender_id, - exp=0, - ) - else: - await utils.answer(message, self.strings("no_target")) - return - - if target.id in self._client.dispatcher.security.owner: - await utils.answer(message, self.strings("owner_target")) - return - - duration = self._extract_time(args) - - if not ( - possible_rules := utils.array_sum([self._lookup(arg) for arg in args[1:]]) - ): - await utils.answer(message, self.strings("no_rule")) - return - - if len(possible_rules) > 1: - await self.inline.form( - message=message, - text=self.strings("multiple_rules").format( - "\n".join( - "🛡 {} {}".format( - self.strings(rule.split("/")[0]).capitalize(), - rule.split("/", maxsplit=1)[1], - ) - for rule in possible_rules - ) - ), - reply_markup=utils.chunks( - [ - { - "text": "🛡 {} {}".format( - self.strings(rule.split("/")[0]).capitalize(), - rule.split("/", maxsplit=1)[1], - ), - "callback": self._confirm, - "args": ("user", target, rule, duration), - } - for rule in possible_rules - ], - 3, - ), - ) - return - - await self._confirm(message, "user", target, possible_rules[0], duration) - - @loader.command() - async def tsecrm(self, message: Message): - if ( - not self._client.dispatcher.security.tsec_chat - and not self._client.dispatcher.security.tsec_user - ): - await utils.answer(message, self.strings("no_rules")) - return - - if not (args := utils.get_args(message)) or args[0] not in { - "user", - "chat", - "sgroup", - }: - await utils.answer(message, self.strings("no_target")) - return - - if args[0] == "user": - if not message.is_private and not message.is_reply: - await utils.answer(message, self.strings("no_target")) - return - - if message.is_private: - target = await self._client.get_entity(message.peer_id, exp=0) - elif message.is_reply: - target = await self._client.get_entity( - (await message.get_reply_message()).sender_id, - exp=0, - ) - - if not self._client.dispatcher.security.remove_rule( - "user", - target.id, - args[1], - ): - await utils.answer(message, self.strings("no_rules")) - return - - await utils.answer( - message, - self.strings("rule_removed").format( - utils.get_entity_url(target), - utils.escape_html(get_display_name(target)), - utils.escape_html(args[1]), - ), - ) - return - - if args[0] == "sgroup": - if len(args) < 3 or args[1] not in self._sgroups: - await utils.answer(message, self.strings("no_target")) - return - - group = self._sgroups[args[1]] - _any = False - for rule in group.permissions: - if rule["rule"] == args[2]: - group.permissions.remove(rule) - _any = True - - if not _any: - await utils.answer(message, self.strings("no_rules")) - return - - self._reload_sgroups() - - await utils.answer( - message, - self.strings("rule_removed").format( - "", - utils.escape_html(group.name), - utils.escape_html(args[2]), - ), - ) - return - - if message.is_private: - await utils.answer(message, self.strings("no_target")) - return - - target = await self._client.get_entity(message.peer_id, exp=0) - - if not self._client.dispatcher.security.remove_rule("chat", target.id, args[1]): - await utils.answer(message, self.strings("no_rules")) - return - - await utils.answer( - message, - self.strings("rule_removed").format( - utils.get_entity_url(target), - utils.escape_html(get_display_name(target)), - utils.escape_html(args[1]), - ), - ) - - @loader.command() - async def tsecclr(self, message: Message): - if ( - not self._client.dispatcher.security.tsec_chat - and not self._client.dispatcher.security.tsec_user - ): - await utils.answer(message, self.strings("no_rules")) - return - - if ( - not (args := utils.get_args(message)) - or not (args := args[0]) - or args not in {"user", "chat", "sgroup"} - ): - await utils.answer(message, self.strings("no_target")) - return - - if args == "user": - if not message.is_private and not message.is_reply: - await utils.answer(message, self.strings("no_target")) - return - - if message.is_private: - target = await self._client.get_entity(message.peer_id, exp=0) - elif message.is_reply: - target = await self._client.get_entity( - (await message.get_reply_message()).sender_id, - exp=0, - ) - - if not self._client.dispatcher.security.remove_rules("user", target.id): - await utils.answer(message, self.strings("no_rules")) - return - - await utils.answer( - message, - self.strings("rules_removed").format( - utils.get_entity_url(target), - utils.escape_html(get_display_name(target)), - ), - ) - return - - if args == "sgroup": - group = utils.get_args(message)[1] - if not (group := self._sgroups.get(group)): - await utils.answer(message, self.strings("no_target")) - return - - group.permissions.clear() - self._sgroups[group.name] = group - self._reload_sgroups() - - await utils.answer( - message, - self.strings("rules_removed").format( - "", - utils.escape_html(group.name), - ), - ) - return - - if message.is_private: - await utils.answer(message, self.strings("no_target")) - return - - target = await self._client.get_entity(message.peer_id, exp=0) - - if not self._client.dispatcher.security.remove_rules("chat", target.id): - await utils.answer(message, self.strings("no_rules")) - return - - await utils.answer( - message, - self.strings("rules_removed").format( - utils.get_entity_url(target), - utils.escape_html(get_display_name(target)), - ), - ) - - @loader.command() - async def tsec(self, message: Message): - if not (args := utils.get_args(message)): - if ( - not self._client.dispatcher.security.tsec_chat - and not self._client.dispatcher.security.tsec_user - ): - await utils.answer(message, self.strings("no_rules")) - return - - await utils.answer( - message, - self.strings("rules").format( - "\n".join( - [ - "đŸ‘Ĩ {} {} {} {} {}".format( - rule["entity_url"], - utils.escape_html(rule["entity_name"]), - self._convert_time(int(rule["expires"] - time.time())), - self.strings("for"), - self.strings(rule["rule_type"]), - rule["rule"], - ) - for rule in self._client.dispatcher.security.tsec_chat - ] - + [ - "👤 {} {} {} {} {}".format( - rule["entity_url"], - utils.escape_html(rule["entity_name"]), - self._convert_time(int(rule["expires"] - time.time())), - self.strings("for"), - self.strings(rule["rule_type"]), - rule["rule"], - ) - for rule in self._client.dispatcher.security.tsec_user - ] - + [ - "\n".join( - [ - "🔒" - " {} {} {} {} {}".format( - utils.escape_html(group.name), - self._convert_time( - int(rule["expires"] - time.time()) - ), - self.strings("for"), - self.strings(rule["rule_type"]), - rule["rule"], - ) - for rule in group.permissions - ] - ) - for group in self._sgroups.values() - ] - ) - ), - ) - return - - if args[0] not in {"user", "chat", "sgroup"}: - await utils.answer(message, self.strings("what")) - return - - await getattr(self, f"_tsec_{args[0]}")(message, args)