diff --git a/hikka/web/debugger.py b/hikka/web/debugger.py new file mode 100644 index 0000000..1a2959a --- /dev/null +++ b/hikka/web/debugger.py @@ -0,0 +1,105 @@ +# ÂŠī¸ Dan Gazizullin, 2021-2023 +# This file is a part of Hikka Userbot +# 🌐 https://github.com/hikariatama/Hikka +# You can redistribute it and/or modify it under the terms of the GNU AGPLv3 +# 🔑 https://www.gnu.org/licenses/agpl-3.0.html + +import asyncio +import logging +import os +import random +from threading import Thread + +from werkzeug import Request, Response +from werkzeug.debug import DebuggedApplication +from werkzeug.serving import BaseWSGIServer, make_server + +from .. import main, utils +from . import proxypass + +logger = logging.getLogger(__name__) + + +class ServerThread(Thread): + def __init__(self, server: BaseWSGIServer): + Thread.__init__(self) + self.server = server + + def run(self): + logger.debug("Starting werkzeug debug server") + self.server.serve_forever() + + def shutdown(self): + logger.debug("Shutting down werkzeug debug server") + self.server.shutdown() + + +class WebDebugger: + def __init__(self): + self._url = None + self.exceptions = {} + self.pin = str(random.randint(100000, 999999)) + self.port = main.gen_port("werkzeug_port", True) + main.save_config_key("werkzeug_port", self.port) + self._proxypasser = proxypass.ProxyPasser(self._url_changed) + asyncio.ensure_future(self._getproxy()) + self._create_server() + self._controller = ServerThread(self._server) + logging.getLogger("werkzeug").setLevel(logging.WARNING) + self._controller.start() + utils.atexit(self._controller.shutdown) + self.proxy_ready = asyncio.Event() + + async def _getproxy(self): + self._url = await self._proxypasser.get_url(self.port) + self.proxy_ready.set() + + def _url_changed(self, url: str): + self._url = url + + def _create_server(self) -> BaseWSGIServer: + logger.debug("Creating new werkzeug server instance") + os.environ["WERKZEUG_DEBUG_PIN"] = self.pin + os.environ["WERKZEUG_RUN_MAIN"] = "true" + + @Request.application + def app(request): + if request.args.get("ping", "N").upper() == "Y": + return Response("ok") + + if request.args.get("shutdown", "N").upper() == "Y": + self._server._BaseServer__shutdown_request = True + return Response("Shutdown!") + + raise self.exceptions.get(request.args.get("ex_id"), Exception("idk")) + + app = DebuggedApplication(app, evalex=True, pin_security=True) + + try: + fd = int(os.environ["WERKZEUG_SERVER_FD"]) + except (LookupError, ValueError): + fd = None + + self._server = make_server( + "localhost", + self.port, + app, + threaded=False, + processes=1, + request_handler=None, + passthrough_errors=False, + ssl_context=None, + fd=fd, + ) + + return self._server + + @property + def url(self) -> str: + return self._url or f"http://127.0.0.1:{self.port}" + + def feed(self, exc_type, exc_value, exc_traceback) -> str: + logger.debug("Feeding exception %s to werkzeug debugger", exc_type) + id_ = utils.rand(8) + self.exceptions[id_] = exc_type(exc_value).with_traceback(exc_traceback) + return self.url.strip("/") + f"?ex_id={id_}"