"""irc3 anti-spam plugin: kicks spammers and bans repeat offenders."""
import re
import time
from collections import defaultdict, deque

import irc3
from irc3 import utils
from irc3.plugins.command import command
from irc3.plugins.cron import cron

# NOTE(review): irc3 itself ships AsyncEvents in ``irc3.asynchronous`` --
# confirm that this top-level ``asynchronous`` module is the intended source.
from asynchronous import AsyncEvents

# Messages/mentions filling a whole window inside this many seconds are a burst.
_BURST_WINDOW_SECONDS = 60
# Kicks older than this many seconds no longer count towards a ban.
_KICK_WINDOW_SECONDS = 300
# Per-user tracking data is dropped after this many seconds of inactivity.
_INACTIVE_AFTER_SECONDS = 300


def _is_burst(window, now):
    """Return True if *window* is full and its oldest entry is recent.

    *window* is a bounded deque of timestamps.  A zero-length window
    (limit configured as 0) never counts as a burst instead of raising
    ``IndexError`` on ``window[0]``.
    """
    if not window or len(window) != window.maxlen:
        return False
    return (now - window[0]) < _BURST_WINDOW_SECONDS


@irc3.plugin
class AntiSpam:
    """IRC3 Anti-Spam Plugin with Auto-Ban for Repeat Offenders"""

    def __init__(self, bot):
        """Read the ``antispam`` config section and set up per-user tracking."""
        self.bot = bot
        self.config = bot.config.get('antispam', {})
        # Per-nick (lower-cased) sliding windows; the deque sizes come from
        # the configured limits, so "window full" means "limit reached".
        self.user_data = defaultdict(lambda: {
            'messages': deque(maxlen=int(self.config.get('repeat_limit', 3))),
            'timestamps': deque(maxlen=int(self.config.get('spam_limit', 5))),
            'mentions': deque(maxlen=int(self.config.get('mention_limit', 2))),
        })
        self.kick_history = defaultdict(deque)  # Track kick timestamps per user
        self.exclude_list = ['ZodBot']
        self.service_name = self.config.get('service_name', 'ChanServ')
        self.who_channel = WhoChannel(bot)  # Initialize WHO channel handler

    async def get_user_modes(self, nick, channel):
        """Dynamically fetch user modes using WHO command.

        Returns the ``modes`` field of the WHO reply whose nick matches
        *nick* case-insensitively, or ``""`` when the user is not found,
        the WHO request did not succeed, or an error occurred (the error
        is logged).
        """
        try:
            result = await self.who_channel(channel=channel)
            if result['success']:
                wanted = nick.lower()
                for user in result['users']:
                    if user['nick'].lower() == wanted:
                        return user['modes']
        except Exception as e:
            # Best effort: a failed WHO must not break message handling.
            self.bot.log.error(f"Error fetching user modes: {e}")
        return ""

    def is_spam(self, nick, message, channel):
        """Check if message meets spam criteria.

        A message is spam when any of these holds:

        * it is longer than ``max_length`` (default 300) characters;
        * the same text already appears ``repeat_limit - 1`` times among
          the user's last ``repeat_limit`` recorded messages;
        * the user sent ``spam_limit`` messages within 60 seconds;
        * the user mentioned the bot's nick ``mention_limit`` times within
          60 seconds.

        Side effect: updates the user's tracking windows.  *channel* is
        accepted for interface compatibility and is not used.
        """
        user = self.user_data[nick.lower()]
        now = time.time()

        if len(message) > int(self.config.get('max_length', 300)):
            return True

        # Count occurrences instead of comparing the deque length: the old
        # length test stopped firing for good once the deque had filled up,
        # and flagged a single repeat such as "a, b, a".
        recent = user['messages']
        if recent.count(message) >= max(1, recent.maxlen - 1):
            return True

        user['timestamps'].append(now)
        if _is_burst(user['timestamps'], now):
            return True

        if self.bot.nick.lower() in message.lower():
            user['mentions'].append(now)
            if _is_burst(user['mentions'], now):
                return True

        recent.append(message)
        return False

    @irc3.event(irc3.rfc.PRIVMSG)
    async def monitor_messages(self, mask, event, target, data):
        """Handle incoming messages and check for spam.

        Only targets starting with ``#`` are inspected.  Nicks in
        ``exclude_list`` are ignored, as are users whose WHO flags contain
        any of ``o``, ``%``, ``h`` or ``@`` (presumably channel staff --
        verify against the network's WHO flag format).
        """
        if not target.startswith("#"):
            return

        nick = mask.nick
        message = data
        channel_name = target.lower()

        if nick in self.exclude_list:
            return

        user_modes = await self.get_user_modes(nick, channel_name)
        if user_modes and {'o', '%', 'h', '@'} & set(user_modes):
            return

        if self.is_spam(nick, message, channel_name):
            print(f"SPAM {nick} - {user_modes}")
            self.handle_spam(mask, message, channel_name)

    def handle_spam(self, mask, message, channel):
        """Take action against spam, escalating to ban if kicked twice in 5 minutes.

        With fewer than two kicks in the last five minutes the user is
        kicked through the configured service and the kick is recorded;
        otherwise a ban is set on the user's mask and the user is kicked.
        The user's spam-tracking data is reset in both cases.
        """
        nick = mask.nick
        nick_lower = nick.lower()
        current_time = time.time()
        cutoff = current_time - _KICK_WINDOW_SECONDS

        user_kicks = self.kick_history[nick_lower]
        # Timestamps are appended in order, so expired kicks sit on the left;
        # dropping them keeps the history bounded.
        while user_kicks and user_kicks[0] < cutoff:
            user_kicks.popleft()

        if len(user_kicks) >= 2:
            # Ban the user using hostmask.
            # NOTE(review): assumes mask.host yields the part after "!" -- confirm.
            ban_mask = f'*!{mask.host}'
            self.bot.send(f"MODE {channel} +b {ban_mask}")
            self.bot.privmsg(
                self.service_name,
                f"KICK {channel} {nick} Banned for repeated spamming"
            )
            # Clear history so a returning user starts from scratch.
            del self.kick_history[nick_lower]
        else:
            # Kick and record timestamp
            self.bot.privmsg(
                self.service_name,
                f"KICK {channel} {nick} :stop spamming"
            )
            user_kicks.append(current_time)

        self.user_data.pop(nick_lower, None)

    @cron('* * * * *')
    def clean_old_records(self):
        """Cleanup inactive users every minute.

        Drops tracking data for users whose latest recorded message is
        older than five minutes, and kick histories with no kick inside
        the escalation window.
        """
        now = time.time()

        cutoff = now - _INACTIVE_AFTER_SECONDS
        stale_users = [
            nick for nick, data in self.user_data.items()
            if data['timestamps'] and data['timestamps'][-1] < cutoff
        ]
        for nick in stale_users:
            del self.user_data[nick]

        kick_cutoff = now - _KICK_WINDOW_SECONDS
        stale_kicks = [
            nick for nick, kicks in self.kick_history.items()
            if not kicks or kicks[-1] < kick_cutoff
        ]
        for nick in stale_kicks:
            del self.kick_history[nick]

    def connection_made(self):
        """Initialize when bot connects"""
        self.bot.log.info("Enhanced AntiSpam plugin loaded with kick-to-ban escalation")


class WhoChannel(AsyncEvents):
    """Handle WHO responses for a channel.

    Sends ``WHO {channel}``, collects every 352 reply line and finishes
    on a 315 or 401 numeric.
    """

    send_line = 'WHO {channel}'

    # The group names are read by the code: ``process_results`` formats
    # nick/user/host and checks retcode, and callers read ``modes``.
    events = (
        {
            'match': (
                r"(?i)^:\S+ 352 \S+ {channel} (?P<user>\S+) "
                r"(?P<host>\S+) (?P<server>\S+) (?P<nick>\S+) "
                r"(?P<modes>\S+) :(?P<hopcount>\S+) (?P<realname>.*)"
            ),
            'multi': True
        },
        {
            'match': r"(?i)^:\S+ (?P<retcode>(315|401)) \S+ {channel} :.*",
            'final': True
        },
    )

    def process_results(self, results=None, **value):
        """Process WHO channel results into a user list.

        Entries carrying ``retcode`` are merged into the returned dict;
        every other entry is treated as a user record and gains a
        ``mask`` key built as ``nick!user@host``.  The result has
        ``users`` (list of records) and ``success`` (True only when
        ``retcode`` is ``'315'``).
        """
        users = []
        for res in results or ():
            if 'retcode' in res:
                value.update(res)
            else:
                res['mask'] = utils.IrcString('{nick}!{user}@{host}'.format(**res))
                users.append(res)
        value['users'] = users
        value['success'] = value.get('retcode') == '315'
        return value