542 lines
19 KiB
Python
542 lines
19 KiB
Python
#!/usr/bin/env python3
|
||
|
||
import logging
|
||
import pickle
|
||
import sqlite3
|
||
import asyncio
|
||
from datetime import datetime, timedelta
|
||
from typing import List, Union
|
||
import os
|
||
import re
|
||
from asyncio.locks import Event
|
||
from contextlib import contextmanager
|
||
|
||
from telethon import events
|
||
|
||
from telethon.events import NewMessage, MessageDeleted, MessageEdited
|
||
from telethon import TelegramClient
|
||
from telethon.hints import Entity
|
||
from telethon.tl.functions.messages import SaveGifRequest, SaveRecentStickerRequest
|
||
from telethon.tl.types import (
|
||
DocumentAttributeFilename,
|
||
Message,
|
||
PeerUser,
|
||
PeerChat,
|
||
PeerChannel,
|
||
Channel,
|
||
Chat,
|
||
DocumentAttributeSticker,
|
||
DocumentAttributeVideo,
|
||
MessageMediaDice,
|
||
MessageMediaWebPage,
|
||
MessageMediaGame,
|
||
Document,
|
||
InputDocument,
|
||
DocumentAttributeAnimated,
|
||
MessageMediaPhoto,
|
||
MessageMediaContact,
|
||
MessageMediaGeo,
|
||
MessageMediaPoll,
|
||
Photo,
|
||
Contact,
|
||
UpdateReadMessagesContents,
|
||
)
|
||
import config
|
||
import file_encrypt
|
||
|
||
# Chat categories stored in the `type` column of the messages table;
# delete_expired_messages() applies a retention period per category.
TYPE_UNKNOWN = 0
TYPE_USER = 1
TYPE_CHANNEL = 2
TYPE_GROUP = 3
TYPE_BOT = 4

# User-account client; 'db/user' is the Telethon session name.
client = TelegramClient('db/user', config.API_ID, config.API_HASH)

# Id of the logged-in account; stays -1 until init() fills it via get_me().
my_id = -1

# Shared database handles, assigned from init_db() in the __main__ block.
sqlite_connection: sqlite3.Connection = None
sqlite_cursor: sqlite3.Cursor = None
|
||
|
||
|
||
def init_db():
    """Create the working directories and open the message database.

    Ensures ``db/`` and ``media/`` exist, opens ``db/messages.db`` and creates
    the ``messages`` table and its ``created_time`` index when missing. The
    primary key includes ``edited_time``, so each edit of a message is stored
    as its own row next to the original.

    Returns:
        A ``(cursor, connection)`` tuple for the opened database.
    """
    for directory in ('db', 'media'):
        if not os.path.exists(directory):
            os.mkdir(directory)

    db = sqlite3.connect("db/messages.db")
    cur = db.cursor()

    cur.execute("""CREATE TABLE IF NOT EXISTS messages
                (id INTEGER, from_id INTEGER, chat_id INTEGER,
                type INTEGER, msg_text TEXT, media BLOB, noforwards INTEGER DEFAULT 0, self_destructing INTEGER DEFAULT 0, created_time TIMESTAMP, edited_time TIMESTAMP,
                PRIMARY KEY (chat_id, id, edited_time))""")
    cur.execute(
        "CREATE INDEX IF NOT EXISTS messages_created_index ON messages (created_time DESC)")

    db.commit()
    return cur, db
|
||
|
||
|
||
async def get_chat_type(event: Union[NewMessage.Event, MessageEdited.Event]) -> int:
    """Classify the chat an event belongs to as one of the TYPE_* constants.

    Groups are tested before channels because megagroups report both
    ``is_group`` and ``is_channel``. For private chats the *chat* entity (the
    other party) decides between TYPE_BOT and TYPE_USER: the sender cannot be
    used, because for outgoing messages the sender is the logged-in account.

    Returns:
        TYPE_GROUP, TYPE_CHANNEL, TYPE_BOT, TYPE_USER, or TYPE_UNKNOWN when
        none of the categories match.
    """
    if event.is_group:  # chats and megagroups
        return TYPE_GROUP
    if event.is_channel:  # broadcast channels (megagroups matched above)
        return TYPE_CHANNEL
    if event.is_private:
        chat = await event.get_chat()
        # The entity may be unavailable (None); treat that as a regular user
        # instead of crashing the handler.
        return TYPE_BOT if getattr(chat, "bot", False) else TYPE_USER
    return TYPE_UNKNOWN
|
||
|
||
|
||
# Shared lock serializing "log the edit, then store the new version" across
# concurrently running handlers. A fresh asyncio.Lock() per call never blocks
# anybody, so one module-wide lock is used; it is created lazily so that it
# is bound to the event loop that is actually running the handlers.
_message_store_lock = None


def _get_message_store_lock() -> asyncio.Lock:
    """Return the module-wide message storage lock, creating it on first use."""
    global _message_store_lock
    if _message_store_lock is None:
        _message_store_lock = asyncio.Lock()
    return _message_store_lock


async def new_message_handler(event: Union[NewMessage.Event, MessageEdited.Event]):
    """Store a new or edited message in the database.

    Special case: when I post a t.me / tg://openmessage link into
    config.LOG_CHAT_ID, the linked message(s) are saved via
    save_restricted_msg() instead of being stored.

    Messages from or in config.IGNORED_IDS are skipped. Media from chats that
    forbid forwarding, or media with a TTL, is saved to an encrypted local file
    first. For edits, edited_deleted_handler() logs the change before the new
    version is inserted, so it still reads the pre-edit text from the DB.
    """
    chat_id = event.chat_id
    from_id = get_sender_id(event.message)
    msg_id = event.message.id

    if chat_id == config.LOG_CHAT_ID and from_id == my_id and \
            event.message.text and \
            (re.match(r"^(https:\/\/)?t\.me\/(?:c\/)?[\d\w]+\/[\d]+", event.message.text) or
             re.match(
                 r"^tg:\/\/openmessage\?user_id=\d+&message_id=\d+", event.message.text)
             ):
        msg_links = re.findall(
            r"(?:https:\/\/)?t\.me\/(?:c\/)?[\d\w]+\/[\d]+", event.message.text)
        if not msg_links:
            msg_links = re.findall(
                r"tg:\/\/openmessage\?user_id=\d+&message_id=\d+", event.message.text)
        for msg_link in msg_links:
            await save_restricted_msg(msg_link)
        return

    if from_id in config.IGNORED_IDS or chat_id in config.IGNORED_IDS:
        return

    edited_time = 0

    try:
        noforwards = event.chat.noforwards is True
    except AttributeError:  # AttributeError: 'User' object has no attribute 'noforwards'
        noforwards = event.message.noforwards is True

    # A truthy ttl_seconds on the media marks it as self-destructing; messages
    # without media (None) have no such attribute.
    self_destructing = bool(getattr(event.message.media, "ttl_seconds", None))

    if event.message.media and (noforwards or self_destructing):
        await save_media_as_file(event.message)

    async with _get_message_store_lock():
        if isinstance(event, MessageEdited.Event):
            edited_time = datetime.now()  # event.message.edit_date
            await edited_deleted_handler(event)

        sqlite_cursor.execute(
            "INSERT INTO messages (id, from_id, chat_id, edited_time, type, msg_text, media, noforwards, self_destructing, created_time) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                msg_id,
                from_id,
                chat_id,
                edited_time,
                await get_chat_type(event),
                event.message.message,
                sqlite3.Binary(pickle.dumps(event.message.media)),
                int(noforwards),
                int(self_destructing),
                datetime.now())
        )
        sqlite_connection.commit()
|
||
|
||
|
||
def get_sender_id(message) -> int:
    """Return the id of whoever sent *message*, or 0 when it cannot be told.

    In private chats (PeerUser) the sender is my own account for outgoing
    messages and the chat partner otherwise. In channels and basic groups the
    sender comes from ``message.from_id``, which may be a user or a channel.
    """
    peer = message.peer_id

    if isinstance(peer, PeerUser):
        return my_id if message.out else peer.user_id

    if isinstance(peer, (PeerChannel, PeerChat)):
        author = message.from_id
        if isinstance(author, PeerUser):
            return author.user_id
        if isinstance(author, PeerChannel):
            return author.channel_id

    return 0
|
||
|
||
|
||
def load_messages_from_event(event: Union[MessageDeleted.Event, MessageEdited.Event, UpdateReadMessagesContents]) -> List[dict]:
    """Load the stored messages that an edit/delete/read event refers to.

    For deletions and read-contents updates at most
    config.RATE_LIMIT_NUM_MESSAGES ids are looked up. When several versions of
    a message are stored (one row per edit), the most recently edited version
    is returned. For read-contents updates only self-destructing messages are
    kept.

    Returns:
        A list of dicts with keys ``id``, ``from_id``, ``chat_id``,
        ``msg_text``, ``media`` (unpickled), ``noforwards`` and
        ``self_destructing``, ordered by stored ``created_time``. Unsupported
        event types or events without ids yield an empty list.
    """
    if isinstance(event, MessageDeleted.Event):
        ids = event.deleted_ids[:config.RATE_LIMIT_NUM_MESSAGES]
    elif isinstance(event, UpdateReadMessagesContents):
        ids = event.messages[:config.RATE_LIMIT_NUM_MESSAGES]
    elif isinstance(event, MessageEdited.Event):
        ids = [event.message.id]
    else:
        return []

    if not ids:
        return []

    # Bind every value as a parameter instead of formatting it into the SQL.
    placeholders = ",".join("?" for _ in ids)
    params = list(ids)
    chat_id = getattr(event, "chat_id", None)
    if chat_id:
        where_clause = f"WHERE chat_id = ? AND id IN ({placeholders})"
        params.insert(0, chat_id)
    else:
        # Without a chat id, match any chat that is not a channel/megagroup
        # (their marked ids start with -100).
        where_clause = f"WHERE chat_id NOT LIKE '-100%' AND id IN ({placeholders})"

    # With MAX() in an aggregate query SQLite takes the bare columns from the
    # row holding the maximum, i.e. the newest version of each message
    # (originals are stored with edited_time 0, edits with a timestamp).
    query = f"""SELECT id, from_id, chat_id, msg_text, media, noforwards, self_destructing,
                created_time, MAX(edited_time) FROM messages {where_clause}
                GROUP BY chat_id, id ORDER BY created_time ASC"""

    db_results = sqlite_cursor.execute(query, params).fetchall()

    messages = []
    for db_result in db_results:
        # skip read messages which are not self-destructing
        if isinstance(event, UpdateReadMessagesContents) and not db_result[6]:
            continue

        messages.append({
            "id": db_result[0],
            "from_id": db_result[1],
            "chat_id": db_result[2],
            "msg_text": db_result[3],
            # The blob was pickled by this program in new_message_handler;
            # never point this at a database from an untrusted source.
            "media": pickle.loads(db_result[4]),
            "noforwards": db_result[5],
            "self_destructing": db_result[6],
        })

    return messages
|
||
|
||
|
||
async def create_mention(entity_id, chat_msg_id: int = None) -> str:
    """Build a Markdown mention for a user, chat or channel id.

    Args:
        entity_id: Telegram id to describe; 0 means the sender is unknown.
        chat_msg_id: Message id to link to when *entity_id* is the chat the
            message lives in. Channels and groups link to that message
            (``t.me/c/...``), defaulting to message 1. Users passed with a
            message id are the chat itself and get a ``#pm`` tag.

    Returns:
        The mention text. It always returns a string; if the entity lookup
        fails, the warning is logged and the plain id is returned.
    """
    msg_id = 1 if chat_msg_id is None else chat_msg_id

    if entity_id == 0:
        return "Unknown"

    try:
        entity: Entity = await client.get_entity(entity_id)

        if isinstance(entity, (Channel, Chat)):
            name = entity.title
            chat_id = str(entity_id).replace("-100", "")
            mention = f"[{name}](t.me/c/{chat_id}/{msg_id})"
        elif entity.first_name:
            is_pm = chat_msg_id is not None
            # strip() avoids a trailing space when there is no last name.
            name = f"{entity.first_name} {entity.last_name or ''}".strip()
            mention = f"[{name}](tg://user?id={entity.id})" + \
                (" #pm" if is_pm else "")
        elif entity.username:
            mention = f"[@{entity.username}](t.me/{entity.username})"
        elif entity.phone:
            mention = entity.phone
        else:
            # Keep the declared str return type (was the bare int id).
            mention = str(entity.id)
    except Exception as e:
        logging.warning(e)
        mention = str(entity_id)

    return mention
|
||
|
||
|
||
def _document_attributes(media) -> list:
    """Return the attribute list of *media*'s document, or [] if there is none."""
    # The document (or its attributes) may be absent or None, so every
    # access is guarded.
    document = getattr(media, "document", None)
    return getattr(document, "attributes", None) or []


async def edited_deleted_handler(event: Union[MessageDeleted.Event, MessageEdited.Event, UpdateReadMessagesContents]):
    """Post log entries to config.LOG_CHAT_ID for deleted, edited or
    self-destructed messages found in the database.

    Any other event type is ignored, as are edits when
    config.SAVE_EDITED_MESSAGES is off and messages from or in
    config.IGNORED_IDS. Stored media is re-sent with the log text. Stickers
    and GIFs sent this way can optionally be removed from the account's saved
    lists again. A note is posted when more messages were affected than
    config.RATE_LIMIT_NUM_MESSAGES allows to be logged.
    """
    if not isinstance(event, (MessageDeleted.Event, MessageEdited.Event, UpdateReadMessagesContents)):
        return

    if isinstance(event, MessageEdited.Event) and not config.SAVE_EDITED_MESSAGES:
        return

    messages = load_messages_from_event(event)

    log_deleted_sender_ids = []

    for message in messages:
        # Skip only this message; aborting here would drop the rest of a
        # batch deletion and the summary below.
        if message['from_id'] in config.IGNORED_IDS or message['chat_id'] in config.IGNORED_IDS:
            continue

        mention_sender = await create_mention(message['from_id'])
        mention_chat = await create_mention(message['chat_id'], message['id'])

        log_deleted_sender_ids.append(message['from_id'])

        if isinstance(event, MessageEdited.Event):
            text = f"**✏Edited message from: **{mention_sender}\n"
            text += f"in {mention_chat}\n"
            if message['msg_text']:
                text += f"**Original message:**\n{message['msg_text']}\n\n"
            if event.message.text:
                text += f"**Edited message:**\n{event.message.text}"
        else:
            if isinstance(event, MessageDeleted.Event):
                text = f"**Deleted message from: **{mention_sender}\n"
            else:  # UpdateReadMessagesContents
                text = f"**Deleted #selfdestructing message from: **{mention_sender}\n"
            text += f"in {mention_chat}\n"
            if message['msg_text']:
                text += "**Message:** \n" + message['msg_text']

        media = message['media']
        attributes = _document_attributes(media)
        is_sticker = any(isinstance(attr, DocumentAttributeSticker)
                         for attr in attributes)
        is_gif = any(isinstance(attr, DocumentAttributeAnimated)
                     for attr in attributes)
        is_round_video = any((isinstance(attr, DocumentAttributeVideo) and attr.round_message is True)
                             for attr in attributes)
        is_dice = isinstance(media, MessageMediaDice)
        is_instant_view = isinstance(media, MessageMediaWebPage)
        is_game = isinstance(media, MessageMediaGame)
        is_geo = isinstance(media, MessageMediaGeo)
        is_poll = isinstance(media, MessageMediaPoll)
        is_contact = isinstance(media, (MessageMediaContact, Contact))

        with retrieve_media_as_file(message['id'], message['chat_id'], media, message['noforwards'] or message['self_destructing']) as media_file:
            if is_sticker or is_round_video or is_dice or is_game or is_contact or is_geo or is_poll:
                # These are sent bare with the text as a reply (presumably
                # because such media cannot carry a caption).
                sent_msg = await client.send_message(config.LOG_CHAT_ID, file=media_file)
                await sent_msg.reply(text)
            elif is_instant_view:
                # A web page preview is not re-sent; only the text is posted.
                await client.send_message(config.LOG_CHAT_ID, text)
            else:
                await client.send_message(config.LOG_CHAT_ID, text, file=media_file)

        if is_gif and config.DELETE_SENT_GIFS_FROM_SAVED:
            await delete_from_saved_gifs(media.document)

        if is_sticker and config.DELETE_SENT_STICKERS_FROM_SAVED:
            await delete_from_saved_stickers(media.document)

    if isinstance(event, MessageDeleted.Event):
        ids = event.deleted_ids
        event_verb = "deleted"
    elif isinstance(event, UpdateReadMessagesContents):
        ids = event.messages
        event_verb = "self destructed"
    else:  # MessageEdited.Event
        ids = [event.message.id]
        event_verb = "edited"

    if len(ids) > config.RATE_LIMIT_NUM_MESSAGES and log_deleted_sender_ids:
        await client.send_message(config.LOG_CHAT_ID, f"{len(ids)} messages {event_verb}. Logged {config.RATE_LIMIT_NUM_MESSAGES}.")

    logging.info(
        f"Got {len(ids)} {event_verb} message(s). DB has {len(messages)}. Users: {', '.join(str(_id) for _id in log_deleted_sender_ids)}"
    )
|
||
|
||
|
||
def get_file_name(media) -> str:
    """Choose the file name used when re-sending *media* from a local copy.

    The document's own file name wins when it has one. Otherwise a generic
    name is derived from the MIME type (voice note / mp4 video) or from the
    media class (photo / contact), with "file.unknown" as a last resort.
    Returns None when *media* is falsy.
    """
    if not media:
        return None

    try:
        filename_attr = next(
            attr for attr in media.document.attributes
            if isinstance(attr, DocumentAttributeFilename))
        return filename_attr.file_name
    except Exception:
        # No document, no attributes or no filename attribute: fall back.
        pass

    mime_type = getattr(getattr(media, "document", None), "mime_type", None)
    by_mime = {
        "audio/ogg": "voicenote.ogg",
        "video/mp4": "video.mp4",
    }
    if mime_type in by_mime:
        return by_mime[mime_type]
    if isinstance(media, (MessageMediaPhoto, Photo)):
        return 'photo.jpg'
    if isinstance(media, (MessageMediaContact, Contact)):
        return 'contact.vcf'
    return "file.unknown"
|
||
|
||
|
||
async def save_restricted_msg(link: str):
    """Fetch the message a link points to and send a copy to 'me' (Saved Messages).

    Accepted link forms:
      * ``tg://openmessage?user_id=<id>&message_id=<id>``
      * ``[https://]t.me/<username>/<msg_id>``
      * ``[https://]t.me/c/<channel_id>/<msg_id>``

    Media is downloaded to a local encrypted file and uploaded again, so
    messages from chats that forbid forwarding can still be kept. Parse,
    fetch and send problems are reported to config.LOG_CHAT_ID.
    """
    if link.startswith("tg://"):
        parts = re.findall(r"\d+", link)
        if len(parts) != 2:
            await client.send_message(config.LOG_CHAT_ID, f"Could not parse link: {link}")
            return
        chat_id = int(parts[0])
        msg_id = int(parts[1])
    else:
        parts = link.split('/')
        msg_id = int(parts[-1])
        if len(parts) >= 3 and parts[-3] == "c" and parts[-2].isdigit():
            # t.me/c/<id> links carry the channel id without the "-100"
            # prefix (create_mention strips it); restore it so the id is
            # resolved as a channel rather than as a user.
            chat_id = int(f"-100{parts[-2]}")
        else:
            chat_id = int(parts[-2]) if parts[-2].isdigit() else parts[-2]

    try:
        msg = await client.get_messages(chat_id, ids=msg_id)
    except Exception as e:
        await client.send_message(config.LOG_CHAT_ID, f"Could not fetch {link}: {e}")
        return

    if msg is None:
        await client.send_message(config.LOG_CHAT_ID, f"Message not found: {link}")
        return

    chat_id = msg.chat_id
    from_id = get_sender_id(msg)

    mention_sender = await create_mention(from_id)
    mention_chat = await create_mention(chat_id, msg_id)

    text = f"**↗️Saved message from: **{mention_sender}\n"
    text += f"in {mention_chat}\n"
    if msg.text:
        text += "**Message:** \n" + msg.text

    try:
        if msg.media:
            await save_media_as_file(msg)
            with retrieve_media_as_file(msg_id, chat_id, msg.media, True) as f:
                await client.send_message('me', text, file=f)
        else:
            await client.send_message('me', text)
    except Exception as e:
        await client.send_message(config.LOG_CHAT_ID, str(e))
|
||
|
||
|
||
async def save_media_as_file(msg: Message):
    """Download *msg*'s media into an encrypted file at ``media/<msg_id>_<chat_id>``.

    This is the same path retrieve_media_as_file() reads from. Messages
    without media are ignored.

    Raises:
        Exception: if the reported media size exceeds
            config.MAX_IN_MEMORY_FILE_SIZE.
    """
    if not msg.media:
        return

    size = msg.file.size if msg.file else None
    # Guard against an unreported size (None): comparing None with an int
    # would raise TypeError instead of simply downloading the media.
    if size is not None and size > config.MAX_IN_MEMORY_FILE_SIZE:
        raise Exception(f"File too large to save ({size} bytes)")

    file_path = f"media/{msg.id}_{msg.chat_id}"
    with file_encrypt.encrypted(file_path) as f:
        await client.download_media(msg.media, f)
|
||
|
||
|
||
@contextmanager
def retrieve_media_as_file(msg_id: int, chat_id: int, media, noforwards: bool):
    """Yield a value suitable for ``file=`` when re-sending a stored message's media.

    When *noforwards* is true and the message has media other than a location
    or poll, the encrypted local copy at ``media/<msg_id>_<chat_id>`` (written
    by save_media_as_file()) is decrypted. It is yielded as a file object named
    via get_file_name(). In every other case the media object itself is
    yielded. That includes ``None`` for text-only messages, which never had a
    local copy written.
    """
    use_local_copy = (
        media is not None
        and noforwards
        and not isinstance(media, (MessageMediaGeo, MessageMediaPoll))
    )
    if not use_local_copy:
        yield media
        return

    file_path = f"media/{msg_id}_{chat_id}"
    with file_encrypt.decrypted(file_path) as f:
        f.name = get_file_name(media)
        yield f
|
||
|
||
|
||
async def delete_from_saved_gifs(gif: Document):
    """Remove *gif* from the account's saved GIFs (SaveGifRequest, unsave=True)."""
    input_doc = InputDocument(
        id=gif.id,
        access_hash=gif.access_hash,
        file_reference=gif.file_reference,
    )
    await client(SaveGifRequest(id=input_doc, unsave=True))
|
||
|
||
|
||
async def delete_from_saved_stickers(sticker: Document):
    """Remove *sticker* from the recent-stickers list (SaveRecentStickerRequest, unsave=True)."""
    input_doc = InputDocument(
        id=sticker.id,
        access_hash=sticker.access_hash,
        file_reference=sticker.file_reference,
    )
    await client(SaveRecentStickerRequest(id=input_doc, unsave=True))
|
||
|
||
|
||
async def delete_expired_messages():
    """Purge expired messages and media files every 300 seconds, forever.

    Database rows are deleted once older than the retention period configured
    for their chat type (unknown chats use the group period). Files under
    ``media/`` are deleted once their modification time is older than the
    longer of the group and channel retention periods.
    """
    while True:
        now = datetime.now()
        time_user = now - timedelta(days=config.PERSIST_TIME_IN_DAYS_USER)
        time_channel = now - \
            timedelta(days=config.PERSIST_TIME_IN_DAYS_CHANNEL)
        time_group = now - timedelta(days=config.PERSIST_TIME_IN_DAYS_GROUP)
        time_bot = now - timedelta(days=config.PERSIST_TIME_IN_DAYS_BOT)
        time_unknown = now - timedelta(days=config.PERSIST_TIME_IN_DAYS_GROUP)

        sqlite_cursor.execute(
            """DELETE FROM messages WHERE (type = ? and created_time < ?) OR
            (type = ? and created_time < ?) OR
            (type = ? and created_time < ?) OR
            (type = ? and created_time < ?) OR
            (type = ? and created_time < ?)""",
            (TYPE_USER, time_user,
             TYPE_CHANNEL, time_channel,
             TYPE_GROUP, time_group,
             TYPE_BOT, time_bot,
             TYPE_UNKNOWN, time_unknown,
             ))
        deleted_rows = sqlite_cursor.rowcount
        # Commit immediately; otherwise the deletion stays in an open write
        # transaction until some other code path happens to commit.
        sqlite_connection.commit()

        if deleted_rows > 0:
            logging.info(
                f"Deleted {deleted_rows} expired messages from DB"
            )

        # todo: save group/channel label in file name

        file_persist_days = max(
            config.PERSIST_TIME_IN_DAYS_GROUP, config.PERSIST_TIME_IN_DAYS_CHANNEL)
        # Same cutoff for every file, so compute it once per sweep.
        file_expiry_time = now - timedelta(days=file_persist_days)
        num_files_deleted = 0
        for dirpath, _dirnames, filenames in os.walk("media"):
            for filename in filenames:
                file_path = os.path.join(dirpath, filename)
                modified_time = datetime.fromtimestamp(
                    os.path.getmtime(file_path))
                if modified_time < file_expiry_time:
                    os.unlink(file_path)
                    num_files_deleted += 1

        if num_files_deleted > 0:
            logging.info(f"Deleted {num_files_deleted} expired files")

        await asyncio.sleep(300)
|
||
|
||
|
||
async def init():
    """Configure logging, register the event handlers and run the cleanup loop.

    Does not return: it finishes by awaiting delete_expired_messages(), which
    loops forever.
    """
    global my_id

    logging.basicConfig(level="INFO" if config.DEBUG_MODE else "WARNING")

    # Never log the log chat itself.
    config.IGNORED_IDS.add(config.LOG_CHAT_ID)

    my_id = (await client.get_me()).id

    client.add_event_handler(new_message_handler, events.NewMessage(
        incoming=True, outgoing=config.LISTEN_OUTGOING_MESSAGES))
    # new_message_handler already logs edits through edited_deleted_handler
    # (before storing the new version). Registering edited_deleted_handler
    # for MessageEdited as well would post every edit a second time.
    client.add_event_handler(new_message_handler, events.MessageEdited())
    client.add_event_handler(edited_deleted_handler, events.MessageDeleted())
    # Registered without an event builder, the handler receives raw updates;
    # edited_deleted_handler only acts on UpdateReadMessagesContents there,
    # which is how self-destructing messages are detected
    # (events.MessageRead does not work for self-destructs).
    client.add_event_handler(edited_deleted_handler)

    await delete_expired_messages()
|
||
|
||
if __name__ == "__main__":
    # The handlers registered by init() use these module-level DB handles,
    # so the database is opened before the client starts.
    db_handles = init_db()
    sqlite_cursor, sqlite_connection = db_handles

    with client:
        # init() never returns, so this runs the bot until interrupted.
        client.loop.run_until_complete(init())
|