облом(((

pull/127/head
Who? 2025-03-12 23:54:00 +07:00 committed by GitHub
parent e52467b843
commit cd4a255aba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 259 additions and 87 deletions

View File

@ -1,49 +1,66 @@
"""Main bot page"""
# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀
# █▀█ █ █ █ █▀█ █▀▄ █
# © Copyright 2022
# https://t.me/hikariatama
#
# 🔒 Licensed under the GNU AGPLv3
# 🌐 https://www.gnu.org/licenses/agpl-3.0.html
# ©️ Dan Gazizullin, 2021-2023
# This file is a part of Hikka Userbot
# 🌐 https://github.com/hikariatama/Hikka
# You can redistribute it and/or modify it under the terms of the GNU AGPLv3
# 🔑 https://www.gnu.org/licenses/agpl-3.0.html
# ©️ Codrago, 2024-2025
# This file is a part of Heroku Userbot
# 🌐 https://github.com/coddrago/Heroku
# You can redistribute it and/or modify it under the terms of the GNU AGPLv3
# 🔑 https://www.gnu.org/licenses/agpl-3.0.html
import asyncio
import collections
import os
import string
from aiohttp import web
import aiohttp_jinja2
import atexit
import functools
import logging
import sys
import os
import re
import requests
import string
import time
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
import aiohttp_jinja2
import requests
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from aiohttp import web
from hikkatl.errors import (
FloodWaitError,
PasswordHashInvalidError,
PhoneCodeExpiredError,
PhoneCodeInvalidError,
SessionPasswordNeededError,
YouBlockedUserError,
)
from hikkatl.password import compute_check
from hikkatl.sessions import MemorySession
from hikkatl.tl.functions.account import GetPasswordRequest
from hikkatl.tl.functions.auth import CheckPasswordRequest
from hikkatl.tl.functions.contacts import UnblockRequest
from hikkatl.utils import parse_phone
import telethon
from telethon.errors.rpcerrorlist import YouBlockedUserError, FloodWaitError
from telethon.tl.functions.contacts import UnblockRequest
from .. import utils, main, database
from ..tl_cache import CustomTelegramClient
from .. import database, main, utils
from .._internal import restart
from ..tl_cache import CustomTelegramClient
from ..version import __version__
DATA_DIR = (
os.path.normpath(os.path.join(utils.get_base_dir(), ".."))
if "OKTETO" not in os.environ and "DOCKER" not in os.environ
else "/data"
"/data"
if "DOCKER" in os.environ
else os.path.normpath(os.path.join(utils.get_base_dir(), ".."))
)
logger = logging.getLogger(__name__)
class Web:
def __init__(self, **kwargs):
self.sign_in_clients = {}
self._pending_client = None
self._qr_login = None
self._qr_task = None
self._2fa_needed = None
self._sessions = []
self._ratelimit = {}
self.api_token = kwargs.pop("api_token")
@ -52,24 +69,49 @@ class Web:
self.proxy = kwargs.pop("proxy")
self.app.router.add_get("/", self.root)
self.app.router.add_put("/setApi", self.set_tg_api)
self.app.router.add_post("/sendTgCode", self.send_tg_code)
self.app.router.add_put("/set_api", self.set_tg_api)
self.app.router.add_post("/send_tg_code", self.send_tg_code)
self.app.router.add_post("/check_session", self.check_session)
self.app.router.add_post("/web_auth", self.web_auth)
self.app.router.add_post("/okteto", self.okteto)
self.app.router.add_post("/tgCode", self.tg_code)
self.app.router.add_post("/finishLogin", self.finish_login)
self.app.router.add_post("/tg_code", self.tg_code)
self.app.router.add_post("/finish_login", self.finish_login)
self.app.router.add_post("/custom_bot", self.custom_bot)
self.app.router.add_post("/init_qr_login", self.init_qr_login)
self.app.router.add_post("/get_qr_url", self.get_qr_url)
self.app.router.add_post("/qr_2fa", self.qr_2fa)
self.app.router.add_post("/can_add", self.can_add)
self.api_set = asyncio.Event()
self.clients_set = asyncio.Event()
async def schedule_restart():
# Yeah-yeah, ikr, but it's the only way to restart
await asyncio.sleep(1)
restart()
@property
def _platform_emoji(self) -> str:
return {
"vds": "https://github.com/hikariatama/assets/raw/master/waning-crescent-moon_1f318.png",
"lavhost": "https://github.com/hikariatama/assets/raw/master/victory-hand_270c-fe0f.png",
"termux": "https://github.com/hikariatama/assets/raw/master/smiling-face-with-sunglasses_1f60e.png",
"docker": "https://github.com/hikariatama/assets/raw/master/spouting-whale_1f433.png",
}[(
"lavhost"
if "LAVHOST" in os.environ
else (
"termux"
if "com.termux" in os.environ.get("PREFIX", "")
else "docker" if "DOCKER" in os.environ else "vds"
)
)]
@aiohttp_jinja2.template("root.jinja2")
async def root(self, _):
return {
"skip_creds": self.api_token is not None,
"tg_done": bool(self.client_data),
"okteto": "OKTETO" in os.environ,
"lavhost": "LAVHOST" in os.environ,
"platform_emoji": self._platform_emoji,
}
async def check_session(self, request: web.Request) -> web.Response:
@ -172,11 +214,8 @@ class Web:
body="You specified invalid API ID and/or API HASH",
)
with open(
os.path.join(self.data_root or DATA_DIR, "api_token.txt"),
"w",
) as f:
f.write(api_id + "\n" + api_hash)
main.save_config_key("api_id", int(api_id))
main.save_config_key("api_hash", api_hash)
self.api_token = collections.namedtuple("api_token", ("ID", "HASH"))(
api_id,
@ -186,52 +225,194 @@ class Web:
self.api_set.set()
return web.Response(body="ok")
async def send_tg_code(self, request: web.Request) -> web.Response:
async def _qr_login_poll(self):
logged_in = False
self._2fa_needed = False
logger.debug("Waiting for QR login to complete")
while not logged_in:
try:
logged_in = await self._qr_login.wait(10)
except asyncio.TimeoutError:
logger.debug("Recreating QR login")
try:
await self._qr_login.recreate()
except SessionPasswordNeededError:
self._2fa_needed = True
return
except SessionPasswordNeededError:
self._2fa_needed = True
break
logger.debug("QR login completed. 2FA needed: %s", self._2fa_needed)
self._qr_login = True
async def init_qr_login(self, request: web.Request) -> web.Response:
if self.client_data and "LAVHOST" in os.environ:
return web.Response(status=403, body="Forbidden by LavHost EULA")
if "JAMHOST" in os.environ:
return web.Response(status=403, body="Forbidden by JamHost EULA")
if "HIKKAHOST" in os.environ:
return web.Response(status=403, body="Forbidden by HikkaHost EULA")
if not self._check_session(request):
return web.Response(status=401, body="Authorization required")
return web.Response(status=401)
if self._pending_client:
return web.Response(status=208, body="Already pending")
if self._pending_client is not None:
self._pending_client = None
self._qr_login = None
if self._qr_task:
self._qr_task.cancel()
self._qr_task = None
text = await request.text()
phone = telethon.utils.parse_phone(text)
self._2fa_needed = False
logger.debug("QR login cancelled, new session created")
if not phone:
return web.Response(status=400, body="Invalid phone number")
client = self._get_client()
self._pending_client = client
client = CustomTelegramClient(
telethon.sessions.MemorySession(),
await client.connect()
self._qr_login = await client.qr_login()
self._qr_task = asyncio.ensure_future(self._qr_login_poll())
return web.Response(body=self._qr_login.url)
async def get_qr_url(self, request: web.Request) -> web.Response:
if not self._check_session(request):
return web.Response(status=401)
if self._qr_login is True:
if self._2fa_needed:
return web.Response(status=403, body="2FA")
await main.hikka.save_client_session(
self._pending_client, delay_restart=True
)
asyncio.ensure_future(self.schedule_restart())
return web.Response(status=200, body="SUCCESS")
if self._qr_login is None:
await self.init_qr_login(request)
if self._qr_login is None:
return web.Response(
status=500,
body="Internal Server Error: Unable to initialize QR login",
)
return web.Response(status=201, body=self._qr_login.url)
def _get_client(self) -> CustomTelegramClient:
return CustomTelegramClient(
MemorySession(),
self.api_token.ID,
self.api_token.HASH,
connection=self.connection,
proxy=self.proxy,
connection_retries=None,
device_model="Hikka",
device_model=main.get_app_name(),
system_version="Windows 10",
app_version=".".join(map(str, __version__)) + " x64",
lang_code="en",
system_lang_code="en-US",
)
async def can_add(self, request: web.Request) -> web.Response:
if self.client_data and "LAVHOST" in os.environ:
return web.Response(status=403, body="Forbidden by host EULA")
return web.Response(status=200, body="Yes")
async def send_tg_code(self, request: web.Request) -> web.Response:
if not self._check_session(request):
return web.Response(status=401, body="Authorization required")
if self.client_data and "LAVHOST" in os.environ:
return web.Response(status=403, body="Forbidden by host EULA")
if "JAMHOST" in os.environ:
return web.Response(status=403, body="Forbidden by JamHost EULA")
if "HIKKAHOST" in os.environ:
return web.Response(status=403, body="Forbidden by HikkaHost EULA")
if self._pending_client:
return web.Response(status=208, body="Already pending")
text = await request.text()
phone = parse_phone(text)
if not phone:
return web.Response(status=400, body="Invalid phone number")
client = self._get_client()
self._pending_client = client
await client.connect()
try:
await client.send_code_request(phone)
except FloodWaitError as e:
return web.Response(
status=429,
body=(
f"You got FloodWait of {e.seconds} seconds. Wait the specified"
" amount of time and try again."
),
)
return web.Response(status=429, body=self._render_fw_error(e))
return web.Response(body="ok")
async def okteto(self, request: web.Request) -> web.Response:
if main.get_config_key("okteto_uri"):
return web.Response(status=418)
@staticmethod
def _render_fw_error(e: FloodWaitError) -> str:
seconds, minutes, hours = (
e.seconds % 3600 % 60,
e.seconds % 3600 // 60,
e.seconds // 3600,
)
seconds, minutes, hours = (
f"{seconds} second(-s)",
f"{minutes} minute(-s) " if minutes else "",
f"{hours} hour(-s) " if hours else "",
)
return (
f"You got FloodWait for {hours}{minutes}{seconds}. Wait the specified"
" amount of time and try again."
)
async def qr_2fa(self, request: web.Request) -> web.Response:
if not self._check_session(request):
return web.Response(status=401)
text = await request.text()
main.save_config_key("okteto_uri", text)
return web.Response(body="URI_SAVED")
logger.debug("2FA code received for QR login: %s", text)
try:
await self._pending_client._on_login(
(
await self._pending_client(
CheckPasswordRequest(
compute_check(
await self._pending_client(GetPasswordRequest()),
text.strip(),
)
)
)
).user
)
except PasswordHashInvalidError:
logger.debug("Invalid 2FA code")
return web.Response(
status=403,
body="Invalid 2FA password",
)
except FloodWaitError as e:
logger.debug("FloodWait for 2FA code")
return web.Response(
status=421,
body=(self._render_fw_error(e)),
)
logger.debug("2FA code accepted, logging in")
await main.hikka.save_client_session(self._pending_client, delay_restart=True)
asyncio.ensure_future(self.schedule_restart())
return web.Response()
async def tg_code(self, request: web.Request) -> web.Response:
if not self._check_session(request):
@ -248,7 +429,7 @@ class Web:
return web.Response(status=400)
code = split[0]
phone = telethon.utils.parse_phone(split[1])
phone = parse_phone(split[1])
password = split[2]
if (
@ -261,41 +442,36 @@ class Web:
if not password:
try:
await self._pending_client.sign_in(phone, code=code)
except telethon.errors.SessionPasswordNeededError:
except SessionPasswordNeededError:
return web.Response(
status=401,
body="2FA Password required",
) # Requires 2FA login
except telethon.errors.PhoneCodeExpiredError:
)
except PhoneCodeExpiredError:
return web.Response(status=404, body="Code expired")
except telethon.errors.PhoneCodeInvalidError:
except PhoneCodeInvalidError:
return web.Response(status=403, body="Invalid code")
except telethon.errors.FloodWaitError as e:
except FloodWaitError as e:
return web.Response(
status=421,
body=(
f"You got FloodWait of {e.seconds} seconds. Wait the specified"
" amount of time and try again."
),
body=(self._render_fw_error(e)),
)
else:
try:
await self._pending_client.sign_in(phone, password=password)
except telethon.errors.PasswordHashInvalidError:
except PasswordHashInvalidError:
return web.Response(
status=403,
body="Invalid 2FA password",
) # Invalid 2FA password
except telethon.errors.FloodWaitError as e:
)
except FloodWaitError as e:
return web.Response(
status=421,
body=(
f"You got FloodWait of {e.seconds} seconds. Wait the specified"
" amount of time and try again."
),
body=(self._render_fw_error(e)),
)
await main.hikka.save_client_session(self._pending_client)
await main.hikka.save_client_session(self._pending_client, delay_restart=True)
asyncio.ensure_future(self.schedule_restart())
return web.Response()
async def finish_login(self, request: web.Request) -> web.Response:
@ -314,13 +490,7 @@ class Web:
self.clients_set.set()
if not first_session:
atexit.register(functools.partial(restart, *sys.argv[1:]))
handler = logging.getLogger().handlers[0]
handler.setLevel(logging.CRITICAL)
for client in main.hikka.clients:
await client.disconnect()
sys.exit(0)
restart()
return web.Response()
@ -386,9 +556,11 @@ class Web:
bot = user[0].inline.bot
msg = await bot.send_message(
user[1].tg_id,
"🌘🔐 <b>Click button below to confirm web application"
f" ops</b>\n\n<b>Client IP</b>: {ips}\n{cities}\n<i>If you did not"
" request any codes, simply ignore this message</i>",
(
"🌘🔐 <b>Click button below to confirm web application"
f" ops</b>\n\n<b>Client IP</b>: {ips}\n{cities}\n<i>If you did"
" not request any codes, simply ignore this message</i>"
),
disable_web_page_preview=True,
reply_markup=markup,
)