g1mp/plugins/services/anti_spam.py

206 lines
7.0 KiB
Python
Raw Normal View History

2025-02-13 06:35:15 +00:00
# -*- coding: utf-8 -*-
"""
IRC3 Anti-Spam Plugin with Auto-Kick and Auto-Ban

This plugin automatically detects and mitigates spam in IRC channels
by monitoring messages for:

- Excessive message repetition
- High-frequency messaging (flooding)
- Excessive bot mentions
- Overly long messages (above the `max_length` setting)

Actions:

- **1st and 2nd offense**: User gets kicked.
- **3rd offense within 5 minutes**: User gets banned.

Features:

- Uses a dynamic WHO query to verify user modes before action.
- Configurable limits for spam detection (via `antispam` settings).
- Periodic cleanup of inactive users every minute.

Author: [Your Name]
"""
import time
2025-02-13 04:55:42 +00:00
import irc3
from collections import defaultdict, deque
2025-02-13 06:35:15 +00:00
from irc3.plugins.cron import cron
2025-02-13 06:43:56 +00:00
from plugins.asynchronous import WhoChannel
2025-02-13 06:35:15 +00:00
2025-02-13 04:55:42 +00:00
@irc3.plugin
class AntiSpam:
    """A plugin for automatic spam detection and mitigation in IRC channels.

    All state is kept in memory and keyed by the lowercased nick:

    - ``user_data`` holds three bounded deques per nick: recent messages,
      message timestamps and bot-mention timestamps.
    - ``kick_history`` holds the timestamps of recent kicks per nick and is
      used to escalate from kick to ban.

    Thresholds come from the ``antispam`` section of the bot configuration:
    ``repeat_limit`` (default 3), ``spam_limit`` (default 5),
    ``mention_limit`` (default 2), ``max_length`` (default 300) and
    ``service_name`` (default ``ChanServ``).
    """

    # Window (seconds) for the flood and bot-mention rate checks.
    _RATE_WINDOW = 60
    # Window (seconds) for kick escalation and for record expiry.
    _HISTORY_WINDOW = 300

    def __init__(self, bot: irc3.IrcBot):
        """
        Initialize the AntiSpam plugin with configurable thresholds.

        Args:
            bot (irc3.IrcBot): The IRC bot instance.
        """
        self.bot = bot
        self.config = bot.config.get('antispam', {})

        # User activity tracking; records are created lazily on first access.
        self.user_data = defaultdict(self._new_user_record)
        self.kick_history = defaultdict(deque)  # Track kick timestamps per user

        self.exclude_list = ['ZodBot']  # Bots that should be ignored
        self.service_name = self.config.get('service_name', 'ChanServ')
        self.who_channel = WhoChannel(bot)  # WHO query for user modes

    def _new_user_record(self) -> dict:
        """Build an empty tracking record sized from the configured limits."""
        cfg = self.config
        return {
            'messages': deque(maxlen=int(cfg.get('repeat_limit', 3))),
            'timestamps': deque(maxlen=int(cfg.get('spam_limit', 5))),
            'mentions': deque(maxlen=int(cfg.get('mention_limit', 2))),
        }

    async def get_user_modes(self, nick: str, channel: str) -> str:
        """
        Retrieve user modes dynamically using the WHO command.

        Args:
            nick (str): The user's nickname.
            channel (str): The IRC channel.

        Returns:
            str: The ``modes`` value reported for the user by the WHO query,
            or an empty string when the query fails, raises, or does not
            list the user.
        """
        wanted = nick.lower()
        try:
            result = await self.who_channel(channel=channel)
            if result.get('success'):
                for user in result['users']:
                    if user['nick'].lower() == wanted:
                        # Normalise a missing/empty value to "" so the
                        # declared return type always holds.
                        return user['modes'] or ""
        except Exception as e:
            # Best effort: a failed lookup must not break message handling.
            self.bot.log.error(f"Error fetching user modes: {e}")
        return ""

    def is_spam(self, nick: str, message: str, channel: str) -> bool:
        """
        Determine whether a message meets spam criteria.

        The checks run in this order and the first hit wins: message length,
        repetition, flooding, bot mentions. The message is added to the
        repetition history only when no check fires.

        Args:
            nick (str): The user's nickname.
            message (str): The message content.
            channel (str): The channel where the message was sent
                (currently unused; tracking is per nick, not per channel).

        Returns:
            bool: True if the message is considered spam, False otherwise.
        """
        user = self.user_data[nick.lower()]
        now = time.time()

        # Check message length
        if len(message) > int(self.config.get('max_length', 300)):
            return True

        # Check repeated messages: flag when this message would be the
        # `repeat_limit`-th identical one among the remembered messages.
        # Counting occurrences keeps the check working once the history
        # deque is full, which a length comparison alone does not.
        history = user['messages']
        repeats_needed = max(1, history.maxlen - 1)
        if history.count(message) >= repeats_needed:
            return True

        # Check rapid message spam (flooding): the deque holds at most
        # `spam_limit` timestamps, so a full deque whose oldest entry is
        # recent means that many messages arrived inside the window.
        stamps = user['timestamps']
        stamps.append(now)
        # `stamps` is empty only when the limit is 0 (check disabled).
        if stamps and len(stamps) == stamps.maxlen:
            if (now - stamps[0]) < self._RATE_WINDOW:
                return True

        # Check excessive bot mentions (same full-deque technique).
        if self.bot.nick.lower() in message.lower():
            mentions = user['mentions']
            mentions.append(now)
            if mentions and len(mentions) == mentions.maxlen:
                if (now - mentions[0]) < self._RATE_WINDOW:
                    return True

        # Store message to check for repetition
        history.append(message)
        return False

    @irc3.event(irc3.rfc.PRIVMSG)
    async def monitor_messages(self, mask, event, target, data):
        """
        Monitor incoming messages for spam and take action if needed.

        Args:
            mask (str): The sender's mask.
            event (str): The IRC event type.
            target (str): The channel where the message was sent.
            data (str): The message content.
        """
        if not target.startswith("#"):  # Process only channel messages
            return

        nick = mask.nick
        if not nick:
            # Nothing to track or kick without a nick.
            return
        channel_name = target.lower()

        # Nick comparison is case-insensitive, like every other nick lookup
        # in this plugin.
        if nick.lower() in {name.lower() for name in self.exclude_list}:
            return

        # Fetch user modes to avoid acting on moderators.
        # NOTE(review): this issues a WHO query for every channel message;
        # consider caching if that proves too chatty.
        user_modes = await self.get_user_modes(nick, channel_name)
        if user_modes and {'o', '%', 'h', '@'} & set(user_modes):
            return

        if self.is_spam(nick, data, channel_name):
            self.bot.log.info(f"SPAM detected from {nick}: {data}")
            self.handle_spam(mask, data, channel_name)

    def handle_spam(self, mask, message, channel):
        """
        Take action against spamming users, escalating to ban if needed.

        A user with at least two kicks inside the last five minutes is
        banned (channel mode +b, then a kick request sent to the service);
        otherwise a kick request is sent and the kick is recorded. In both
        cases the user's message-tracking record is discarded.

        Args:
            mask (str): The sender's mask.
            message (str): The spam message.
            channel (str): The IRC channel.
        """
        nick = mask.nick
        key = nick.lower()
        current_time = time.time()
        cutoff = current_time - self._HISTORY_WINDOW  # 5-minute window

        # Kicks are appended in time order, so expired ones sit at the left.
        # Dropping them here keeps the deque from growing without bound.
        user_kicks = self.kick_history[key]
        while user_kicks and user_kicks[0] < cutoff:
            user_kicks.popleft()

        if len(user_kicks) >= 2:
            # Ban the user
            ban_mask = f'*!{mask.host}'
            self.bot.send(f"MODE {channel} +b {ban_mask}")
            self.bot.privmsg(
                self.service_name,
                f"KICK {channel} {nick} :Banned for repeated spamming"
            )
            self.bot.log.info(f"{nick} banned for repeated spamming.")
            # Clear data
            self.kick_history.pop(key, None)
        else:
            # Kick the user and log action
            self.bot.privmsg(
                self.service_name,
                f"KICK {channel} {nick} :Stop spamming."
            )
            user_kicks.append(current_time)
            self.bot.log.info(
                f"{nick} kicked for spam. Warning count: {len(user_kicks)}"
            )

        # Clear message history for the user
        self.user_data.pop(key, None)

    @cron('* * * * *')
    def clean_old_records(self):
        """Clean up inactive user records every minute.

        Records are removed in place so ``user_data`` stays a
        ``defaultdict``; rebinding it to a plain dict would make the next
        lookup for an unknown nick raise ``KeyError``.
        """
        cutoff = time.time() - self._HISTORY_WINDOW

        # Collect keys first: a dict must not change size while iterated.
        stale_users = [
            nick for nick, data in self.user_data.items()
            if not data['timestamps'] or data['timestamps'][-1] < cutoff
        ]
        for nick in stale_users:
            del self.user_data[nick]

        # Kicks older than the escalation window no longer count, so their
        # records can be dropped as well.
        expired_kicks = [
            nick for nick, kicks in self.kick_history.items()
            if not kicks or kicks[-1] < cutoff
        ]
        for nick in expired_kicks:
            del self.kick_history[nick]

        self.bot.log.info("Cleaned up old spam records.")

    def connection_made(self):
        """Initialize when bot connects."""
        self.bot.log.info("AntiSpam plugin loaded with automated kick-to-ban escalation.")