Deepsource fixes

pull/1/head
hikariatama 2023-01-01 22:39:42 +00:00
parent 16ddeee41e
commit cbc00985c8
6 changed files with 57 additions and 53 deletions

View File

@ -97,7 +97,7 @@ else:
log.init()
from . import main
except (ModuleNotFoundError, ImportError) as e:
except ImportError as e:
print(f"{str(e)}\n🔄 Attempting dependencies installation... Just wait ⏱")
subprocess.run(

View File

@ -40,11 +40,13 @@ DRAGON_EMOJI = "<emoji document_id=5375360100196163660>🐲</emoji>"
native_import = builtins.__import__
logger = logging.getLogger(__name__)
# This is used to ensure, that dynamic dragon import passes in
# the right client. Whenever one of the clients attempts to install
# dragon-specific module, it must aqcuire the `import_lock` or wait
# until it's released. Then set the `current_client` variable to self.
class ImportLock:
# This is used to ensure, that dynamic dragon import passes in
# the right client. Whenever one of the clients attempts to install
# dragon-specific module, it must aqcuire the `import_lock` or wait
# until it's released. Then set the `current_client` variable to self.
def __init__(self):
self.lock = asyncio.Lock()
self.current_client = None
@ -203,14 +205,12 @@ class DragonScripts:
"<b>Telegram API error!</b>\n"
f"<code>[{e.CODE} {e.ID or e.NAME}] - {e.MESSAGE}</code>"
)
else:
if hint:
hint_text = f"\n\n<b>Hint: {hint}</b>"
else:
hint_text = ""
return (
f"<b>Error!</b>\n<code>{e.__class__.__name__}: {e}</code>" + hint_text
)
hint_text = f"\n\n<b>Hint: {hint}</b>" if hint else ""
return (
f"<b>Error!</b>\n<code>{e.__class__.__name__}: {e}</code>" + hint_text
)
@staticmethod
def with_reply(func):
@ -306,7 +306,7 @@ class DragonScripts:
try:
return importlib.import_module(library_name)
except (ImportError, ModuleNotFoundError):
except ImportError:
completed = subprocess.run(
[
sys.executable,
@ -325,12 +325,15 @@ class DragonScripts:
),
package_name,
],
check=False,
)
if completed.returncode != 0:
raise AssertionError(
raise RuntimeError(
f"Failed to install library {package_name} (pip exited with code"
f" {completed.returncode})"
)
return importlib.import_module(library_name)
@staticmethod

View File

@ -142,24 +142,25 @@ class PyroProxyClient(PyroClient):
pyro_obj = self._convert(pyro_obj)
if isinstance(pyro_obj, list):
return [self._pyro2tl(i) for i in pyro_obj]
elif isinstance(pyro_obj, dict):
if isinstance(pyro_obj, dict):
return {k: self._pyro2tl(v) for k, v in pyro_obj.items()}
else:
if not isinstance(pyro_obj, raw.core.TLObject):
return pyro_obj
if type(pyro_obj) not in PROXY:
raise TypeError(
f"Cannot convert Pyrogram's {type(pyro_obj)} to Telethon TLObject"
)
if not isinstance(pyro_obj, raw.core.TLObject):
return pyro_obj
return PROXY[type(pyro_obj)](
**{
attr: self._pyro2tl(getattr(pyro_obj, attr))
for attr in pyro_obj.__slots__
}
if type(pyro_obj) not in PROXY:
raise TypeError(
f"Cannot convert Pyrogram's {type(pyro_obj)} to Telethon TLObject"
)
return PROXY[type(pyro_obj)](
**{
attr: self._pyro2tl(getattr(pyro_obj, attr))
for attr in pyro_obj.__slots__
}
)
def _tl2pyro(self, tl_obj: telethon.tl.TLObject) -> raw.core.TLObject:
"""
Recursively converts Telethon TLObjects to Pyrogram TLObjects (methods,
@ -180,32 +181,33 @@ class PyroProxyClient(PyroClient):
if isinstance(tl_obj, list):
return [self._tl2pyro(i) for i in tl_obj]
elif isinstance(tl_obj, dict):
if isinstance(tl_obj, dict):
return {k: self._tl2pyro(v) for k, v in tl_obj.items()}
else:
if isinstance(tl_obj, int) and str(tl_obj).startswith("-100"):
return int(str(tl_obj)[4:])
if not isinstance(tl_obj, telethon.tl.TLObject):
return tl_obj
if isinstance(tl_obj, int) and str(tl_obj).startswith("-100"):
return int(str(tl_obj)[4:])
if type(tl_obj) not in REVERSED_PROXY:
raise TypeError(
f"Cannot convert Telethon's {type(tl_obj)} to Pyrogram TLObject"
)
if not isinstance(tl_obj, telethon.tl.TLObject):
return tl_obj
hints = typing.get_type_hints(REVERSED_PROXY[type(tl_obj)].__init__) or {}
return REVERSED_PROXY[type(tl_obj)](
**{
attr: self._convert_types(
hints.get(attr),
self._tl2pyro(getattr(tl_obj, attr)),
)
for attr in REVERSED_PROXY[type(tl_obj)].__slots__
}
if type(tl_obj) not in REVERSED_PROXY:
raise TypeError(
f"Cannot convert Telethon's {type(tl_obj)} to Pyrogram TLObject"
)
hints = typing.get_type_hints(REVERSED_PROXY[type(tl_obj)].__init__) or {}
return REVERSED_PROXY[type(tl_obj)](
**{
attr: self._convert_types(
hints.get(attr),
self._tl2pyro(getattr(tl_obj, attr)),
)
for attr in REVERSED_PROXY[type(tl_obj)].__slots__
}
)
@staticmethod
def _get_origin(hint: typing.Any) -> typing.Any:
try:

View File

@ -4,8 +4,6 @@
# You can redistribute it and/or modify it under the terms of the GNU AGPLv3
# 🔑 https://www.gnu.org/licenses/agpl-3.0.html
import os
import pyrogram
import telethon
from telethon.extensions.html import CUSTOM_EMOJIS

View File

@ -298,7 +298,7 @@ class Module:
from . import utils
local_event = asyncio.Event()
self.__approve += [(channel, local_event)]
self.__approve += [(channel, local_event)] # skipcq: PTC-W0037
await local_event.wait()
event.status = local_event.status
event.set()

View File

@ -66,11 +66,12 @@ class WebDebugger:
def app(request):
if request.args.get("ping", "N").upper() == "Y":
return Response("ok")
if request.args.get("shutdown", "N").upper() == "Y":
self._server._BaseServer__shutdown_request = True
return Response("Shutdown!")
else:
raise self.exceptions.get(request.args.get("ex_id"), Exception("idk"))
raise self.exceptions.get(request.args.get("ex_id"), Exception("idk"))
app = DebuggedApplication(app, evalex=True, pin_security=True)