# -*- coding: utf-8 -*-
"""
IRC3 Anti-Spam Plugin with Auto-Kick and Auto-Ban

This plugin automatically detects and mitigates spam in IRC channels
by monitoring messages for:
- Overlong messages
- Excessive message repetition
- High-frequency messaging (flooding)
- Excessive bot mentions

Actions:
- **1st and 2nd offense**: User gets kicked.
- **3rd offense within 5 minutes**: User gets banned.

Features:
- Uses a dynamic WHO query to verify user modes before action.
- Configurable limits for spam detection (via `antispam` settings).
- Periodic cleanup of inactive users every minute.

Author: [Your Name]
"""
|
|
|
|
|
|
import time
|
|
|
|
import irc3
|
|
from collections import defaultdict, deque
|
|
from irc3.plugins.cron import cron
|
|
from plugins.asynchronious import WhoChannel
|
|
|
|
|
|
@irc3.plugin
class AntiSpam:
    """A plugin for automatic spam detection and mitigation in IRC channels.

    Per-user state is keyed by the lower-cased nickname:

    * ``user_data[nick]`` holds three bounded deques: recent message texts
      (``messages``), recent message times (``timestamps``) and recent times
      the bot's nick was mentioned (``mentions``).
    * ``kick_history[nick]`` holds the times of kicks issued by this plugin.

    Thresholds are read from the ``antispam`` section of the bot config:
    ``repeat_limit`` (default 3), ``spam_limit`` (default 5),
    ``mention_limit`` (default 2), ``max_length`` (default 300) and
    ``service_name`` (default ``ChanServ``).
    """

    # Seconds during which earlier kicks count towards a ban; also the idle
    # time after which per-user tracking data is discarded by the cron job.
    OFFENSE_WINDOW = 300

    # Seconds within which a full timestamp/mention deque counts as spam.
    RATE_WINDOW = 60

    def __init__(self, bot: irc3.IrcBot):
        """
        Initialize the AntiSpam plugin with configurable thresholds.

        Args:
            bot (irc3.IrcBot): The IRC bot instance.
        """
        self.bot = bot
        self.config = bot.config.get('antispam', {})

        # User activity tracking; missing nicks get a fresh record on access.
        self.user_data = defaultdict(self._new_user_record)
        self.kick_history = defaultdict(deque)  # Track kick timestamps per user

        self.exclude_list = ['ZodBot']  # Bots that should be ignored
        self.service_name = self.config.get('service_name', 'ChanServ')

        self.who_channel = WhoChannel(bot)  # WHO query for user modes

    def _limit(self, option: str, default: int) -> int:
        """Return the integer value of an ``antispam`` config option."""
        return int(self.config.get(option, default))

    def _new_user_record(self) -> dict:
        """Build an empty tracking record sized by the configured limits."""
        return {
            'messages': deque(maxlen=self._limit('repeat_limit', 3)),
            'timestamps': deque(maxlen=self._limit('spam_limit', 5)),
            'mentions': deque(maxlen=self._limit('mention_limit', 2)),
        }

    @staticmethod
    def _drop_older_than(stamps: deque, cutoff: float) -> None:
        """Remove leading timestamps older than ``cutoff`` (oldest is first)."""
        while stamps and stamps[0] < cutoff:
            stamps.popleft()

    async def get_user_modes(self, nick: str, channel: str) -> str:
        """
        Retrieve user modes dynamically using the WHO command.

        Args:
            nick (str): The user's nickname (matched case-insensitively).
            channel (str): The IRC channel.

        Returns:
            str: The ``modes`` field reported for the user, or an empty
            string when the query fails, raises, or does not list the user.
        """
        wanted = nick.lower()
        try:
            result = await self.who_channel(channel=channel)
            if result.get('success'):
                for user in result['users']:
                    if user['nick'].lower() == wanted:
                        return user['modes']
        except Exception as e:
            # Best effort: a failed lookup is logged and treated as "no modes".
            self.bot.log.error(f"Error fetching user modes: {e}")
        return ""

    def is_spam(self, nick: str, message: str, channel: str) -> bool:
        """
        Determine whether a message meets spam criteria.

        Checks run in this order and the first hit returns immediately:
        message length, repetition, flooding, bot mentions. A message that
        passes every check is stored for later repetition checks.

        Args:
            nick (str): The user's nickname.
            message (str): The message content.
            channel (str): The channel where the message was sent (unused
                by the checks themselves).

        Returns:
            bool: True if the message is considered spam, False otherwise.
        """
        user = self.user_data[nick.lower()]
        now = time.time()

        # Check message length
        if len(message) > self._limit('max_length', 300):
            return True

        # Check repeated messages: count identical texts among the remembered
        # ones. Counting (instead of comparing the deque length) keeps the
        # check working once the deque is full and avoids flagging a single
        # repeat that merely follows other, different messages.
        history = user['messages']
        needed_repeats = max(1, (history.maxlen or 0) - 1)
        if history.count(message) >= needed_repeats:
            return True

        # Check rapid message spam (flooding): the deque is full only when
        # `spam_limit` messages were seen; compare the oldest against now.
        stamps = user['timestamps']
        stamps.append(now)
        if len(stamps) == stamps.maxlen and (now - stamps[0]) < self.RATE_WINDOW:
            return True

        # Check excessive bot mentions
        if self.bot.nick.lower() in message.lower():
            mentions = user['mentions']
            mentions.append(now)
            if (len(mentions) == mentions.maxlen
                    and (now - mentions[0]) < self.RATE_WINDOW):
                return True

        # Store message to check for repetition
        history.append(message)
        return False

    @irc3.event(irc3.rfc.PRIVMSG)
    async def monitor_messages(self, mask, event, target, data):
        """
        Monitor incoming messages for spam and take action if needed.

        Args:
            mask: The sender's mask; its ``nick`` attribute is read here.
            event (str): The IRC event type.
            target (str): The channel where the message was sent.
            data (str): The message content.
        """
        if not target.startswith("#"):  # Process only channel messages
            return

        nick = mask.nick
        channel_name = target.lower()

        # Compare case-insensitively, consistent with the lower-cased keys
        # used for all other per-nick bookkeeping.
        excluded = {name.lower() for name in self.exclude_list}
        if nick.lower() in excluded:
            return

        # Fetch user modes to avoid acting on moderators. This await is
        # deliberately placed before the spam check so that detection and
        # punishment below run without yielding to other messages in between.
        user_modes = await self.get_user_modes(nick, channel_name)
        if user_modes and {'o', '%', 'h', '@'} & set(user_modes):
            return

        if self.is_spam(nick, data, channel_name):
            self.bot.log.info(f"SPAM detected from {nick}: {data}")
            self.handle_spam(mask, data, channel_name)

    def handle_spam(self, mask, message, channel):
        """
        Take action against spamming users, escalating to ban if needed.

        A user with fewer than two kicks inside ``OFFENSE_WINDOW`` is kicked
        via the configured service; otherwise a ``+b`` mode is sent for the
        user's mask and the kick is requested with a ban reason. The user's
        message tracking data is cleared in both cases.

        Args:
            mask: The sender's mask; ``nick`` and ``host`` are read.
            message (str): The spam message.
            channel (str): The IRC channel.
        """
        nick = mask.nick
        key = nick.lower()
        current_time = time.time()

        # Forget kicks that fell out of the escalation window so the deque
        # does not grow without bound and only recent offenses are counted.
        user_kicks = self.kick_history[key]
        self._drop_older_than(user_kicks, current_time - self.OFFENSE_WINDOW)

        if len(user_kicks) >= 2:
            # Ban the user
            ban_mask = f'*!{mask.host}'
            self.bot.send(f"MODE {channel} +b {ban_mask}")

            self.bot.privmsg(
                self.service_name,
                f"KICK {channel} {nick} :Banned for repeated spamming"
            )

            self.bot.log.info(f"{nick} banned for repeated spamming.")

            # Clear kick data; a later offense starts from scratch.
            self.kick_history.pop(key, None)
        else:
            # Kick the user and log action
            self.bot.privmsg(
                self.service_name,
                f"KICK {channel} {nick} :Stop spamming."
            )
            user_kicks.append(current_time)
            self.bot.log.info(
                f"{nick} kicked for spam. Warning count: {len(user_kicks)}"
            )

        # Clear message history for the user
        self.user_data.pop(key, None)

    @cron('* * * * *')
    def clean_old_records(self):
        """Clean up inactive user records every minute.

        Removes tracking records whose newest timestamp is older than
        ``OFFENSE_WINDOW`` (or that have no timestamps), and drops expired
        kick timestamps together with kick histories that become empty.
        Entries are deleted in place so ``user_data`` keeps its default
        factory.
        """
        cutoff = time.time() - self.OFFENSE_WINDOW

        inactive = [
            nick
            for nick, data in self.user_data.items()
            if not data['timestamps'] or data['timestamps'][-1] < cutoff
        ]
        for nick in inactive:
            del self.user_data[nick]

        # Iterate over a snapshot of the keys because entries are deleted.
        for nick in list(self.kick_history):
            kicks = self.kick_history[nick]
            self._drop_older_than(kicks, cutoff)
            if not kicks:
                del self.kick_history[nick]

        self.bot.log.info("Cleaned up old spam records.")

    def connection_made(self):
        """Initialize when bot connects."""
        self.bot.log.info("AntiSpam plugin loaded with automated kick-to-ban escalation.")