# -*- coding: utf-8 -*-
"""
IRC3 Anti-Spam Plugin with Auto-Kick and Auto-Ban

This plugin automatically detects and mitigates spam in IRC channels by
monitoring messages for:

- Excessive message repetition
- High-frequency messaging (flooding)
- Excessive bot mentions
- Over-long messages

Actions:

- **1st and 2nd offense**: User gets kicked.
- **3rd offense within 5 minutes**: User gets banned.

Features:

- Uses a dynamic WHO query to verify user modes before action.
- Configurable limits for spam detection (via `antispam` settings).
- Periodic cleanup of inactive users every minute.

Author: [Your Name]
"""
import time
from collections import defaultdict, deque

import irc3
from irc3.plugins.cron import cron

from plugins.asynchronious import WhoChannel


@irc3.plugin
class AntiSpam:
    """A plugin for automatic spam detection and mitigation in IRC channels."""

    # Seconds within which a full timestamp/mention window counts as flooding.
    FLOOD_WINDOW = 60
    # Seconds a kick stays relevant for escalation; also the idle time after
    # which per-user records are discarded by the cron job.
    HISTORY_WINDOW = 300
    # Characters in the WHO-reported modes that exempt a user from action.
    PRIVILEGED_MODES = frozenset({'o', '%', 'h', '@'})

    def __init__(self, bot: irc3.IrcBot):
        """
        Initialize the AntiSpam plugin with configurable thresholds.

        Args:
            bot (irc3.IrcBot): The IRC bot instance.
        """
        self.bot = bot
        self.config = bot.config.get('antispam', {})

        # Per-nick activity tracking, keyed by lowercased nick.
        self.user_data = defaultdict(self._new_user_record)
        # Kick timestamps per lowercased nick, oldest first.
        self.kick_history = defaultdict(deque)
        # Nicks (typically other bots) that are never checked.
        self.exclude_list = ['ZodBot']
        # Service that receives the KICK request as a private message.
        self.service_name = self.config.get('service_name', 'ChanServ')
        # Helper used to run WHO queries for user modes.
        self.who_channel = WhoChannel(bot)

    def _new_user_record(self) -> dict:
        """Build an empty tracking record sized from the current config."""
        return {
            'messages': deque(maxlen=int(self.config.get('repeat_limit', 3))),
            'timestamps': deque(maxlen=int(self.config.get('spam_limit', 5))),
            'mentions': deque(maxlen=int(self.config.get('mention_limit', 2))),
        }

    async def get_user_modes(self, nick: str, channel: str) -> str:
        """
        Retrieve user modes dynamically using the WHO command.

        Args:
            nick (str): The user's nickname (compared case-insensitively).
            channel (str): The IRC channel to query.

        Returns:
            str: The ``modes`` value reported for the user, or an empty
            string if the query fails, raises, or does not list the user.
        """
        try:
            result = await self.who_channel(channel=channel)
            if result.get('success'):
                wanted = nick.lower()
                for user in result['users']:
                    if user['nick'].lower() == wanted:
                        return user['modes']
        except Exception as e:
            # Best effort: a failed lookup is logged and treated as "no modes".
            self.bot.log.error(f"Error fetching user modes: {e}")
        return ""

    def is_spam(self, nick: str, message: str, channel: str) -> bool:
        """
        Determine whether a message meets spam criteria.

        Checks, in order: message length, repetition within the recent
        message window, message rate, and rate of messages mentioning the
        bot's nick. Non-spam messages are recorded for later repetition
        checks.

        Args:
            nick (str): The user's nickname.
            message (str): The message content.
            channel (str): The channel where the message was sent
                (currently unused; tracking is per nick).

        Returns:
            bool: True if the message is considered spam, False otherwise.
        """
        user = self.user_data[nick.lower()]
        now = time.time()

        # Over-long messages are spam regardless of history.
        if len(message) > int(self.config.get('max_length', 300)):
            return True

        # Repetition: count earlier copies in the window instead of comparing
        # the window length, so detection keeps working once the deque is
        # full and is not triggered by unrelated messages filling it up.
        recent = user['messages']
        earlier_copies = recent.count(message)
        if earlier_copies and earlier_copies + 1 >= recent.maxlen:
            return True

        # Flooding: the window is full and its oldest entry is still fresh.
        stamps = user['timestamps']
        stamps.append(now)
        if len(stamps) == stamps.maxlen and now - stamps[0] < self.FLOOD_WINDOW:
            return True

        # Excessive bot mentions, same full-window test.
        if self.bot.nick.lower() in message.lower():
            mentions = user['mentions']
            mentions.append(now)
            if (len(mentions) == mentions.maxlen
                    and now - mentions[0] < self.FLOOD_WINDOW):
                return True

        # Store message to check for repetition.
        recent.append(message)
        return False

    @irc3.event(irc3.rfc.PRIVMSG)
    async def monitor_messages(self, mask, event, target, data):
        """
        Monitor incoming messages for spam and take action if needed.

        Args:
            mask (str): The sender's mask.
            event (str): The IRC event type.
            target (str): The channel where the message was sent.
            data (str): The message content.
        """
        # Process only channel messages.
        if not target.startswith("#"):
            return

        nick = mask.nick
        key = nick.lower()
        channel_name = target.lower()

        # Nick comparison is case-insensitive, like every other lookup here.
        if key in {name.lower() for name in self.exclude_list}:
            return

        # Run the cheap local check first so the bot does not issue a WHO
        # for the whole channel on every single message.
        if not self.is_spam(nick, data, channel_name):
            return

        # Reset tracking before awaiting: messages that arrive while the WHO
        # query is in flight must not be counted as additional offences.
        self.user_data.pop(key, None)

        # Fetch user modes to avoid acting on moderators.
        user_modes = await self.get_user_modes(nick, channel_name)
        if user_modes and self.PRIVILEGED_MODES & set(user_modes):
            return

        self.bot.log.info(f"SPAM detected from {nick}: {data}")
        self.handle_spam(mask, data, channel_name)

    def handle_spam(self, mask, message, channel):
        """
        Take action against spamming users, escalating to ban if needed.

        A user with two or more kicks inside the history window is banned
        (and kicked); otherwise the user is kicked and the kick is recorded.

        Args:
            mask (str): The sender's mask.
            message (str): The spam message.
            channel (str): The IRC channel.
        """
        nick = mask.nick
        key = nick.lower()
        current_time = time.time()
        cutoff = current_time - self.HISTORY_WINDOW

        # Drop expired kicks so the history cannot grow without bound.
        user_kicks = self.kick_history[key]
        while user_kicks and user_kicks[0] < cutoff:
            user_kicks.popleft()

        if len(user_kicks) >= 2:
            # Ban the user.
            ban_mask = f'*!{mask.host}'
            self.bot.send(f"MODE {channel} +b {ban_mask}")
            self.bot.privmsg(
                self.service_name,
                f"KICK {channel} {nick} :Banned for repeated spamming"
            )
            self.bot.log.info(f"{nick} banned for repeated spamming.")
            # Escalation is complete; start from a clean slate.
            self.kick_history.pop(key, None)
        else:
            # Kick the user and log action.
            self.bot.privmsg(
                self.service_name,
                f"KICK {channel} {nick} :Stop spamming."
            )
            user_kicks.append(current_time)
            self.bot.log.info(
                f"{nick} kicked for spam. Warning count: {len(user_kicks)}"
            )

        # Clear message history for the user.
        self.user_data.pop(key, None)

    @cron('* * * * *')
    def clean_old_records(self):
        """Clean up inactive user records every minute."""
        cutoff = time.time() - self.HISTORY_WINDOW

        # Delete in place: rebinding self.user_data to a plain dict would
        # lose the defaultdict factory and make is_spam raise KeyError.
        stale_users = [
            nick for nick, data in self.user_data.items()
            if not data['timestamps'] or data['timestamps'][-1] < cutoff
        ]
        for nick in stale_users:
            del self.user_data[nick]

        # Kick history only matters inside the escalation window.
        stale_kicks = [
            nick for nick, kicks in self.kick_history.items()
            if not kicks or kicks[-1] < cutoff
        ]
        for nick in stale_kicks:
            del self.kick_history[nick]

        self.bot.log.info("Cleaned up old spam records.")

    def connection_made(self):
        """Initialize when bot connects."""
        self.bot.log.info("AntiSpam plugin loaded with automated kick-to-ban escalation.")