switch regex_filter plugin to use PCRE2 python library instead of perl

main
simp 2025-08-16 06:35:30 +00:00
parent 019916ca8e
commit b0acfe2a47
5 changed files with 48 additions and 219 deletions

View File

@ -1,8 +1,12 @@
# Regex filter and Flood
regex_filter is a plugin to take advantage of perls regex over python, and does flood control.
regex_filter is a plugin that runs every message passing through the relay through regex and flood protection. It uses PCRE2 as a drop-in replacement, utilizing its C bindings instead of Python's standard re library.
It sits in front of the relay function and prevents messages that match from passing through. it simply drops the messages, and doesn't punish the user. AntiSpam can be used for blocking/banning.
The regex filter simply drops messages and doesn't punish the user. AntiSpam can be used for blocking/banning, though the current implementation uses the standard re library and is likely less efficient.
This should give a good amount of control for relayed networks. Messages hit the more efficient regex+flood filter first, so spam and flood messages don't undergo any further processing. Antispam sees what's left over and is capable of OPER commands like blocking/banning/kicking. Antispam can also use globs that can block phrases constructed with look-alike Unicode characters.
pattern_testing_tools.sh can be used to create a testing environment for regex patterns.
regex_filter:
@ -10,7 +14,6 @@ regex_filter:
# File paths (relative to PyLink directory)
blacklist_file: ./data/regex_blacklist.txt
perl_script: ./data/regex_filter.pl
log_file: ./logs/regex_filter.log
# Flood protection settings
@ -27,10 +30,12 @@ regex_filter:
Debug logging (set to true for troubleshooting)
debug: false
upon running it will create a /data directory with regex_filter.pl and regex_blacklist.txt. your regex_blacklist.txt contains a regex pattern one per line:
Upon running, it will create a /data directory with regex_blacklist.txt. Your regex_blacklist.txt contains regex patterns, one per line:
\b(?:spam|phishing|malware)\b
\bhttps?://(?:bit\.ly|tinyurl\.com|t\.co)/\w+\b
changes to the regex_blacklist.txt will cause the plugin to reload and recompile the patterns.
# AntiSpam

View File

@ -1057,7 +1057,6 @@ stats:
# File paths (relative to PyLink directory)
#blacklist_file: "./data/regex_blacklist.txt"
#perl_script: "./data/regex_filter.pl"
#log_file: "./logs/regex_filter.log"
# Flood protection settings

View File

@ -1,75 +0,0 @@
#!/usr/bin/perl
# Regex Filter Script
# Author: Anon
# License: BSD 2-Clause
use strict;
use warnings;
use utf8;
use FindBin qw($RealBin);
use File::Spec;
# Configuration
my $blacklist_file = File::Spec->catfile($RealBin, "./data/regex_blacklist.txt");
# Read blacklist patterns
# Returns the list of usable pattern strings found in $blacklist_file.
# One pattern is expected per line: surrounding whitespace is trimmed,
# blank lines and lines starting with '#' are skipped, and any pattern
# that fails to compile is reported with a warning and left out.
# An empty list is returned when the file is missing or cannot be opened.
sub load_patterns {
    my @accepted;

    if (!-f $blacklist_file) {
        return @accepted;
    }

    my $fh;
    if (!open($fh, '<:utf8', $blacklist_file)) {
        warn "Cannot open blacklist file: $!\n";
        return @accepted;
    }

    LINE:
    while (my $entry = <$fh>) {
        chomp $entry;
        $entry =~ s/^\s+|\s+$//g;    # trim whitespace

        # Skip empty lines and comments
        next LINE if $entry eq '';
        next LINE if $entry =~ /^#/;

        # Validate regex pattern: a trial compile that dies marks it invalid
        eval { qr/$entry/i };
        if (!$@) {
            push @accepted, $entry;
            next LINE;
        }
        warn "Invalid regex pattern: $entry - $@\n";
    }

    close $fh;
    return @accepted;
}
# Main filtering logic
# Decides whether $content should be blocked.
# The blacklist is reloaded on every call; each pattern is tried
# case-insensitively against the content. Returns 1 on the first match
# (after writing a notice to STDERR) and 0 when nothing matches or when
# there are no patterns at all.
sub filter_content {
    my ($content) = @_;

    my @patterns = load_patterns();
    my $blocked  = 0;

    # No patterns = allow through
    if (@patterns) {
        PATTERN:
        foreach my $candidate (@patterns) {
            next PATTERN unless $content =~ /$candidate/i;

            # Found match - block content
            print STDERR "MATCH: Pattern matched\n";
            $blocked = 1;
            last PATTERN;
        }
    }

    return $blocked;    # 0 = no matches = allow through
}
# Read content from stdin
# Decode STDIN as UTF-8 so the message is compared as characters, matching
# how the blacklist file is read (it is opened with a utf8 layer). Without
# this, patterns containing non-ASCII characters are tested against raw,
# undecoded bytes.
binmode STDIN, ':encoding(UTF-8)';
my $content = do { local $/; <STDIN> };    # slurp everything at once
chomp $content if defined $content;

# Exit with appropriate code: the exit status is the filter verdict
# (1 = a pattern matched, 0 = allow).
if (defined $content && $content ne '') {
    exit filter_content($content);
} else {
    exit 0; # Empty content = allow
}

View File

@ -6,11 +6,10 @@ License: BSD 2-Clause
"""
import os
import sys
import time
import hashlib
import subprocess
import threading
import pcre2
from collections import defaultdict, deque
from pathlib import Path
@ -28,13 +27,15 @@ file_watchers = {}
last_reload = 0
filter_lock = threading.RLock()
# Holder for the compiled blacklist patterns. The list lives on the class
# itself, so it is shared state: _compile_regex_filters() clears and refills
# it, and _run_regex() iterates over it.
class Cr():
pattern = []
def _get_config():
"""Get plugin configuration with defaults"""
config = conf.conf.get('regex_filter', {})
defaults = {
'blacklist_file': './data/regex_blacklist.txt',
'perl_script': './data/regex_filter.pl',
'log_file': './logs/regex_filter.log',
'flood_window': 30, # seconds
'flood_threshold': 3, # messages
@ -48,7 +49,6 @@ def _get_config():
# Merge with defaults
for key, value in defaults.items():
config.setdefault(key, value)
return config
def _ensure_directories():
@ -57,7 +57,7 @@ def _ensure_directories():
try:
# Create directories
for path_key in ['blacklist_file', 'perl_script', 'log_file']:
for path_key in ['blacklist_file', 'log_file']:
file_path = Path(config[path_key])
file_path.parent.mkdir(parents=True, exist_ok=True)
@ -72,105 +72,31 @@ def _ensure_directories():
f.write("# \\b(?:spam|phishing|malware)\\b\n")
f.write("\n")
log.info(f"Created default blacklist file: {blacklist_path}")
# Create default Perl script if missing
perl_path = Path(config['perl_script'])
if not perl_path.exists():
create_perl_script(perl_path)
except Exception as e:
log.error(f"Failed to create directories: {e}")
def create_perl_script(script_path):
"""Create the Perl filtering script"""
def _compile_regex_filters():
"""Read blacklist_file and compile the patterns"""
config = _get_config()
perl_content = f'''#!/usr/bin/perl
# Regex Filter Script
# Author: Anon
# License: BSD 2-Clause
use strict;
use warnings;
use utf8;
use FindBin qw($RealBin);
use File::Spec;
# Configuration
my $blacklist_file = File::Spec->catfile($RealBin, "{config['blacklist_file']}");
# Read blacklist patterns
sub load_patterns {{
my @patterns;
return @patterns unless -f $blacklist_file;
open my $fh, '<:utf8', $blacklist_file or do {{
warn "Cannot open blacklist file: $!\\n";
return @patterns;
}};
while (my $line = <$fh>) {{
chomp $line;
$line =~ s/^\\s+|\\s+$//g; # trim whitespace
# Skip empty lines and comments
next if $line eq '' || $line =~ /^#/;
# Validate regex pattern
eval {{ qr/$line/i }};
if ($@) {{
warn "Invalid regex pattern: $line - $@\\n";
next;
}}
push @patterns, $line;
}}
close $fh;
return @patterns;
}}
# Main filtering logic
sub filter_content {{
my ($content) = @_;
my @patterns = load_patterns();
return 0 unless @patterns; # No patterns = allow through
# Test each pattern
for my $pattern (@patterns) {{
if ($content =~ /$pattern/i) {{
# Found match - block content
print STDERR "MATCH: Pattern matched\\n";
return 1;
}}
}}
return 0; # No matches = allow through
}}
# Read content from stdin
my $content = do {{ local $/; <STDIN> }};
chomp $content if defined $content;
# Exit with appropriate code
if (defined $content && $content ne '') {{
exit filter_content($content);
}} else {{
exit 0; # Empty content = allow
}}
'''
try:
with open(script_path, 'w') as f:
f.write(perl_content)
# Make executable
os.chmod(script_path, 0o755)
log.info(f"Created Perl filter script: {script_path}")
except Exception as e:
log.error(f"Failed to create Perl script: {e}")
blacklist_file = config['blacklist_file']
if os.path.isfile(blacklist_file):
lines_list = []
with open(blacklist_file, 'r') as file:
for _, line in enumerate(file, start=1):
rm_line_break = line.replace('\n', '')
if rm_line_break[:1] != '#' and len(rm_line_break) != 0:
lines_list.append(rm_line_break)
if len(Cr.pattern) > 0:
Cr.pattern.clear()
for r in lines_list:
try:
Cr.pattern.append(pcre2.compile(r, jit=True))
except Exception as e:
log.error(f"Failed compiling pattern: {e}")
else:
log.error(f"{config['blacklist_file']} not found.")
def _log_filter_event(event_type, source, target, reason, network_name="unknown"):
"""Log filtering events"""
@ -201,6 +127,7 @@ def _check_file_modified(file_path):
mtime = os.path.getmtime(file_path)
if mtime > last_reload:
last_reload = time.time()
_compile_regex_filters()
return True
except OSError:
pass
@ -254,45 +181,17 @@ def _check_flood_protection(source, content):
return True, "Flood/spam pattern detected"
return False, ""
def _call_perl_filter(content):
    """Run the configured Perl script against ``content``.

    The content is written to the script's stdin and the script's exit
    code decides the outcome: 1 means blocked, 0 means allowed, and any
    other code is logged as a script error.

    Args:
        content: Message text handed to the script on stdin.

    Returns:
        tuple: ``(is_blocked, reason)``. ``is_blocked`` is True when the
        script exits with 1 or does not finish within 5 seconds; in every
        other case (including a missing script or an unexpected error) it
        is False and ``reason`` describes what happened, or is empty when
        the content was simply allowed.
    """
    config = _get_config()
    perl_script = config['perl_script']

    try:
        # Check if script exists
        if not os.path.exists(perl_script):
            log.error(f"Perl script not found: {perl_script}")
            return False, "Script missing"

        # Call Perl script (executed directly rather than through `perl`)
        process = subprocess.Popen(
            [f'./{perl_script}'],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )

        try:
            _, stderr = process.communicate(input=content, timeout=5)
        except subprocess.TimeoutExpired:
            # communicate() does not kill the child on timeout: kill it and
            # then finish communication so the process is reaped and its
            # pipes are closed instead of lingering.
            process.kill()
            process.communicate()
            return True, "Filter timeout (blocked for safety)"

        exit_code = process.returncode
        if exit_code == 1:
            return True, "Content blocked by regex filter"
        elif exit_code != 0:
            log.error(f"Perl script error: {stderr}")
            return False, f"Script error: exit {exit_code}"
        return False, ""
    except Exception as e:
        # Best effort: a broken filter must not block relaying.
        log.error(f"Perl filter error: {e}")
        return False, f"Filter error: {e}"
def _run_regex(content):
    """Check ``content`` against the compiled blacklist patterns.

    Every pattern in ``Cr.pattern`` is tried in order. A pattern that
    raises while matching is logged and skipped so that one bad pattern
    does not disable the remaining ones.

    Args:
        content: Message text to test.

    Returns:
        tuple: ``(True, "Content blocked by regex filter")`` as soon as a
        pattern matches, otherwise ``(False, '')``.
    """
    for p in Cr.pattern:
        try:
            # Report a plain boolean rather than the match object so the
            # first tuple element is always a bool, whichever path returns.
            if p.match(content):
                return True, "Content blocked by regex filter"
        except Exception as e:
            log.error(f"Filter error: {e}")
    return False, ''
def _should_filter_content(source, target, content, network_name="unknown"):
"""Main filtering logic"""
@ -315,7 +214,7 @@ def _should_filter_content(source, target, content, network_name="unknown"):
return True, flood_reason
# Check content with the regex filter
is_blocked, block_reason = _call_perl_filter(content)
is_blocked, block_reason = _run_regex(content)
if is_blocked:
return True, block_reason
@ -480,7 +379,7 @@ def main(irc=None):
"""Plugin initialization"""
try:
_ensure_directories()
_compile_regex_filters()
# Register hooks for all message types
utils.add_hook(handle_privmsg, 'PRIVMSG', priority=1050)
utils.add_hook(handle_notice, 'NOTICE', priority=1050)

View File

@ -2,3 +2,4 @@ cachetools
passlib
pyyaml
setuptools
pcre2