mirror of https://github.com/coddrago/Heroku
1.0.11: Add `inline.query_gallery` to add ability to call inline gallery via inline query. Add `e404` attribute to `InlineQuery`, which will show, that no results were found
parent
12d833d0dd
commit
0a2327c37e
|
@ -105,7 +105,15 @@ class InlineManager(Gallery, Form, BotInteractions, Events, TokenObtainment):
|
|||
if form["ttl"] < time.time():
|
||||
del self._forms[form_uid]
|
||||
|
||||
await asyncio.sleep(10)
|
||||
for gallery_uid, gallery in self._galleries.copy().items():
|
||||
if gallery["ttl"] < time.time():
|
||||
del self._galleries[gallery_uid]
|
||||
|
||||
for map_uid, config in self._custom_map.copy().items():
|
||||
if config["ttl"] < time.time():
|
||||
del self._custom_map[map_uid]
|
||||
|
||||
await asyncio.sleep(5)
|
||||
|
||||
async def _register_manager(
|
||||
self,
|
||||
|
|
|
@ -175,7 +175,7 @@ class Events(InlineUnit):
|
|||
await query.answer("You are not allowed to press this button!")
|
||||
return
|
||||
|
||||
await self._custom_map[query.data]["handler"](query)
|
||||
await self._custom_map[query.data]["handler"](query, *self._custom_map[query.data].get("args", []), **self._custom_map[query.data].get("kwargs", {}))
|
||||
return
|
||||
|
||||
async def _chosen_inline_handler(
|
||||
|
@ -248,15 +248,18 @@ class Events(InlineUnit):
|
|||
continue
|
||||
|
||||
# Retrieve docs from func
|
||||
doc = utils.escape_html(
|
||||
"\n".join(
|
||||
[
|
||||
line.strip()
|
||||
for line in inspect.getdoc(fun).splitlines()
|
||||
if not line.strip().startswith("@")
|
||||
]
|
||||
try:
|
||||
doc = utils.escape_html(
|
||||
"\n".join(
|
||||
[
|
||||
line.strip()
|
||||
for line in inspect.getdoc(fun).splitlines()
|
||||
if not line.strip().startswith("@")
|
||||
]
|
||||
)
|
||||
)
|
||||
)
|
||||
except AttributeError:
|
||||
doc = "🦥 No docs"
|
||||
|
||||
_help += f"🎹 <code>@{self.bot_username} {name}</code> - {doc}\n"
|
||||
|
||||
|
|
|
@ -10,6 +10,8 @@ from aiogram.types import (
|
|||
InlineQueryResultPhoto,
|
||||
InlineQuery,
|
||||
InlineQueryResultGif,
|
||||
InlineQueryResultArticle,
|
||||
InputTextMessageContent,
|
||||
)
|
||||
|
||||
from aiogram.utils.exceptions import InvalidHTTPUrlContent, BadRequest, RetryAfter
|
||||
|
@ -22,6 +24,7 @@ import asyncio
|
|||
import time
|
||||
import functools
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -41,13 +44,14 @@ class Gallery(InlineUnit):
|
|||
message: Union[Message, int],
|
||||
next_handler: Union[FunctionType, List[str]],
|
||||
caption: Union[str, FunctionType] = "",
|
||||
*,
|
||||
force_me: bool = True,
|
||||
always_allow: Union[list, None] = None,
|
||||
ttl: Union[int, bool] = False,
|
||||
on_unload: Union[FunctionType, None] = None,
|
||||
preload: Union[bool, int] = False,
|
||||
gif: bool = False,
|
||||
reattempt: bool = False,
|
||||
_reattempt: bool = False,
|
||||
) -> Union[bool, str]:
|
||||
"""
|
||||
Processes inline gallery
|
||||
|
@ -127,15 +131,9 @@ class Gallery(InlineUnit):
|
|||
btn_call_data = [utils.rand(16), utils.rand(16), utils.rand(16)]
|
||||
|
||||
try:
|
||||
if asyncio.iscoroutinefunction(next_handler):
|
||||
photo_url = await next_handler()
|
||||
elif getattr(next_handler, "__call__", False):
|
||||
photo_url = next_handler()
|
||||
else:
|
||||
raise Exception("Invalid type for `next_handler`")
|
||||
|
||||
if not isinstance(photo_url, (str, list)):
|
||||
raise Exception("Got invalid result from `next_handler`")
|
||||
photo_url = await self._call_photo(next_handler)
|
||||
if not photo_url:
|
||||
return False
|
||||
except Exception:
|
||||
logger.exception("Error while parsing first photo in gallery")
|
||||
return False
|
||||
|
@ -168,6 +166,7 @@ class Gallery(InlineUnit):
|
|||
),
|
||||
"always_allow": always_allow,
|
||||
"force_me": force_me,
|
||||
"ttl": self._galleries[gallery_uid]["ttl"],
|
||||
}
|
||||
|
||||
self._custom_map[btn_call_data[1]] = {
|
||||
|
@ -180,6 +179,7 @@ class Gallery(InlineUnit):
|
|||
),
|
||||
"always_allow": always_allow,
|
||||
"force_me": force_me,
|
||||
"ttl": self._galleries[gallery_uid]["ttl"],
|
||||
}
|
||||
|
||||
self._custom_map[btn_call_data[2]] = {
|
||||
|
@ -193,10 +193,13 @@ class Gallery(InlineUnit):
|
|||
),
|
||||
"always_allow": always_allow,
|
||||
"force_me": force_me,
|
||||
"ttl": self._galleries[gallery_uid]["ttl"],
|
||||
}
|
||||
|
||||
if isinstance(message, Message):
|
||||
await (message.edit if message.out else message.respond)("👩🎤 <b>Loading inline gallery...</b>")
|
||||
await (message.edit if message.out else message.respond)(
|
||||
"👩🎤 <b>Loading inline gallery...</b>"
|
||||
)
|
||||
|
||||
try:
|
||||
q = await self._client.inline_query(self.bot_username, gallery_uid)
|
||||
|
@ -211,7 +214,7 @@ class Gallery(InlineUnit):
|
|||
|
||||
del self._galleries[gallery_uid]
|
||||
|
||||
if reattempt:
|
||||
if _reattempt:
|
||||
msg = (
|
||||
"🚫 <b>A problem occurred with inline bot "
|
||||
"while processing query. Check logs for "
|
||||
|
@ -226,15 +229,15 @@ class Gallery(InlineUnit):
|
|||
return False
|
||||
|
||||
return await self.gallery(
|
||||
caption,
|
||||
message,
|
||||
next_handler,
|
||||
force_me,
|
||||
always_allow,
|
||||
ttl,
|
||||
on_unload,
|
||||
preload,
|
||||
True,
|
||||
caption=caption,
|
||||
message=message,
|
||||
next_handler=next_handler,
|
||||
force_me=force_me,
|
||||
always_allow=always_allow,
|
||||
ttl=ttl,
|
||||
on_unload=on_unload,
|
||||
preload=preload,
|
||||
_reattempt=True,
|
||||
)
|
||||
|
||||
self._galleries[gallery_uid]["chat"] = utils.get_chat_id(m)
|
||||
|
@ -247,17 +250,137 @@ class Gallery(InlineUnit):
|
|||
|
||||
return gallery_uid
|
||||
|
||||
async def _load_gallery_photos(self, gallery_uid: str) -> None:
|
||||
"""Preloads photo. Should be called via ensure_future"""
|
||||
if asyncio.iscoroutinefunction(self._galleries[gallery_uid]["next_handler"]):
|
||||
photo_url = await self._galleries[gallery_uid]["next_handler"]()
|
||||
elif getattr(self._galleries[gallery_uid]["next_handler"], "__call__", False):
|
||||
photo_url = self._galleries[gallery_uid]["next_handler"]()
|
||||
async def query_gallery(
|
||||
self,
|
||||
query: InlineQuery,
|
||||
items: List[dict],
|
||||
*,
|
||||
force_me: bool = True,
|
||||
always_allow: Union[list, None] = None,
|
||||
) -> None:
|
||||
"""
|
||||
query
|
||||
`InlineQuery` which should be answered with inline gallery
|
||||
items
|
||||
Array of dicts with inline results.
|
||||
Each dict *must* has a:
|
||||
- `title` - The title of the result
|
||||
- `description` - Short description of the result
|
||||
- `next_handler` - Inline gallery handler. Callback or awaitable
|
||||
Each dict *could* has a:
|
||||
- `caption` - Caption of photo. Defaults to `""`
|
||||
force_me
|
||||
Either this gallery buttons must be pressed only by owner scope or no
|
||||
always_allow
|
||||
Users, that are allowed to press buttons in addition to previous rules
|
||||
"""
|
||||
if not isinstance(force_me, bool):
|
||||
logger.error("Invalid type for `force_me`")
|
||||
return False
|
||||
|
||||
if always_allow and not isinstance(always_allow, list):
|
||||
logger.error("Invalid type for `always_allow`")
|
||||
return False
|
||||
|
||||
if not always_allow:
|
||||
always_allow = []
|
||||
|
||||
if (
|
||||
not isinstance(items, list)
|
||||
or not all(isinstance(i, dict) for i in items)
|
||||
or not all(
|
||||
"title" in i
|
||||
and "description" in i
|
||||
and "next_handler" in i
|
||||
and (
|
||||
callable(i["next_handler"])
|
||||
or asyncio.iscoroutinefunction(i)
|
||||
or isinstance(i, list)
|
||||
)
|
||||
and isinstance(i["title"], str)
|
||||
and isinstance(i["description"], str)
|
||||
for i in items
|
||||
)
|
||||
):
|
||||
logger.error("Invalid `items` specified in query gallery")
|
||||
return False
|
||||
|
||||
result = []
|
||||
for i in items:
|
||||
if "thumb_handler" not in i:
|
||||
photo_url = await self._call_photo(i["next_handler"])
|
||||
if not photo_url:
|
||||
return False
|
||||
|
||||
if isinstance(photo_url, list):
|
||||
photo_url = photo_url[0]
|
||||
|
||||
if not isinstance(photo_url, str):
|
||||
logger.error("Invalid result from `next_handler`")
|
||||
continue
|
||||
else:
|
||||
photo_url = await self._call_photo(i["thumb_handler"])
|
||||
if not photo_url:
|
||||
return False
|
||||
|
||||
if isinstance(photo_url, list):
|
||||
photo_url = photo_url[0]
|
||||
|
||||
if not isinstance(photo_url, str):
|
||||
logger.error("Invalid result from `thumb_handler`")
|
||||
continue
|
||||
|
||||
id_ = utils.rand(16)
|
||||
|
||||
self._custom_map[id_] = {
|
||||
"handler": i["next_handler"],
|
||||
"always_allow": always_allow,
|
||||
"force_me": force_me,
|
||||
"caption": i.get("caption", ""),
|
||||
"ttl": round(time.time()) + 120,
|
||||
}
|
||||
|
||||
result += [
|
||||
InlineQueryResultArticle(
|
||||
id=utils.rand(20),
|
||||
title=i["title"],
|
||||
description=i["description"],
|
||||
input_message_content=InputTextMessageContent(
|
||||
f"👩🎤 <b>Loading Hikka gallery...</b>\n<i>#id: {id_}</i>",
|
||||
"HTML",
|
||||
disable_web_page_preview=True,
|
||||
),
|
||||
thumb_url=photo_url,
|
||||
thumb_width=128,
|
||||
thumb_height=128,
|
||||
)
|
||||
]
|
||||
|
||||
await query.answer(result, cache_time=0)
|
||||
|
||||
async def _call_photo(self, callback: FunctionType) -> Union[str, bool]:
|
||||
"""Parses photo url from `callback`. Returns url on success, otherwise `False`"""
|
||||
if asyncio.iscoroutinefunction(callback):
|
||||
photo_url = await callback()
|
||||
elif getattr(callback, "__call__", False):
|
||||
photo_url = callback()
|
||||
elif isinstance(callback, str):
|
||||
photo_url = callback
|
||||
elif isinstance(callback, list):
|
||||
photo_url = callback[0]
|
||||
else:
|
||||
raise Exception("Invalid type for `next_handler`")
|
||||
logger.error("Invalid type for `next_handler`")
|
||||
return False
|
||||
|
||||
if not isinstance(photo_url, (str, list)):
|
||||
raise Exception("Got invalid result from `next_handler`")
|
||||
logger.error("Got invalid result from `next_handler`")
|
||||
return False
|
||||
|
||||
return photo_url
|
||||
|
||||
async def _load_gallery_photos(self, gallery_uid: str) -> None:
|
||||
"""Preloads photo. Should be called via ensure_future"""
|
||||
photo_url = await self._call_photo(self._galleries[gallery_uid]["next_handler"])
|
||||
|
||||
self._galleries[gallery_uid]["photos"] += (
|
||||
[photo_url] if isinstance(photo_url, str) else photo_url
|
||||
|
@ -385,7 +508,7 @@ class Gallery(InlineUnit):
|
|||
reply_markup=self._gallery_markup(btn_call_data),
|
||||
)
|
||||
except (InvalidHTTPUrlContent, BadRequest):
|
||||
logger.exception("Error fetching photo content, attempting load next one")
|
||||
logger.debug("Error fetching photo content, attempting load next one")
|
||||
del self._galleries[gallery_uid]["photos"][
|
||||
self._galleries[gallery_uid]["current_index"]
|
||||
]
|
||||
|
@ -407,7 +530,11 @@ class Gallery(InlineUnit):
|
|||
self._galleries[gallery_uid]["current_index"]
|
||||
]
|
||||
except IndexError:
|
||||
logger.error(f"Got IndexError in `_get_next_photo`. {self._galleries[gallery_uid]['current_index']=} / {len(self._galleries[gallery_uid]['photos'])=}")
|
||||
logger.error(
|
||||
"Got IndexError in `_get_next_photo`. "
|
||||
f"{self._galleries[gallery_uid]['current_index']=} / "
|
||||
f"{len(self._galleries[gallery_uid]['photos'])=}"
|
||||
)
|
||||
return self._galleries[gallery_uid]["photos"][0]
|
||||
|
||||
def _get_caption(self, gallery_uid: str) -> str:
|
||||
|
|
|
@ -1,8 +1,16 @@
|
|||
from aiogram.types import Message as AiogramMessage, InlineQuery as AiogramInlineQuery
|
||||
from aiogram.types import (
|
||||
Message as AiogramMessage,
|
||||
InlineQuery as AiogramInlineQuery,
|
||||
InlineQueryResultArticle,
|
||||
InputTextMessageContent,
|
||||
)
|
||||
|
||||
from .. import utils
|
||||
|
||||
|
||||
class InlineCall:
|
||||
"""Modified version of original Aiogram CallbackQuery"""
|
||||
|
||||
def __init__(self):
|
||||
self.delete = None
|
||||
self.unload = None
|
||||
|
@ -12,18 +20,21 @@ class InlineCall:
|
|||
|
||||
class InlineUnit:
|
||||
"""InlineManager extension type. For internal use only"""
|
||||
|
||||
def __init__(self):
|
||||
"""Made just for type specification"""
|
||||
|
||||
|
||||
class BotMessage(AiogramMessage):
|
||||
"""Modified version of original Aiogram Message"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
|
||||
class InlineQuery:
|
||||
"""Modified version of original Aiogram InlineQuery"""
|
||||
|
||||
def __init__(self, inline_query: AiogramInlineQuery) -> None:
|
||||
self.inline_query = inline_query
|
||||
|
||||
|
@ -33,6 +44,9 @@ class InlineQuery:
|
|||
if attr.startswith("__") and attr.endswith("__"):
|
||||
continue # Ignore magic attrs
|
||||
|
||||
if hasattr(self, attr):
|
||||
continue # Do not override anything
|
||||
|
||||
try:
|
||||
setattr(self, attr, getattr(inline_query, attr))
|
||||
except AttributeError:
|
||||
|
@ -44,3 +58,22 @@ class InlineQuery:
|
|||
if len(self.inline_query.query.split()) > 1
|
||||
else ""
|
||||
)
|
||||
|
||||
async def e404(self) -> None:
|
||||
await self.answer(
|
||||
[
|
||||
InlineQueryResultArticle(
|
||||
id=utils.rand(20),
|
||||
title="🚫 404",
|
||||
description="No results found",
|
||||
input_message_content=InputTextMessageContent(
|
||||
"😶🌫️ <i>There is nothing here...</i>",
|
||||
parse_mode="HTML",
|
||||
),
|
||||
thumb_url="https://img.icons8.com/external-justicon-flat-justicon/344/external-404-error-responsive-web-design-justicon-flat-justicon.png",
|
||||
thumb_width=128,
|
||||
thumb_height=128,
|
||||
)
|
||||
],
|
||||
cache_time=0,
|
||||
)
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
# █ █ ▀ █▄▀ ▄▀█ █▀█ ▀ ▄▀█ ▀█▀ ▄▀█ █▀▄▀█ ▄▀█
|
||||
# █▀█ █ █ █ █▀█ █▀▄ █ ▄ █▀█ █ █▀█ █ ▀ █ █▀█
|
||||
#
|
||||
# © Copyright 2022
|
||||
#
|
||||
# https://t.me/hikariatama
|
||||
#
|
||||
# 🔒 Licensed under the CC BY-NC-ND 4.0
|
||||
# 🌐 https://creativecommons.org/licenses/by-nc-nd/4.0
|
||||
|
||||
# scope: inline
|
||||
# scope: hikka_only
|
||||
# meta developer: @hikariatama
|
||||
|
||||
from .. import loader, utils
|
||||
from telethon.tl.types import Message
|
||||
from aiogram.types import CallbackQuery
|
||||
import logging
|
||||
import re
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@loader.tds
|
||||
class InlineStuffMod(loader.Module):
|
||||
"""Provides support for inline stuff"""
|
||||
|
||||
strings = {"name": "InlineStuff"}
|
||||
|
||||
async def client_ready(self, client, db) -> None:
|
||||
self._db = db
|
||||
self._client = client
|
||||
self._bot_id = (await self.inline.bot.get_me()).id
|
||||
|
||||
async def inline__close(self, call: CallbackQuery) -> None:
|
||||
await call.delete()
|
||||
|
||||
async def watcher(self, message: Message) -> None:
|
||||
if (
|
||||
not getattr(message, "out", False)
|
||||
or not getattr(message, "via_bot_id", False)
|
||||
or message.via_bot_id != self._bot_id
|
||||
or "Loading Hikka gallery..." not in getattr(message, "raw_text", "")
|
||||
):
|
||||
return
|
||||
|
||||
id_ = re.search(r"#id: ([a-zA-Z0-9]+)", message.raw_text).group(1)
|
||||
|
||||
await self.inline.gallery(
|
||||
message=utils.get_chat_id(message),
|
||||
next_handler=self.inline._custom_map[id_]["handler"],
|
||||
caption=self.inline._custom_map[id_].get("caption", ""),
|
||||
)
|
||||
|
||||
await message.delete()
|
|
@ -1 +1 @@
|
|||
__version__ = (1, 0, 10)
|
||||
__version__ = (1, 0, 11)
|
||||
|
|
Loading…
Reference in New Issue