- Add QR login using ASCII for noweb
- Fix `.tsecrm`
pull/1/head
hikariatama 2023-03-02 13:03:34 +00:00
parent bb94218cfc
commit e917222ff6
4 changed files with 1697 additions and 21 deletions

View File

@ -37,24 +37,32 @@ import socket
import sqlite3
import sys
import typing
from getpass import getpass
from math import ceil
from pathlib import Path
import hikkatl
from hikkatl import events
from hikkatl.errors.rpcerrorlist import (
from hikkatl.errors import (
ApiIdInvalidError,
AuthKeyDuplicatedError,
FloodWaitError,
PasswordHashInvalidError,
PhoneNumberInvalidError,
SessionPasswordNeededError,
)
from hikkatl.network.connection import (
ConnectionTcpFull,
ConnectionTcpMTProxyRandomizedIntermediate,
)
from hikkatl.password import compute_check
from hikkatl.sessions import MemorySession, SQLiteSession
from hikkatl.tl.functions.account import GetPasswordRequest
from hikkatl.tl.functions.auth import CheckPasswordRequest
from . import database, loader, utils, version
from .dispatcher import CommandDispatcher
from .qr import QRCode
from .tl_cache import CustomTelegramClient
from .translations import Translator
from .version import __version__
@ -223,6 +231,15 @@ def parse_arguments() -> dict:
)
parser.add_argument("--phone", "-p", action="append")
parser.add_argument("--no-web", dest="disable_web", action="store_true")
parser.add_argument(
"--qr-login",
dest="qr_login",
action="store_true",
help=(
"Use QR code login instead of phone number (will only work if scanned from"
" another device)"
),
)
parser.add_argument(
"--data-root",
dest="data_root",
@ -402,7 +419,11 @@ class Hikka:
def _init_web(self):
"""Initialize web"""
if not web_available or getattr(self.arguments, "disable_web", False):
if (
not web_available
or getattr(self.arguments, "disable_web", False)
or IS_TERMUX
):
self.web = None
return
@ -487,37 +508,106 @@ class Hikka:
return False
def _initial_setup(self) -> bool:
async def _initial_setup(self) -> bool:
"""Responsible for first start"""
if self.arguments.no_auth:
return False
if not self.web:
try:
client = CustomTelegramClient(
MemorySession(),
self.api_token.ID,
self.api_token.HASH,
connection=self.conn,
proxy=self.proxy,
connection_retries=None,
device_model=generate_app_name(),
system_version="Windows 10",
app_version=".".join(map(str, __version__)) + " x64",
lang_code="en",
system_lang_code="en-US",
)
if (
input(
"\033[0;96mUse QR code? [y/N]: \033[0m"
if IS_TERMUX
else "Use QR code? [y/N]: "
).lower()
!= "y"
):
phone = input(
"\033[0;96mEnter phone: \033[0m" if IS_TERMUX else "Enter phone: "
)
client = CustomTelegramClient(
MemorySession(),
self.api_token.ID,
self.api_token.HASH,
connection=self.conn,
proxy=self.proxy,
connection_retries=None,
device_model=generate_app_name(),
system_version="Windows 10",
app_version=".".join(map(str, __version__)) + " x64",
lang_code="en",
system_lang_code="en-US",
)
client.start(phone)
asyncio.ensure_future(self.save_client_session(client))
self.clients += [client]
except (EOFError, OSError):
raise
return True
qr_login = await client.qr_login()
def print_qr():
qr = QRCode()
qr.add_data(qr_login.url)
qr.print_ascii(invert=True)
async def qr_login_poll() -> bool:
logged_in = False
while not logged_in:
try:
logged_in = await qr_login.wait(10)
except asyncio.TimeoutError:
try:
await qr_login.recreate()
print_qr()
except SessionPasswordNeededError:
return True
except SessionPasswordNeededError:
return True
return False
if await qr_login_poll():
password = await client(GetPasswordRequest())
while True:
_2fa = getpass(
f"\033[0;96mEnter 2FA password ({password}): \033[0m"
if IS_TERMUX
else f"Enter 2FA password ({password}): "
)
try:
await client._on_login(
(
await client(
CheckPasswordRequest(
compute_check(password, _2fa.strip())
)
)
).user
)
except PasswordHashInvalidError:
print("\033[0;91mInvalid 2FA password!\033[0m")
except FloodWaitError as e:
seconds, minutes, hours = (
e.seconds % 3600 % 60,
e.seconds % 3600 // 60,
e.seconds // 3600,
)
seconds, minutes, hours = (
f"{seconds} second(-s)",
f"{minutes} minute(-s) " if minutes else "",
f"{hours} hour(-s) " if hours else "",
)
print(
"\033[0;91mYou got FloodWait error! Please wait"
f" {hours}{minutes}{seconds}\033[0m"
)
return False
else:
break
return True
@ -745,7 +835,7 @@ class Hikka:
not self.clients # Search for already inited clients
and not self.sessions # Search for already added sessions
or not self._init_clients() # Attempt to read sessions from env
) and not self._initial_setup(): # Otherwise attempt to run setup
) and not asyncio.run(self._initial_setup()):
return
self.loop.set_exception_handler(

View File

@ -2370,7 +2370,9 @@ class HikkaSecurityMod(loader.Module):
)
if not self._client.dispatcher.security.remove_rule(
"user", target.id, args[1]
"user",
target.id,
args[1],
):
await utils.answer(message, self.strings("no_rules"))
return

1559
hikka/qr.py 100644

File diff suppressed because it is too large Load Diff

View File

@ -257,6 +257,31 @@ class SecurityManager:
return any_
def remove_rule(self, target_type: str, target_id: int, rule_cont: str) -> bool:
"""
Removes targeted security rules for the given target
:param target_type: "user" or "chat"
:param target_id: target entity ID
:param rule_cont: rule name (module or command)
:return: True if any rules were removed
"""
any_ = False
if target_type == "user":
for rule in self.tsec_user.copy():
if rule["target"] == target_id and rule["rule"] == rule_cont:
self.tsec_user.remove(rule)
any_ = True
elif target_type == "chat":
for rule in self.tsec_chat.copy():
if rule["target"] == target_id and rule["rule"] == rule_cont:
self.tsec_chat.remove(rule)
any_ = True
return any_
def get_flags(self, func: typing.Union[Command, int]) -> int:
"""
Gets the security flags for the given function