""" Regex Filter Plugin for PyLink Comprehensive message filtering with PCRE2 and flood protection """ import os import time import hashlib import threading import pcre2 from collections import defaultdict, deque from pathlib import Path from pylinkirc import utils, conf, world from pylinkirc.classes import * from pylinkirc.log import log # Plugin metadata __version__ = "1.0.0" # Global variables for thread-safe operation flood_tracker = defaultdict(lambda: deque(maxlen=50)) similarity_cache = defaultdict(lambda: deque(maxlen=20)) file_watchers = {} last_reload = 0 filter_lock = threading.RLock() class Cr(): pattern = [] def _get_config(): """Get plugin configuration with defaults""" config = conf.conf.get('regex_filter', {}) defaults = { 'blacklist_file': './data/regex_blacklist.txt', 'log_file': './logs/regex_filter.log', 'flood_window': 30, # seconds 'flood_threshold': 3, # messages 'similarity_threshold': 0.8, # 80% similar 'enabled': True, 'log_blocked': True, 'auto_reload': True, 'debug': False } # Merge with defaults for key, value in defaults.items(): config.setdefault(key, value) return config def _ensure_directories(): """Create necessary directories and files""" config = _get_config() try: # Create directories for path_key in ['blacklist_file', 'log_file']: file_path = Path(config[path_key]) file_path.parent.mkdir(parents=True, exist_ok=True) # Create default blacklist file if missing blacklist_path = Path(config['blacklist_file']) if not blacklist_path.exists(): with open(blacklist_path, 'w') as f: f.write("# Regex Blacklist Patterns - One regex per line\n") f.write("# Test all patterns thoroughly before deploying\n") f.write("# Examples (commented out by default):\n") f.write("# \\bhttps?://(?:bit\\.ly|tinyurl\\.com|t\\.co)/\\w+\\b\n") f.write("# \\b(?:spam|phishing|malware)\\b\n") f.write("\n") log.info(f"Created default blacklist file: {blacklist_path}") except Exception as e: log.error(f"Failed to create directories: {e}") def _compile_regex_filters(): """Read blacklist_file and compile the patterns""" config = _get_config() blacklist_file = config['blacklist_file'] if os.path.isfile(blacklist_file): lines_list = [] with open(blacklist_file, 'r') as file: for _, line in enumerate(file, start=1): rm_line_break = line.replace('\n', '') if rm_line_break[:1] != '#' and len(rm_line_break) != 0: lines_list.append(rm_line_break) if len(Cr.pattern) > 0: Cr.pattern.clear() for r in lines_list: try: Cr.pattern.append(pcre2.compile(r, jit=True)) except Exception as e: log.error(f"Failed compiling pattern: {e}") else: log.error(f"{config['blacklist_file']} not found.") def _log_filter_event(event_type, source, target, reason, network_name="unknown"): """Log filtering events""" config = _get_config() if not config.get('log_blocked', True): return try: log_path = Path(config['log_file']) log_path.parent.mkdir(parents=True, exist_ok=True) timestamp = time.strftime('%Y-%m-%d %H:%M:%S') # Don't log actual content for security reasons log_entry = f"[{timestamp}] {network_name} {event_type}: {source} -> {target} | {reason}\\n" with open(log_path, 'a') as f: f.write(log_entry) except Exception as e: log.error(f"Failed to write filter log: {e}") def _check_file_modified(file_path): """Check if file has been modified since last check""" global last_reload try: mtime = os.path.getmtime(file_path) if mtime > last_reload: last_reload = time.time() _compile_regex_filters() return True except OSError: pass return False def _get_message_hash(content): """Get hash of message content for similarity detection""" # Normalize content for similarity checking normalized = ''.join(content.lower().split()) return hashlib.md5(normalized.encode('utf-8')).hexdigest() def _calculate_similarity(hash1, hash2): """Calculate similarity between two message hashes""" if len(hash1) != len(hash2): return 0.0 matches = sum(c1 == c2 for c1, c2 in zip(hash1, hash2)) return matches / len(hash1) def _check_flood_protection(source, content): """Check for flood/spam patterns""" config = _get_config() current_time = time.time() window = config.get('flood_window', 30) threshold = config.get('flood_threshold', 3) similarity_threshold = config.get('similarity_threshold', 0.8) with filter_lock: # Clean old entries cutoff_time = current_time - window flood_tracker[source] = deque( [entry for entry in flood_tracker[source] if entry[0] > cutoff_time], maxlen=50 ) # Add current message msg_hash = _get_message_hash(content) flood_tracker[source].append((current_time, msg_hash)) # Check for flood recent_messages = flood_tracker[source] if len(recent_messages) >= threshold: # Check for similar messages similar_count = 0 for _, old_hash in list(recent_messages)[-threshold:]: if _calculate_similarity(msg_hash, old_hash) >= similarity_threshold: similar_count += 1 if similar_count >= threshold: return True, "Flood/spam pattern detected" return False, "" def _run_regex(content): """Filter content using jit compiled pattern""" for p in Cr.pattern: try: matched = p.match(content) if matched: return matched, "Content blocked by regex filter" except Exception as e: log.error(f"Filter error: {e}") return False, '' def _should_filter_content(source, target, content, network_name="unknown"): """Main filtering logic""" config = _get_config() if not config.get('enabled', True): return False, "" if not content or content.strip() == "": return False, "" # Auto-reload check if config.get('auto_reload', True): if _check_file_modified(config['blacklist_file']): log.info("Blacklist file updated, patterns reloaded") # Check flood protection first is_flood, flood_reason = _check_flood_protection(source, content) if is_flood: return True, flood_reason # Check content with filter is_blocked, block_reason = _run_regex(content) if is_blocked: return True, block_reason return False, "" def handle_privmsg(irc, source, command, args): """Handle PRIVMSG events""" if len(args) < 2: return target = args['target'] content = args['text'] source_id = source # Check if content should be filtered should_block, reason = _should_filter_content(source_id, target, content, irc.name) if should_block: # Log the filtering event _log_filter_event("PRIVMSG", source_id, target, reason, irc.name) # Log to PyLink console log.warning(f"Regex Filter ({irc.name}): Blocked PRIVMSG from {source_id} to {target} - {reason}") # Block the message by modifying args to empty content # This prevents relay while maintaining protocol compliance args['text'] = "" return def handle_notice(irc, source, command, args): """Handle NOTICE events""" if len(args) < 2: return target = args['target'] content = args['text'] source_id = source should_block, reason = _should_filter_content(source_id, target, content, irc.name) if should_block: _log_filter_event("NOTICE", source_id, target, reason, irc.name) log.warning(f"Regex Filter ({irc.name}): Blocked NOTICE from {source_id} to {target} - {reason}") args['text'] = "" return def handle_part(irc, source, command, args): """Handle PART events""" if len(args) < 2: return channel_list = args['channels'] reason = args['text'] source_id = source for channel in channel_list: should_block, block_reason = _should_filter_content(source_id, channel, reason, irc.name) if should_block: _log_filter_event("PART", source_id, channel, block_reason, irc.name) log.warning(f"Regex Filter ({irc.name}): Blocked PART from {source_id} in {channel} - {block_reason}") args['text'] = "" return def handle_quit(irc, source, command, args): """Handle QUIT events""" if len(args) < 1: return reason = args['text'] source_id = source should_block, block_reason = _should_filter_content(source_id, "QUIT", reason, irc.name) if should_block: _log_filter_event("QUIT", source_id, "QUIT", block_reason, irc.name) log.warning(f"Regex Filter ({irc.name}): Blocked QUIT from {source_id} - {block_reason}") args['text'] = "" return def handle_topic(irc, source, command, args): """Handle TOPIC events""" if len(args) < 2: return channel = args['channel'] topic = args['text'] source_id = source should_block, reason = _should_filter_content(source_id, channel, topic, irc.name) if should_block: _log_filter_event("TOPIC", source_id, channel, reason, irc.name) log.warning(f"Regex Filter ({irc.name}): Blocked TOPIC from {source_id} in {channel} - {reason}") args['topic'] = "" return def handle_kick(irc, source, command, args): """Handle KICK events""" if len(args) < 3: return channel = args['channel'] kicked_user = args['target'] reason = args['text'] source_id = source target = f"{channel}:{kicked_user}" should_block, block_reason = _should_filter_content(source_id, target, reason, irc.name) if should_block: _log_filter_event("KICK", source_id, target, block_reason, irc.name) log.warning(f"Regex Filter ({irc.name}): Blocked KICK from {source_id} in {channel} - {block_reason}") args['text'] = "" return def handle_nick(irc, source, command, args): """Handle NICK events""" if len(args) < 1: return new_nick = args['newnick'] source_id = source should_block, reason = _should_filter_content(source_id, "NICK", new_nick, irc.name) if should_block: _log_filter_event("NICK", source_id, "NICK", reason, irc.name) log.warning(f"Regex Filter ({irc.name}): Blocked NICK change from {source_id} - {reason}") args['newnick'] = f"Filtered{int(time.time())}" # Replace with safe nick return def handle_join(irc, source, command, args): """Handle JOIN events""" if len(args) < 1: return channel = args['channel'] source_id = source should_block, reason = _should_filter_content(source_id, "JOIN", channel, irc.name) if should_block: _log_filter_event("JOIN", source_id, channel, reason, irc.name) log.warning(f"Regex Filter ({irc.name}): Blocked JOIN from {source_id} to {channel} - {reason}") # For JOIN, we can't easily modify the channel name without breaking protocol # Instead, log it for monitoring return def handle_away(irc, source, command, args): """Handle AWAY events""" if len(args) < 1: return reason = args['text'] source_id = source should_block, block_reason = _should_filter_content(source_id, "AWAY", reason, irc.name) if should_block: _log_filter_event("AWAY", source_id, "AWAY", block_reason, irc.name) log.warning(f"Regex Filter ({irc.name}): Blocked AWAY from {source_id} - {block_reason}") args['text'] = "" return def main(irc=None): """Plugin initialization""" try: _ensure_directories() _compile_regex_filters() # Register hooks for all message types utils.add_hook(handle_privmsg, 'PRIVMSG', priority=1050) utils.add_hook(handle_notice, 'NOTICE', priority=1050) utils.add_hook(handle_part, 'PART', priority=1050) utils.add_hook(handle_quit, 'QUIT', priority=1050) utils.add_hook(handle_topic, 'TOPIC', priority=1050) utils.add_hook(handle_kick, 'KICK', priority=1050) utils.add_hook(handle_nick, 'NICK', priority=1050) utils.add_hook(handle_join, 'JOIN', priority=1050) utils.add_hook(handle_away, 'AWAY', priority=1050) log.info("Regex Filter plugin loaded successfully") except Exception as e: log.error(f"Failed to initialize Regex Filter plugin: {e}") def die(irc=None): """Plugin cleanup""" try: # Clear flood tracking data global flood_tracker, similarity_cache flood_tracker.clear() similarity_cache.clear() log.info("Regex Filter plugin unloaded") except Exception as e: log.error(f"Error during Regex Filter plugin cleanup: {e}")