import irc3 from irc3.plugins.command import command from tinydb import Query, TinyDB import fnmatch from ircstyle import style from datetime import datetime def check_ignore(func): """Decorator to block processing for ignored users in event handlers. Args: func (callable): The function to decorate Returns: callable: Wrapped function with ignore checks The decorator performs these actions in order: 1. Converts user mask to hostmask string 2. Checks against all ignore entries in permission DB 3. Uses fnmatch for wildcard pattern matching 4. Blocks processing if any match found """ async def wrapper(self, mask, *args, **kwargs): """Execute wrapped function only if user is not ignored.""" hostmask = str(mask) User = Query() ignored_entries = self.bot.permission_db.search( User.permission == 'ignore' ) for entry in ignored_entries: if fnmatch.fnmatch(hostmask, entry['mask']): self.bot.log.debug(f"Blocking processing for {hostmask}") return True # Block further processing return await func(self, mask, *args, **kwargs) # Preserve the original function's attributes for irc3 wrapper.__name__ = func.__name__ wrapper.__module__ = func.__module__ wrapper.__doc__ = func.__doc__ return wrapper @irc3.plugin class TinyDBPermissions: """Main permission system plugin handling storage and commands.""" def __init__(self, bot): self.bot = bot self.permission_db = TinyDB('permissions.json') self.bot.permission_db = self.permission_db self.User = Query() self.bot.log.info("TinyDB permissions plugin initialized") @command(permission='admin') async def perm(self, mask, target, args): """Manage permissions through command interface. Usage: %%perm --add %%perm --del %%perm --list [] """ if args['--add']: await self._add_permission(target, args[''], args['']) elif args['--del']: await self._del_permission(target, args[''], args['']) elif args['--list']: self._list_permissions(target, args['']) else: error_msg = style( "Invalid syntax. Use --add, --del, or --list.", fg='red', bold=True ) self.bot.privmsg(target, error_msg) @command(permission='admin') async def ignore(self, mask, target, args): """Manage user ignore list. Usage: %%ignore --add %%ignore --del %%ignore --list """ if args['--list']: ignored = self.permission_db.search( self.User.permission == 'ignore' ) if not ignored: msg = style("No ignored users found", fg='yellow', bold=True) self.bot.privmsg(target, msg) return self.bot.privmsg(target, style("Ignored users:", fg='blue', bold=True)) for entry in ignored: msg = style( f"{entry['mask']} (since {entry.get('timestamp', 'unknown')}", fg='cyan' ) self.bot.privmsg(target, msg) return mask_or_nick = args[''] if '!' in mask_or_nick or '@' in mask_or_nick: user_mask = mask_or_nick else: try: user_mask = await self._get_hostmask(mask_or_nick) except ValueError as e: # Fallback to nick!*@* pattern if WHOIS fails user_mask = f"{mask_or_nick}!*@*" warning_msg = style( f"Using fallback hostmask {user_mask} ({str(e)})", fg='yellow', bold=True ) self.bot.privmsg(target, warning_msg) if args['--add']: if self.permission_db.contains( (self.User.mask == user_mask) & (self.User.permission == 'ignore') ): msg = style(f"{user_mask} already ignored", fg='yellow', bold=True) else: self.permission_db.insert({ 'mask': user_mask, 'permission': 'ignore', 'timestamp': datetime.now().isoformat() }) msg = style(f"Ignored {user_mask}", fg='green', bold=True) elif args['--del']: removed = self.permission_db.remove( (self.User.mask == user_mask) & (self.User.permission == 'ignore') ) msg = style(f"Unignored {user_mask} ({len(removed)} entries)", fg='green', bold=True) else: msg = style("Invalid syntax", fg='red', bold=True) self.bot.privmsg(target, msg) async def _get_hostmask(self, nick): """Get the hostmask for a given nickname using WHOIS.""" try: whois_info = await self.bot.async_cmds.whois(nick) # Validate required fields exist required_fields = {'username', 'host', 'success'} missing = required_fields - whois_info.keys() if missing: raise ValueError(f"WHOIS response missing fields: {', '.join(missing)}") if not whois_info['success']: raise ValueError(f"WHOIS failed for {nick} (server error)") except Exception as e: raise ValueError(f"WHOIS error: {str(e)}") from e # Use .get() with fallback for optional fields username = whois_info.get('username', 'unknown') host = whois_info.get('host', 'unknown.host') return f"{nick}!{username}@{host}" async def _add_permission(self, target, user_mask, perm): """Add a permission to the database.""" original_mask = user_mask try: # Check if user_mask is a nickname (no ! or @) if '!' not in user_mask and '@' not in user_mask: user_mask = await self._get_hostmask(user_mask) except ValueError as e: error_msg = style( f"Failed to get hostmask for {original_mask}: {e}", fg='red', bold=True ) self.bot.privmsg(target, error_msg) return existing = self.permission_db.search( (self.User.mask == user_mask) & (self.User.permission == perm) ) if existing: msg = style( f"Permission '{perm}' already exists for {user_mask}", fg='yellow', bold=True ) else: self.permission_db.insert({'mask': user_mask, 'permission': perm}) msg = style( f"Added permission '{perm}' for {user_mask}", fg='green', bold=True ) self.bot.privmsg(target, msg) async def _del_permission(self, target, user_mask, perm): """Remove a permission from the database.""" original_mask = user_mask try: # Check if user_mask is a nickname (no ! or @) if '!' not in user_mask and '@' not in user_mask: user_mask = await self._get_hostmask(user_mask) except ValueError as e: error_msg = style( f"Failed to get hostmask for {original_mask}: {e}", fg='red', bold=True ) self.bot.privmsg(target, error_msg) return removed = self.permission_db.remove( (self.User.mask == user_mask) & (self.User.permission == perm) ) if removed: msg = style( f"Removed {len(removed)} '{perm}' permission(s) for {user_mask}", fg='green', bold=True ) else: msg = style( f"No '{perm}' permissions found for {user_mask}", fg='red', bold=True ) self.bot.privmsg(target, msg) def _list_permissions(self, target, mask_filter): """List permissions matching a filter pattern.""" mask_filter = mask_filter or '*' regex = fnmatch.translate(mask_filter).split('(?ms)')[0].rstrip('\\Z') entries = self.permission_db.search( self.User.mask.matches(regex) ) if not entries: msg = style("No permissions found", fg='red', bold=True) self.bot.privmsg(target, msg) return for entry in entries: msg = style( f"{entry['mask']}: {entry['permission']}", fg='blue', bold=True ) self.bot.privmsg(target, msg) # @irc3.event(irc3.rfc.PRIVMSG) # @check_ignore # async def on_privmsg(self, mask, event, target, data): # """Handle PRIVMSG events with integrated ignore checks. # Args: # mask: User's hostmask # event: IRC event type # target: Message target (channel or user) # data: Message content # Returns: # bool: True to block processing, False to continue # """ # return True # Continue processing if not ignored #return False class TinyDBPolicy: """Authorization system for command access control.""" def __init__(self, bot): self.bot = bot self.User = Query() def has_permission(self, client_mask, permission): """Check if a client has required permissions.""" # Check ignore list first ignored = self.bot.permission_db.search( self.User.permission == 'ignore' ) for entry in ignored: if fnmatch.fnmatch(client_mask, entry['mask']): return False # Check permissions if not ignored if permission is None: return True # Check for matching permissions using fnmatch entries = self.bot.permission_db.search( self.User.permission.test(lambda p: p in (permission, 'all_permissions')) ) for entry in entries: if fnmatch.fnmatch(client_mask, entry['mask']): return True return False def __call__(self, predicates, meth, client, target, args): """Enforce command permissions.""" cmd_name = predicates.get('name', meth.__name__) client_hostmask = str(client) if self.has_permission(client_hostmask, predicates.get('permission')): return meth(client, target, args) error_msg = style( f"Access denied for '{cmd_name}' command", fg='red', bold=True ) self.bot.privmsg(client.nick, error_msg)