From 430eb73d820ef0991f3e4196b73908865f05daba Mon Sep 17 00:00:00 2001 From: Zodiac Date: Fri, 14 Feb 2025 16:17:53 -0800 Subject: [PATCH] updates --- permissions.json | 2 +- plugins/goat.py | 4 +- plugins/my_yt.py | 2 + plugins/services/permissions.py | 170 ++++++++++++++++++++++++++++---- 4 files changed, 156 insertions(+), 22 deletions(-) diff --git a/permissions.json b/permissions.json index ad3dc1c..755a545 100644 --- a/permissions.json +++ b/permissions.json @@ -1 +1 @@ -{"_default": {"2": {"mask": "*!uid677411@2266CC43:5D1E0B1F:A5B0466C:IP", "permission": "admin"}}} \ No newline at end of file +{"_default": {"2": {"mask": "*!uid677411@2266CC43:5D1E0B1F:A5B0466C:IP", "permission": "admin"}, "3": {"mask": "sad!*@*", "permission": "", "timestamp": "2025-02-14T16:05:44.177666"}, "4": {"mask": "Day!~Day@194.44.50.9", "permission": "ignore", "timestamp": "2025-02-14T16:05:53.633575"}}} \ No newline at end of file diff --git a/plugins/goat.py b/plugins/goat.py index 35b47e3..425b30c 100644 --- a/plugins/goat.py +++ b/plugins/goat.py @@ -51,7 +51,7 @@ class GoatPlugin: self.bot = bot self.goat_tasks = {} - @command + @command(permission='admin') def goat(self, mask, target, args): """ Send the contents of goat.txt line by line to the channel and resend when reaching the end. @@ -80,7 +80,7 @@ class GoatPlugin: task = self.bot.loop.create_task(self.send_lines(target, nick, lines)) self.goat_tasks[target] = task - @command + @command(permission='admin') def goatstop(self, mask, target, args): """ Stop the goat command. diff --git a/plugins/my_yt.py b/plugins/my_yt.py index 8ecb0ce..63fee42 100644 --- a/plugins/my_yt.py +++ b/plugins/my_yt.py @@ -34,6 +34,7 @@ import googleapiclient.discovery import re import datetime from irc3.plugins.command import command +from plugins.services.permissions import check_ignore # Constants for YouTube API API_SERVICE_NAME = "youtube" @@ -229,6 +230,7 @@ class YouTubePlugin: self.bot.privmsg(target, f"Search error: {e}") @irc3.event(irc3.rfc.PRIVMSG) + @check_ignore async def on_message(self, mask, event, target, data): """ Event handler for messages in the channel to detect and respond to YouTube links. diff --git a/plugins/services/permissions.py b/plugins/services/permissions.py index 10c5505..41f5f0a 100644 --- a/plugins/services/permissions.py +++ b/plugins/services/permissions.py @@ -1,13 +1,44 @@ -# permissions.py -# -*- coding: utf-8 -*- -"""A plugin for irc3 that provides a permission system using TinyDB.""" - import irc3 from irc3.plugins.command import command from tinydb import Query, TinyDB import fnmatch from ircstyle import style +from datetime import datetime +def check_ignore(func): + """Decorator to block processing for ignored users in event handlers. + + Args: + func (callable): The function to decorate + + Returns: + callable: Wrapped function with ignore checks + + The decorator performs these actions in order: + 1. Converts user mask to hostmask string + 2. Checks against all ignore entries in permission DB + 3. Uses fnmatch for wildcard pattern matching + 4. Blocks processing if any match found + """ + async def wrapper(self, mask, *args, **kwargs): + """Execute wrapped function only if user is not ignored.""" + hostmask = str(mask) + User = Query() + ignored_entries = self.bot.permission_db.search( + User.permission == 'ignore' + ) + + for entry in ignored_entries: + if fnmatch.fnmatch(hostmask, entry['mask']): + self.bot.log.debug(f"Blocking processing for {hostmask}") + return True # Block further processing + return await func(self, mask, *args, **kwargs) + + # Preserve the original function's attributes for irc3 + wrapper.__name__ = func.__name__ + wrapper.__module__ = func.__module__ + wrapper.__doc__ = func.__doc__ + return wrapper @irc3.plugin class TinyDBPermissions: @@ -21,7 +52,7 @@ class TinyDBPermissions: self.bot.log.info("TinyDB permissions plugin initialized") @command(permission='admin') - def perm(self, mask, target, args): + async def perm(self, mask, target, args): """Manage permissions through command interface. Usage: @@ -30,9 +61,9 @@ class TinyDBPermissions: %%perm --list [] """ if args['--add']: - self._add_permission(target, args[''], args['']) + await self._add_permission(target, args[''], args['']) elif args['--del']: - self._del_permission(target, args[''], args['']) + await self._del_permission(target, args[''], args['']) elif args['--list']: self._list_permissions(target, args['']) else: @@ -43,40 +74,110 @@ class TinyDBPermissions: self.bot.privmsg(target, error_msg) @command(permission='admin') - def ignore(self, mask, target, args): + async def ignore(self, mask, target, args): """Manage user ignore list. Usage: - %%ignore --add - %%ignore --del + %%ignore --add + %%ignore --del + %%ignore --list """ - nick = args[''] - user_mask = f"{nick}!*@*" - + if args['--list']: + ignored = self.permission_db.search( + self.User.permission == 'ignore' + ) + if not ignored: + msg = style("No ignored users found", fg='yellow', bold=True) + self.bot.privmsg(target, msg) + return + + self.bot.privmsg(target, style("Ignored users:", fg='blue', bold=True)) + for entry in ignored: + msg = style( + f"{entry['mask']} (since {entry.get('timestamp', 'unknown')}", + fg='cyan' + ) + self.bot.privmsg(target, msg) + return + + mask_or_nick = args[''] + if '!' in mask_or_nick or '@' in mask_or_nick: + user_mask = mask_or_nick + else: + try: + user_mask = await self._get_hostmask(mask_or_nick) + except ValueError as e: + # Fallback to nick!*@* pattern if WHOIS fails + user_mask = f"{mask_or_nick}!*@*" + warning_msg = style( + f"Using fallback hostmask {user_mask} ({str(e)})", + fg='yellow', bold=True + ) + self.bot.privmsg(target, warning_msg) + if args['--add']: if self.permission_db.contains( (self.User.mask == user_mask) & (self.User.permission == 'ignore') ): - msg = style(f"{nick} already ignored", fg='yellow', bold=True) + msg = style(f"{user_mask} already ignored", fg='yellow', bold=True) else: - self.permission_db.insert({'mask': user_mask, 'permission': 'ignore'}) - msg = style(f"Ignored {nick}", fg='green', bold=True) + self.permission_db.insert({ + 'mask': user_mask, + 'permission': 'ignore', + 'timestamp': datetime.now().isoformat() + }) + msg = style(f"Ignored {user_mask}", fg='green', bold=True) elif args['--del']: removed = self.permission_db.remove( (self.User.mask == user_mask) & (self.User.permission == 'ignore') ) - msg = style(f"Unignored {nick} ({len(removed)} entries)", fg='green', bold=True) + msg = style(f"Unignored {user_mask} ({len(removed)} entries)", fg='green', bold=True) else: msg = style("Invalid syntax", fg='red', bold=True) self.bot.privmsg(target, msg) + + async def _get_hostmask(self, nick): + """Get the hostmask for a given nickname using WHOIS.""" + try: + whois_info = await self.bot.async_cmds.whois(nick) + + # Validate required fields exist + required_fields = {'username', 'host', 'success'} + missing = required_fields - whois_info.keys() + if missing: + raise ValueError(f"WHOIS response missing fields: {', '.join(missing)}") + + if not whois_info['success']: + raise ValueError(f"WHOIS failed for {nick} (server error)") + + except Exception as e: + raise ValueError(f"WHOIS error: {str(e)}") from e - def _add_permission(self, target, user_mask, perm): + # Use .get() with fallback for optional fields + username = whois_info.get('username', 'unknown') + host = whois_info.get('host', 'unknown.host') + return f"{nick}!{username}@{host}" + + async def _add_permission(self, target, user_mask, perm): """Add a permission to the database.""" + original_mask = user_mask + try: + # Check if user_mask is a nickname (no ! or @) + if '!' not in user_mask and '@' not in user_mask: + user_mask = await self._get_hostmask(user_mask) + except ValueError as e: + error_msg = style( + f"Failed to get hostmask for {original_mask}: {e}", + fg='red', bold=True + ) + self.bot.privmsg(target, error_msg) + return + existing = self.permission_db.search( (self.User.mask == user_mask) & (self.User.permission == perm) @@ -94,8 +195,21 @@ class TinyDBPermissions: ) self.bot.privmsg(target, msg) - def _del_permission(self, target, user_mask, perm): + async def _del_permission(self, target, user_mask, perm): """Remove a permission from the database.""" + original_mask = user_mask + try: + # Check if user_mask is a nickname (no ! or @) + if '!' not in user_mask and '@' not in user_mask: + user_mask = await self._get_hostmask(user_mask) + except ValueError as e: + error_msg = style( + f"Failed to get hostmask for {original_mask}: {e}", + fg='red', bold=True + ) + self.bot.privmsg(target, error_msg) + return + removed = self.permission_db.remove( (self.User.mask == user_mask) & (self.User.permission == perm) @@ -132,6 +246,24 @@ class TinyDBPermissions: ) self.bot.privmsg(target, msg) + # @irc3.event(irc3.rfc.PRIVMSG) + # @check_ignore + # async def on_privmsg(self, mask, event, target, data): + # """Handle PRIVMSG events with integrated ignore checks. + + # Args: + # mask: User's hostmask + # event: IRC event type + # target: Message target (channel or user) + # data: Message content + + # Returns: + # bool: True to block processing, False to continue + # """ + # return True + # Continue processing if not ignored + #return False + class TinyDBPolicy: """Authorization system for command access control."""