2025-02-14 16:17:53 -08:00

310 lines
11 KiB

import irc3
from irc3.plugins.command import command
from tinydb import Query, TinyDB
import fnmatch
from ircstyle import style
from datetime import datetime
def check_ignore(func):
"""Decorator to block processing for ignored users in event handlers.
func (callable): The function to decorate
callable: Wrapped function with ignore checks
The decorator performs these actions in order:
1. Converts user mask to hostmask string
2. Checks against all ignore entries in permission DB
3. Uses fnmatch for wildcard pattern matching
4. Blocks processing if any match found
async def wrapper(self, mask, *args, **kwargs):
"""Execute wrapped function only if user is not ignored."""
hostmask = str(mask)
User = Query()
ignored_entries = self.bot.permission_db.search(
User.permission == 'ignore'
for entry in ignored_entries:
if fnmatch.fnmatch(hostmask, entry['mask']):
self.bot.log.debug(f"Blocking processing for {hostmask}")
return True # Block further processing
return await func(self, mask, *args, **kwargs)
# Preserve the original function's attributes for irc3
wrapper.__name__ = func.__name__
wrapper.__module__ = func.__module__
wrapper.__doc__ = func.__doc__
return wrapper
class TinyDBPermissions:
"""Main permission system plugin handling storage and commands."""
def __init__(self, bot):
self.bot = bot
self.permission_db = TinyDB('permissions.json')
self.bot.permission_db = self.permission_db
self.User = Query()
self.bot.log.info("TinyDB permissions plugin initialized")
async def perm(self, mask, target, args):
"""Manage permissions through command interface.
%%perm --add <mask> <permission>
%%perm --del <mask> <permission>
%%perm --list [<mask>]
if args['--add']:
await self._add_permission(target, args['<mask>'], args['<permission>'])
elif args['--del']:
await self._del_permission(target, args['<mask>'], args['<permission>'])
elif args['--list']:
self._list_permissions(target, args['<mask>'])
error_msg = style(
"Invalid syntax. Use --add, --del, or --list.",
fg='red', bold=True
self.bot.privmsg(target, error_msg)
async def ignore(self, mask, target, args):
"""Manage user ignore list.
%%ignore --add <nick_or_mask>
%%ignore --del <nick_or_mask>
%%ignore --list
if args['--list']:
ignored = self.permission_db.search(
self.User.permission == 'ignore'
if not ignored:
msg = style("No ignored users found", fg='yellow', bold=True)
self.bot.privmsg(target, msg)
self.bot.privmsg(target, style("Ignored users:", fg='blue', bold=True))
for entry in ignored:
msg = style(
f"{entry['mask']} (since {entry.get('timestamp', 'unknown')}",
self.bot.privmsg(target, msg)
mask_or_nick = args['<nick_or_mask>']
if '!' in mask_or_nick or '@' in mask_or_nick:
user_mask = mask_or_nick
user_mask = await self._get_hostmask(mask_or_nick)
except ValueError as e:
# Fallback to nick!*@* pattern if WHOIS fails
user_mask = f"{mask_or_nick}!*@*"
warning_msg = style(
f"Using fallback hostmask {user_mask} ({str(e)})",
fg='yellow', bold=True
self.bot.privmsg(target, warning_msg)
if args['--add']:
if self.permission_db.contains(
(self.User.mask == user_mask) &
(self.User.permission == 'ignore')
msg = style(f"{user_mask} already ignored", fg='yellow', bold=True)
'mask': user_mask,
'permission': 'ignore',
'timestamp': datetime.now().isoformat()
msg = style(f"Ignored {user_mask}", fg='green', bold=True)
elif args['--del']:
removed = self.permission_db.remove(
(self.User.mask == user_mask) &
(self.User.permission == 'ignore')
msg = style(f"Unignored {user_mask} ({len(removed)} entries)", fg='green', bold=True)
msg = style("Invalid syntax", fg='red', bold=True)
self.bot.privmsg(target, msg)
async def _get_hostmask(self, nick):
"""Get the hostmask for a given nickname using WHOIS."""
whois_info = await self.bot.async_cmds.whois(nick)
# Validate required fields exist
required_fields = {'username', 'host', 'success'}
missing = required_fields - whois_info.keys()
if missing:
raise ValueError(f"WHOIS response missing fields: {', '.join(missing)}")
if not whois_info['success']:
raise ValueError(f"WHOIS failed for {nick} (server error)")
except Exception as e:
raise ValueError(f"WHOIS error: {str(e)}") from e
# Use .get() with fallback for optional fields
username = whois_info.get('username', 'unknown')
host = whois_info.get('host', 'unknown.host')
return f"{nick}!{username}@{host}"
async def _add_permission(self, target, user_mask, perm):
"""Add a permission to the database."""
original_mask = user_mask
# Check if user_mask is a nickname (no ! or @)
if '!' not in user_mask and '@' not in user_mask:
user_mask = await self._get_hostmask(user_mask)
except ValueError as e:
error_msg = style(
f"Failed to get hostmask for {original_mask}: {e}",
fg='red', bold=True
self.bot.privmsg(target, error_msg)
existing = self.permission_db.search(
(self.User.mask == user_mask) &
(self.User.permission == perm)
if existing:
msg = style(
f"Permission '{perm}' already exists for {user_mask}",
fg='yellow', bold=True
self.permission_db.insert({'mask': user_mask, 'permission': perm})
msg = style(
f"Added permission '{perm}' for {user_mask}",
fg='green', bold=True
self.bot.privmsg(target, msg)
async def _del_permission(self, target, user_mask, perm):
"""Remove a permission from the database."""
original_mask = user_mask
# Check if user_mask is a nickname (no ! or @)
if '!' not in user_mask and '@' not in user_mask:
user_mask = await self._get_hostmask(user_mask)
except ValueError as e:
error_msg = style(
f"Failed to get hostmask for {original_mask}: {e}",
fg='red', bold=True
self.bot.privmsg(target, error_msg)
removed = self.permission_db.remove(
(self.User.mask == user_mask) &
(self.User.permission == perm)
if removed:
msg = style(
f"Removed {len(removed)} '{perm}' permission(s) for {user_mask}",
fg='green', bold=True
msg = style(
f"No '{perm}' permissions found for {user_mask}",
fg='red', bold=True
self.bot.privmsg(target, msg)
def _list_permissions(self, target, mask_filter):
"""List permissions matching a filter pattern."""
mask_filter = mask_filter or '*'
regex = fnmatch.translate(mask_filter).split('(?ms)')[0].rstrip('\\Z')
entries = self.permission_db.search(
if not entries:
msg = style("No permissions found", fg='red', bold=True)
self.bot.privmsg(target, msg)
for entry in entries:
msg = style(
f"{entry['mask']}: {entry['permission']}",
fg='blue', bold=True
self.bot.privmsg(target, msg)
# @irc3.event(irc3.rfc.PRIVMSG)
# @check_ignore
# async def on_privmsg(self, mask, event, target, data):
# """Handle PRIVMSG events with integrated ignore checks.
# Args:
# mask: User's hostmask
# event: IRC event type
# target: Message target (channel or user)
# data: Message content
# Returns:
# bool: True to block processing, False to continue
# """
# return True
# Continue processing if not ignored
#return False
class TinyDBPolicy:
"""Authorization system for command access control."""
def __init__(self, bot):
self.bot = bot
self.User = Query()
def has_permission(self, client_mask, permission):
"""Check if a client has required permissions."""
# Check ignore list first
ignored = self.bot.permission_db.search(
self.User.permission == 'ignore'
for entry in ignored:
if fnmatch.fnmatch(client_mask, entry['mask']):
return False
# Check permissions if not ignored
if permission is None:
return True
# Check for matching permissions using fnmatch
entries = self.bot.permission_db.search(
self.User.permission.test(lambda p: p in (permission, 'all_permissions'))
for entry in entries:
if fnmatch.fnmatch(client_mask, entry['mask']):
return True
return False
def __call__(self, predicates, meth, client, target, args):
"""Enforce command permissions."""
cmd_name = predicates.get('name', meth.__name__)
client_hostmask = str(client)
if self.has_permission(client_hostmask, predicates.get('permission')):
return meth(client, target, args)
error_msg = style(
f"Access denied for '{cmd_name}' command",
fg='red', bold=True
self.bot.privmsg(client.nick, error_msg)