diff --git a/hikka/modules/hikka_backup.py b/hikka/modules/hikka_backup.py deleted file mode 100644 index 86b8bd2..0000000 --- a/hikka/modules/hikka_backup.py +++ /dev/null @@ -1,311 +0,0 @@ -# ÂŠī¸ Dan Gazizullin, 2021-2023 -# This file is a part of Hikka Userbot -# 🌐 https://github.com/hikariatama/Hikka -# You can redistribute it and/or modify it under the terms of the GNU AGPLv3 -# 🔑 https://www.gnu.org/licenses/agpl-3.0.html - -import asyncio -import contextlib -import datetime -import io -import json -import logging -import os -import time -import zipfile -from pathlib import Path - -from hikkatl.tl.types import Message - -from .. import loader, utils -from ..inline.types import BotInlineCall - -logger = logging.getLogger(__name__) - - -@loader.tds -class HikkaBackupMod(loader.Module): - """Handles database and modules backups""" - - strings = {"name": "HerokuBackup"} - - async def client_ready(self): - if not self.get("period"): - await self.inline.bot.send_photo( - self.tg_id, - photo="https://github.com/coddrago/Heroku/raw/refs/heads/v1.6.8/assets/unit_alpha.png", - caption=self.strings("period"), - reply_markup=self.inline.generate_markup( - utils.chunks( - [ - { - "text": f"🕰 {i} h", - "callback": self._set_backup_period, - "args": (i,), - } - for i in [1, 2, 4, 6, 8, 12, 24, 48, 168] - ], - 3, - ) - + [ - [ - { - "text": "đŸšĢ Never", - "callback": self._set_backup_period, - "args": (0,), - } - ] - ] - ), - ) - - self._backup_channel, _ = await utils.asset_channel( - self._client, - "heroku-backups", - "đŸ“ŧ Your database backups will appear here", - silent=True, - archive=True, - avatar="https://raw.githubusercontent.com/coddrago/Heroku/refs/heads/v1.6.8/assets/heroku-backups.png", - _folder="hikka", - invite_bot=True, - ) - - async def _set_backup_period(self, call: BotInlineCall, value: int): - if not value: - self.set("period", "disabled") - await call.answer(self.strings("never"), show_alert=True) - await call.delete() - return - - self.set("period", value * 60 * 60) - self.set("last_backup", round(time.time())) - - await call.answer(self.strings("saved"), show_alert=True) - await call.delete() - - @loader.command() - async def set_backup_period(self, message: Message): - """[time] | set your backup bd period""" - if ( - not (args := utils.get_args_raw(message)) - or not args.isdigit() - or int(args) not in range(200) - ): - await utils.answer(message, self.strings("invalid_args")) - return - - if not int(args): - self.set("period", "disabled") - await utils.answer(message, f"{self.strings('never')}") - return - - period = int(args) * 60 * 60 - self.set("period", period) - self.set("last_backup", round(time.time())) - await utils.answer(message, f"{self.strings('saved')}") - - @loader.loop(interval=1, autostart=True) - async def handler(self): - try: - if self.get("period") == "disabled": - raise loader.StopLoop - - if not self.get("period"): - await asyncio.sleep(3) - return - - if not self.get("last_backup"): - self.set("last_backup", round(time.time())) - await asyncio.sleep(self.get("period")) - return - - await asyncio.sleep( - self.get("last_backup") + self.get("period") - time.time() - ) - - backup = io.BytesIO(json.dumps(self._db).encode()) - backup.name = ( - f"heroku-db-backup-{datetime.datetime.now():%d-%m-%Y-%H-%M}.json" - ) - - await self.inline.bot.send_document( - int(f"-100{self._backup_channel.id}"), - backup, - reply_markup=self.inline.generate_markup( - [ - [ - { - "text": "â†Ēī¸ Restore this", - "data": "heroku/backup/restore/confirm", - } - ] - ] - ), - ) - - self.set("last_backup", round(time.time())) - except loader.StopLoop: - raise - except Exception: - logger.exception("HerokuBackup failed") - await asyncio.sleep(60) - - @loader.callback_handler() - async def restore(self, call: BotInlineCall): - if not call.data.startswith("heroku/backup/restore"): - return - - if call.data == "heroku/backup/restore/confirm": - await utils.answer( - call, - "❓ Are you sure?", - reply_markup={ - "text": "✅ Yes", - "data": "heroku/backup/restore", - }, - ) - return - - file = await ( - await self._client.get_messages( - self._backup_channel, call.message.message_id - ) - )[0].download_media(bytes) - - decoded_text = json.loads(file.decode()) - - with contextlib.suppress(KeyError): - decoded_text["hikka.inline"].pop("bot_token") - - if not self._db.process_db_autofix(decoded_text): - raise RuntimeError("Attempted to restore broken database") - - self._db.clear() - self._db.update(**decoded_text) - self._db.save() - - await call.answer(self.strings("db_restored"), show_alert=True) - await self.invoke("restart", "-f", peer=call.message.peer_id) - - @loader.command() - async def backupdb(self, message: Message): - """| save backup of your bd""" - txt = io.BytesIO(json.dumps(self._db).encode()) - txt.name = f"db-backup-{datetime.datetime.now():%d-%m-%Y-%H-%M}.json" - await self._client.send_file( - "me", - txt, - caption=self.strings("backup_caption").format( - prefix=utils.escape_html(self.get_prefix()) - ), - ) - await utils.answer(message, self.strings("backup_sent")) - - @loader.command() - async def restoredb(self, message: Message): - """[reply] | restore your bd""" - if not (reply := await message.get_reply_message()) or not reply.media: - await utils.answer( - message, - self.strings("reply_to_file"), - ) - return - - file = await reply.download_media(bytes) - decoded_text = json.loads(file.decode()) - - with contextlib.suppress(KeyError): - decoded_text["hikka.inline"].pop("bot_token") - - if not self._db.process_db_autofix(decoded_text): - raise RuntimeError("Attempted to restore broken database") - - self._db.clear() - self._db.update(**decoded_text) - self._db.save() - - await utils.answer(message, self.strings("db_restored")) - await self.invoke("restart", "-f", peer=message.peer_id) - - @loader.command() - async def backupmods(self, message: Message): - """| save backup of mods""" - mods_quantity = len(self.lookup("Loader").get("loaded_modules", {})) - - result = io.BytesIO() - result.name = "mods.zip" - - db_mods = json.dumps(self.lookup("Loader").get("loaded_modules", {})).encode() - - with zipfile.ZipFile(result, "w", zipfile.ZIP_DEFLATED) as zipf: - for root, _, files in os.walk(loader.LOADED_MODULES_DIR): - for file in files: - if file.endswith(f"{self.tg_id}.py"): - with open(os.path.join(root, file), "rb") as f: - zipf.writestr(file, f.read()) - mods_quantity += 1 - - zipf.writestr("db_mods.json", db_mods) - - archive = io.BytesIO(result.getvalue()) - archive.name = f"mods-{datetime.datetime.now():%d-%m-%Y-%H-%M}.zip" - - await utils.answer_file( - message, - archive, - caption=self.strings("modules_backup").format( - mods_quantity, - utils.escape_html(self.get_prefix()), - ), - ) - - @loader.command() - async def restoremods(self, message: Message): - """[reply] | restore your mods""" - if not (reply := await message.get_reply_message()) or not reply.media: - await utils.answer(message, self.strings("reply_to_file")) - return - - file = await reply.download_media(bytes) - try: - decoded_text = json.loads(file.decode()) - except Exception: - try: - file = io.BytesIO(file) - file.name = "mods.zip" - - with zipfile.ZipFile(file) as zf: - with zf.open("db_mods.json", "r") as modules: - db_mods = json.loads(modules.read().decode()) - if isinstance(db_mods, dict) and all( - ( - isinstance(key, str) - and isinstance(value, str) - and utils.check_url(value) - ) - for key, value in db_mods.items() - ): - self.lookup("Loader").set("loaded_modules", db_mods) - - for name in zf.namelist(): - if name == "db_mods.json": - continue - - path = loader.LOADED_MODULES_PATH / Path(name).name - with zf.open(name, "r") as module: - path.write_bytes(module.read()) - except Exception: - logger.exception("Unable to restore modules") - await utils.answer(message, self.strings("reply_to_file")) - return - else: - if not isinstance(decoded_text, dict) or not all( - isinstance(key, str) and isinstance(value, str) - for key, value in decoded_text.items() - ): - raise RuntimeError("Invalid backup") - - self.lookup("Loader").set("loaded_modules", decoded_text) - - await utils.answer(message, self.strings("mods_restored")) - await self.invoke("restart", "-f", peer=message.peer_id)