From cd4a255abaa110a294adaa03e36288a19f1d2e6b Mon Sep 17 00:00:00 2001
From: Who? <155328415+coddrago@users.noreply.github.com>
Date: Wed, 12 Mar 2025 23:54:00 +0700
Subject: [PATCH] =?UTF-8?q?=D0=BE=D0=B1=D0=BB=D0=BE=D0=BC(((?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
hikka/web/root.py | 346 ++++++++++++++++++++++++++++++++++------------
1 file changed, 259 insertions(+), 87 deletions(-)
diff --git a/hikka/web/root.py b/hikka/web/root.py
index 2ececf7..9894de4 100644
--- a/hikka/web/root.py
+++ b/hikka/web/root.py
@@ -1,49 +1,66 @@
"""Main bot page"""
-# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
-# █▀█ █ █ █ █▀█ █▀▄ █
-# © Copyright 2022
-# https://t.me/hikariatama
-#
-# 🔒 Licensed under the GNU AGPLv3
-# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
+# ©️ Dan Gazizullin, 2021-2023
+# This file is a part of Hikka Userbot
+# 🌐 https://github.com/hikariatama/Hikka
+# You can redistribute it and/or modify it under the terms of the GNU AGPLv3
+# 🔑 https://www.gnu.org/licenses/agpl-3.0.html
+
+# ©️ Codrago, 2024-2025
+# This file is a part of Heroku Userbot
+# 🌐 https://github.com/coddrago/Heroku
+# You can redistribute it and/or modify it under the terms of the GNU AGPLv3
+# 🔑 https://www.gnu.org/licenses/agpl-3.0.html
import asyncio
import collections
-import os
-import string
-
-from aiohttp import web
-import aiohttp_jinja2
-import atexit
import functools
import logging
-import sys
+import os
import re
-import requests
+import string
import time
-from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
+import aiohttp_jinja2
+import requests
+from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
+from aiohttp import web
+from hikkatl.errors import (
+ FloodWaitError,
+ PasswordHashInvalidError,
+ PhoneCodeExpiredError,
+ PhoneCodeInvalidError,
+ SessionPasswordNeededError,
+ YouBlockedUserError,
+)
+from hikkatl.password import compute_check
+from hikkatl.sessions import MemorySession
+from hikkatl.tl.functions.account import GetPasswordRequest
+from hikkatl.tl.functions.auth import CheckPasswordRequest
+from hikkatl.tl.functions.contacts import UnblockRequest
+from hikkatl.utils import parse_phone
-import telethon
-from telethon.errors.rpcerrorlist import YouBlockedUserError, FloodWaitError
-from telethon.tl.functions.contacts import UnblockRequest
-
-from .. import utils, main, database
-from ..tl_cache import CustomTelegramClient
+from .. import database, main, utils
from .._internal import restart
+from ..tl_cache import CustomTelegramClient
+from ..version import __version__
DATA_DIR = (
- os.path.normpath(os.path.join(utils.get_base_dir(), ".."))
- if "OKTETO" not in os.environ and "DOCKER" not in os.environ
- else "/data"
+ "/data"
+ if "DOCKER" in os.environ
+ else os.path.normpath(os.path.join(utils.get_base_dir(), ".."))
)
+logger = logging.getLogger(__name__)
+
class Web:
def __init__(self, **kwargs):
self.sign_in_clients = {}
self._pending_client = None
+ self._qr_login = None
+ self._qr_task = None
+ self._2fa_needed = None
self._sessions = []
self._ratelimit = {}
self.api_token = kwargs.pop("api_token")
@@ -52,24 +69,49 @@ class Web:
self.proxy = kwargs.pop("proxy")
self.app.router.add_get("/", self.root)
- self.app.router.add_put("/setApi", self.set_tg_api)
- self.app.router.add_post("/sendTgCode", self.send_tg_code)
+ self.app.router.add_put("/set_api", self.set_tg_api)
+ self.app.router.add_post("/send_tg_code", self.send_tg_code)
self.app.router.add_post("/check_session", self.check_session)
self.app.router.add_post("/web_auth", self.web_auth)
- self.app.router.add_post("/okteto", self.okteto)
- self.app.router.add_post("/tgCode", self.tg_code)
- self.app.router.add_post("/finishLogin", self.finish_login)
+ self.app.router.add_post("/tg_code", self.tg_code)
+ self.app.router.add_post("/finish_login", self.finish_login)
self.app.router.add_post("/custom_bot", self.custom_bot)
+ self.app.router.add_post("/init_qr_login", self.init_qr_login)
+ self.app.router.add_post("/get_qr_url", self.get_qr_url)
+ self.app.router.add_post("/qr_2fa", self.qr_2fa)
+ self.app.router.add_post("/can_add", self.can_add)
self.api_set = asyncio.Event()
self.clients_set = asyncio.Event()
+ async def schedule_restart():
+ # Yeah-yeah, ikr, but it's the only way to restart
+ await asyncio.sleep(1)
+ restart()
+
+ @property
+ def _platform_emoji(self) -> str:
+ return {
+ "vds": "https://github.com/hikariatama/assets/raw/master/waning-crescent-moon_1f318.png",
+ "lavhost": "https://github.com/hikariatama/assets/raw/master/victory-hand_270c-fe0f.png",
+ "termux": "https://github.com/hikariatama/assets/raw/master/smiling-face-with-sunglasses_1f60e.png",
+ "docker": "https://github.com/hikariatama/assets/raw/master/spouting-whale_1f433.png",
+ }[(
+ "lavhost"
+ if "LAVHOST" in os.environ
+ else (
+ "termux"
+ if "com.termux" in os.environ.get("PREFIX", "")
+ else "docker" if "DOCKER" in os.environ else "vds"
+ )
+ )]
+
@aiohttp_jinja2.template("root.jinja2")
async def root(self, _):
return {
"skip_creds": self.api_token is not None,
"tg_done": bool(self.client_data),
- "okteto": "OKTETO" in os.environ,
"lavhost": "LAVHOST" in os.environ,
+ "platform_emoji": self._platform_emoji,
}
async def check_session(self, request: web.Request) -> web.Response:
@@ -172,11 +214,8 @@ class Web:
body="You specified invalid API ID and/or API HASH",
)
- with open(
- os.path.join(self.data_root or DATA_DIR, "api_token.txt"),
- "w",
- ) as f:
- f.write(api_id + "\n" + api_hash)
+ main.save_config_key("api_id", int(api_id))
+ main.save_config_key("api_hash", api_hash)
self.api_token = collections.namedtuple("api_token", ("ID", "HASH"))(
api_id,
@@ -186,52 +225,194 @@ class Web:
self.api_set.set()
return web.Response(body="ok")
- async def send_tg_code(self, request: web.Request) -> web.Response:
+ async def _qr_login_poll(self):
+ logged_in = False
+ self._2fa_needed = False
+ logger.debug("Waiting for QR login to complete")
+ while not logged_in:
+ try:
+ logged_in = await self._qr_login.wait(10)
+ except asyncio.TimeoutError:
+ logger.debug("Recreating QR login")
+ try:
+ await self._qr_login.recreate()
+ except SessionPasswordNeededError:
+ self._2fa_needed = True
+ return
+ except SessionPasswordNeededError:
+ self._2fa_needed = True
+ break
+
+ logger.debug("QR login completed. 2FA needed: %s", self._2fa_needed)
+ self._qr_login = True
+
+ async def init_qr_login(self, request: web.Request) -> web.Response:
+ if self.client_data and "LAVHOST" in os.environ:
+ return web.Response(status=403, body="Forbidden by LavHost EULA")
+
+ if "JAMHOST" in os.environ:
+ return web.Response(status=403, body="Forbidden by JamHost EULA")
+
+ if "HIKKAHOST" in os.environ:
+ return web.Response(status=403, body="Forbidden by HikkaHost EULA")
+
if not self._check_session(request):
- return web.Response(status=401, body="Authorization required")
+ return web.Response(status=401)
- if self._pending_client:
- return web.Response(status=208, body="Already pending")
+ if self._pending_client is not None:
+ self._pending_client = None
+ self._qr_login = None
+ if self._qr_task:
+ self._qr_task.cancel()
+ self._qr_task = None
- text = await request.text()
- phone = telethon.utils.parse_phone(text)
+ self._2fa_needed = False
+ logger.debug("QR login cancelled, new session created")
- if not phone:
- return web.Response(status=400, body="Invalid phone number")
+ client = self._get_client()
+ self._pending_client = client
- client = CustomTelegramClient(
- telethon.sessions.MemorySession(),
+ await client.connect()
+ self._qr_login = await client.qr_login()
+ self._qr_task = asyncio.ensure_future(self._qr_login_poll())
+
+ return web.Response(body=self._qr_login.url)
+
+ async def get_qr_url(self, request: web.Request) -> web.Response:
+ if not self._check_session(request):
+ return web.Response(status=401)
+
+ if self._qr_login is True:
+ if self._2fa_needed:
+ return web.Response(status=403, body="2FA")
+
+ await main.hikka.save_client_session(
+ self._pending_client, delay_restart=True
+ )
+ asyncio.ensure_future(self.schedule_restart())
+ return web.Response(status=200, body="SUCCESS")
+
+ if self._qr_login is None:
+ await self.init_qr_login(request)
+
+ if self._qr_login is None:
+ return web.Response(
+ status=500,
+ body="Internal Server Error: Unable to initialize QR login",
+ )
+
+ return web.Response(status=201, body=self._qr_login.url)
+
+ def _get_client(self) -> CustomTelegramClient:
+ return CustomTelegramClient(
+ MemorySession(),
self.api_token.ID,
self.api_token.HASH,
connection=self.connection,
proxy=self.proxy,
connection_retries=None,
- device_model="Hikka",
+ device_model=main.get_app_name(),
+ system_version="Windows 10",
+ app_version=".".join(map(str, __version__)) + " x64",
+ lang_code="en",
+ system_lang_code="en-US",
)
+ async def can_add(self, request: web.Request) -> web.Response:
+ if self.client_data and "LAVHOST" in os.environ:
+ return web.Response(status=403, body="Forbidden by host EULA")
+
+ return web.Response(status=200, body="Yes")
+
+ async def send_tg_code(self, request: web.Request) -> web.Response:
+ if not self._check_session(request):
+ return web.Response(status=401, body="Authorization required")
+
+ if self.client_data and "LAVHOST" in os.environ:
+ return web.Response(status=403, body="Forbidden by host EULA")
+
+ if "JAMHOST" in os.environ:
+ return web.Response(status=403, body="Forbidden by JamHost EULA")
+
+ if "HIKKAHOST" in os.environ:
+ return web.Response(status=403, body="Forbidden by HikkaHost EULA")
+
+ if self._pending_client:
+ return web.Response(status=208, body="Already pending")
+
+ text = await request.text()
+ phone = parse_phone(text)
+
+ if not phone:
+ return web.Response(status=400, body="Invalid phone number")
+
+ client = self._get_client()
+
self._pending_client = client
await client.connect()
try:
await client.send_code_request(phone)
except FloodWaitError as e:
- return web.Response(
- status=429,
- body=(
- f"You got FloodWait of {e.seconds} seconds. Wait the specified"
- " amount of time and try again."
- ),
- )
+ return web.Response(status=429, body=self._render_fw_error(e))
return web.Response(body="ok")
- async def okteto(self, request: web.Request) -> web.Response:
- if main.get_config_key("okteto_uri"):
- return web.Response(status=418)
+ @staticmethod
+ def _render_fw_error(e: FloodWaitError) -> str:
+ seconds, minutes, hours = (
+ e.seconds % 3600 % 60,
+ e.seconds % 3600 // 60,
+ e.seconds // 3600,
+ )
+ seconds, minutes, hours = (
+ f"{seconds} second(-s)",
+ f"{minutes} minute(-s) " if minutes else "",
+ f"{hours} hour(-s) " if hours else "",
+ )
+ return (
+ f"You got FloodWait for {hours}{minutes}{seconds}. Wait the specified"
+ " amount of time and try again."
+ )
+
+ async def qr_2fa(self, request: web.Request) -> web.Response:
+ if not self._check_session(request):
+ return web.Response(status=401)
text = await request.text()
- main.save_config_key("okteto_uri", text)
- return web.Response(body="URI_SAVED")
+
+ logger.debug("2FA code received for QR login: %s", text)
+
+ try:
+ await self._pending_client._on_login(
+ (
+ await self._pending_client(
+ CheckPasswordRequest(
+ compute_check(
+ await self._pending_client(GetPasswordRequest()),
+ text.strip(),
+ )
+ )
+ )
+ ).user
+ )
+ except PasswordHashInvalidError:
+ logger.debug("Invalid 2FA code")
+ return web.Response(
+ status=403,
+ body="Invalid 2FA password",
+ )
+ except FloodWaitError as e:
+ logger.debug("FloodWait for 2FA code")
+ return web.Response(
+ status=421,
+ body=(self._render_fw_error(e)),
+ )
+
+ logger.debug("2FA code accepted, logging in")
+ await main.hikka.save_client_session(self._pending_client, delay_restart=True)
+ asyncio.ensure_future(self.schedule_restart())
+ return web.Response()
async def tg_code(self, request: web.Request) -> web.Response:
if not self._check_session(request):
@@ -248,7 +429,7 @@ class Web:
return web.Response(status=400)
code = split[0]
- phone = telethon.utils.parse_phone(split[1])
+ phone = parse_phone(split[1])
password = split[2]
if (
@@ -261,41 +442,36 @@ class Web:
if not password:
try:
await self._pending_client.sign_in(phone, code=code)
- except telethon.errors.SessionPasswordNeededError:
+ except SessionPasswordNeededError:
return web.Response(
status=401,
body="2FA Password required",
- ) # Requires 2FA login
- except telethon.errors.PhoneCodeExpiredError:
+ )
+ except PhoneCodeExpiredError:
return web.Response(status=404, body="Code expired")
- except telethon.errors.PhoneCodeInvalidError:
+ except PhoneCodeInvalidError:
return web.Response(status=403, body="Invalid code")
- except telethon.errors.FloodWaitError as e:
+ except FloodWaitError as e:
return web.Response(
status=421,
- body=(
- f"You got FloodWait of {e.seconds} seconds. Wait the specified"
- " amount of time and try again."
- ),
+ body=(self._render_fw_error(e)),
)
else:
try:
await self._pending_client.sign_in(phone, password=password)
- except telethon.errors.PasswordHashInvalidError:
+ except PasswordHashInvalidError:
return web.Response(
status=403,
body="Invalid 2FA password",
- ) # Invalid 2FA password
- except telethon.errors.FloodWaitError as e:
+ )
+ except FloodWaitError as e:
return web.Response(
status=421,
- body=(
- f"You got FloodWait of {e.seconds} seconds. Wait the specified"
- " amount of time and try again."
- ),
+ body=(self._render_fw_error(e)),
)
- await main.hikka.save_client_session(self._pending_client)
+ await main.hikka.save_client_session(self._pending_client, delay_restart=True)
+ asyncio.ensure_future(self.schedule_restart())
return web.Response()
async def finish_login(self, request: web.Request) -> web.Response:
@@ -314,13 +490,7 @@ class Web:
self.clients_set.set()
if not first_session:
- atexit.register(functools.partial(restart, *sys.argv[1:]))
- handler = logging.getLogger().handlers[0]
- handler.setLevel(logging.CRITICAL)
- for client in main.hikka.clients:
- await client.disconnect()
-
- sys.exit(0)
+ restart()
return web.Response()
@@ -386,9 +556,11 @@ class Web:
bot = user[0].inline.bot
msg = await bot.send_message(
user[1].tg_id,
- "🌘🔐 Click button below to confirm web application"
- f" ops\n\nClient IP: {ips}\n{cities}\nIf you did not"
- " request any codes, simply ignore this message",
+ (
+ "🌘🔐 Click button below to confirm web application"
+ f" ops\n\nClient IP: {ips}\n{cities}\nIf you did"
+ " not request any codes, simply ignore this message"
+ ),
disable_web_page_preview=True,
reply_markup=markup,
)