310 lines
11 KiB
Python
310 lines
11 KiB
Python
import irc3
|
|
from irc3.plugins.command import command
|
|
from tinydb import Query, TinyDB
|
|
import fnmatch
|
|
from ircstyle import style
|
|
from datetime import datetime
|
|
|
|
def check_ignore(func):
    """Wrap an async event handler so ignored users never reach it.

    Args:
        func (callable): The coroutine handler to protect. It is invoked as
            ``func(self, mask, *args, **kwargs)``.

    Returns:
        callable: A coroutine function carrying the same ``__name__``,
        ``__module__`` and ``__doc__`` as ``func``.

    On every call the wrapper:
        1. Turns the sender mask into its string form.
        2. Fetches every ``'ignore'`` record from ``self.bot.permission_db``.
        3. Compares the sender against each stored mask with ``fnmatch``
           (shell-style wildcards).
        4. Returns ``True`` without calling ``func`` when any record matches;
           otherwise awaits ``func`` and returns its result.
    """

    async def wrapper(self, mask, *args, **kwargs):
        """Run the wrapped handler unless the sender is on the ignore list."""
        sender = str(mask)
        ignore_rows = self.bot.permission_db.search(
            Query().permission == 'ignore'
        )

        # any() stops at the first matching record, like an early return
        # from an explicit loop would.
        if any(fnmatch.fnmatch(sender, row['mask']) for row in ignore_rows):
            self.bot.log.debug(f"Blocking processing for {sender}")
            return True  # Block further processing

        return await func(self, mask, *args, **kwargs)

    # irc3 looks at these attributes on the handler, so mirror the original's.
    for attribute in ('__name__', '__module__', '__doc__'):
        setattr(wrapper, attribute, getattr(func, attribute))
    return wrapper
|
|
|
|
@irc3.plugin
class TinyDBPermissions:
    """Main permission system plugin handling storage and commands.

    Permission records live in a TinyDB file as documents with a ``mask``
    and a ``permission`` key; ignore entries additionally carry a
    ``timestamp``. The database handle is also published on the bot as
    ``bot.permission_db``.
    """

    def __init__(self, bot):
        """Open the permission database and attach it to the bot."""
        self.bot = bot
        self.permission_db = TinyDB('permissions.json')
        self.bot.permission_db = self.permission_db
        self.User = Query()
        self.bot.log.info("TinyDB permissions plugin initialized")

    # NOTE: the docstrings of the two @command methods below are parsed at
    # runtime to build the command usage, so their text must stay as is.
    @command(permission='admin')
    async def perm(self, mask, target, args):
        """Manage permissions through command interface.

        Usage:
            %%perm --add <mask> <permission>
            %%perm --del <mask> <permission>
            %%perm --list [<mask>]
        """
        if args['--add']:
            await self._add_permission(target, args['<mask>'], args['<permission>'])
        elif args['--del']:
            await self._del_permission(target, args['<mask>'], args['<permission>'])
        elif args['--list']:
            self._list_permissions(target, args['<mask>'])
        else:
            error_msg = style(
                "Invalid syntax. Use --add, --del, or --list.",
                fg='red', bold=True
            )
            self.bot.privmsg(target, error_msg)

    @command(permission='admin')
    async def ignore(self, mask, target, args):
        """Manage user ignore list.

        Usage:
            %%ignore --add <nick_or_mask>
            %%ignore --del <nick_or_mask>
            %%ignore --list
        """
        if args['--list']:
            self._list_ignored(target)
            return

        mask_or_nick = args['<nick_or_mask>']
        if '!' in mask_or_nick or '@' in mask_or_nick:
            user_mask = mask_or_nick
        else:
            try:
                user_mask = await self._get_hostmask(mask_or_nick)
            except ValueError as e:
                # Fallback to nick!*@* pattern if WHOIS fails
                user_mask = f"{mask_or_nick}!*@*"
                warning_msg = style(
                    f"Using fallback hostmask {user_mask} ({str(e)})",
                    fg='yellow', bold=True
                )
                self.bot.privmsg(target, warning_msg)

        is_ignore_entry = (
            (self.User.mask == user_mask) &
            (self.User.permission == 'ignore')
        )

        if args['--add']:
            if self.permission_db.contains(is_ignore_entry):
                msg = style(f"{user_mask} already ignored", fg='yellow', bold=True)
            else:
                self.permission_db.insert({
                    'mask': user_mask,
                    'permission': 'ignore',
                    'timestamp': datetime.now().isoformat()
                })
                msg = style(f"Ignored {user_mask}", fg='green', bold=True)
        elif args['--del']:
            removed = self.permission_db.remove(is_ignore_entry)
            msg = style(
                f"Unignored {user_mask} ({len(removed)} entries)",
                fg='green', bold=True
            )
        else:
            msg = style("Invalid syntax", fg='red', bold=True)

        self.bot.privmsg(target, msg)

    def _list_ignored(self, target):
        """Send every ignore entry (mask and timestamp) to ``target``."""
        ignored = self.permission_db.search(self.User.permission == 'ignore')
        if not ignored:
            msg = style("No ignored users found", fg='yellow', bold=True)
            self.bot.privmsg(target, msg)
            return

        self.bot.privmsg(target, style("Ignored users:", fg='blue', bold=True))
        for entry in ignored:
            # Entries without a timestamp are reported as 'unknown'.
            msg = style(
                f"{entry['mask']} (since {entry.get('timestamp', 'unknown')})",
                fg='cyan'
            )
            self.bot.privmsg(target, msg)

    async def _get_hostmask(self, nick):
        """Get the hostmask for a given nickname using WHOIS.

        Args:
            nick: Nickname to look up.

        Returns:
            str: ``nick!username@host`` built from the WHOIS reply.

        Raises:
            ValueError: If the WHOIS call raises, the reply lacks one of the
                ``username``/``host``/``success`` fields, or ``success`` is
                falsy. Callers only need to catch ``ValueError``.
        """
        try:
            whois_info = await self.bot.async_cmds.whois(nick)

            # Validate required fields exist
            required_fields = {'username', 'host', 'success'}
            missing = required_fields - whois_info.keys()
            if missing:
                # sorted() keeps the message stable across runs.
                raise ValueError(
                    f"WHOIS response missing fields: {', '.join(sorted(missing))}"
                )

            if not whois_info['success']:
                raise ValueError(f"WHOIS failed for {nick} (server error)")
        except ValueError:
            # Already a meaningful message; do not wrap it a second time.
            raise
        except Exception as e:
            raise ValueError(f"WHOIS error: {str(e)}") from e

        return f"{nick}!{whois_info['username']}@{whois_info['host']}"

    async def _resolve_mask(self, target, user_mask):
        """Turn a bare nickname into a hostmask, reporting failures.

        A value containing ``!`` or ``@`` is treated as a mask and returned
        unchanged. Otherwise a WHOIS lookup is made; if it fails, an error is
        sent to ``target`` and ``None`` is returned.
        """
        if '!' in user_mask or '@' in user_mask:
            return user_mask
        try:
            return await self._get_hostmask(user_mask)
        except ValueError as e:
            error_msg = style(
                f"Failed to get hostmask for {user_mask}: {e}",
                fg='red', bold=True
            )
            self.bot.privmsg(target, error_msg)
            return None

    async def _add_permission(self, target, user_mask, perm):
        """Add a permission to the database.

        Sends the outcome (added / already exists / lookup failure) to
        ``target``.
        """
        user_mask = await self._resolve_mask(target, user_mask)
        if user_mask is None:
            return

        already_present = self.permission_db.contains(
            (self.User.mask == user_mask) &
            (self.User.permission == perm)
        )
        if already_present:
            msg = style(
                f"Permission '{perm}' already exists for {user_mask}",
                fg='yellow', bold=True
            )
        else:
            self.permission_db.insert({'mask': user_mask, 'permission': perm})
            msg = style(
                f"Added permission '{perm}' for {user_mask}",
                fg='green', bold=True
            )
        self.bot.privmsg(target, msg)

    async def _del_permission(self, target, user_mask, perm):
        """Remove a permission from the database.

        Sends the outcome (removed count / nothing found / lookup failure)
        to ``target``.
        """
        user_mask = await self._resolve_mask(target, user_mask)
        if user_mask is None:
            return

        removed = self.permission_db.remove(
            (self.User.mask == user_mask) &
            (self.User.permission == perm)
        )
        if removed:
            msg = style(
                f"Removed {len(removed)} '{perm}' permission(s) for {user_mask}",
                fg='green', bold=True
            )
        else:
            msg = style(
                f"No '{perm}' permissions found for {user_mask}",
                fg='red', bold=True
            )
        self.bot.privmsg(target, msg)

    def _list_permissions(self, target, mask_filter):
        """List permissions matching a filter pattern.

        Args:
            target: Where the listing is sent.
            mask_filter: Shell-style wildcard pattern; a falsy value lists
                everything.
        """
        pattern = mask_filter or '*'

        # Match with fnmatch directly, as the permission checks in this file
        # do. The whole stored mask has to match the pattern, and the result
        # does not depend on the regex text fnmatch.translate() produces.
        def mask_matches(stored):
            return isinstance(stored, str) and fnmatch.fnmatch(stored, pattern)

        entries = self.permission_db.search(self.User.mask.test(mask_matches))

        if not entries:
            msg = style("No permissions found", fg='red', bold=True)
            self.bot.privmsg(target, msg)
            return

        for entry in entries:
            msg = style(
                f"{entry['mask']}: {entry['permission']}",
                fg='blue', bold=True
            )
            self.bot.privmsg(target, msg)
|
|
|
|
# @irc3.event(irc3.rfc.PRIVMSG)
|
|
# @check_ignore
|
|
# async def on_privmsg(self, mask, event, target, data):
|
|
# """Handle PRIVMSG events with integrated ignore checks.
|
|
|
|
# Args:
|
|
# mask: User's hostmask
|
|
# event: IRC event type
|
|
# target: Message target (channel or user)
|
|
# data: Message content
|
|
|
|
# Returns:
|
|
# bool: True to block processing, False to continue
|
|
# """
|
|
# return True
|
|
# Continue processing if not ignored
|
|
#return False
|
|
|
|
class TinyDBPolicy:
    """Authorization system for command access control.

    Decisions are based on the records in ``bot.permission_db``: a client is
    rejected when its hostmask matches any ``'ignore'`` record, and accepted
    when it matches a record granting the required permission or
    ``'all_permissions'``. Masks are compared with ``fnmatch`` wildcards.
    """

    def __init__(self, bot):
        """Keep a reference to the bot whose ``permission_db`` is queried."""
        self.bot = bot
        self.User = Query()

    def _is_ignored(self, client_mask):
        """Return True when ``client_mask`` matches any ignore record."""
        ignored = self.bot.permission_db.search(
            self.User.permission == 'ignore'
        )
        return any(
            fnmatch.fnmatch(client_mask, entry['mask']) for entry in ignored
        )

    def _is_granted(self, client_mask, permission):
        """Return True when a stored record grants ``permission`` to the mask.

        ``permission`` of ``None`` means the command is unrestricted.
        """
        if permission is None:
            return True

        accepted = (permission, 'all_permissions')
        entries = self.bot.permission_db.search(
            self.User.permission.test(lambda p: p in accepted)
        )
        return any(
            fnmatch.fnmatch(client_mask, entry['mask']) for entry in entries
        )

    def has_permission(self, client_mask, permission):
        """Check if a client has required permissions.

        Args:
            client_mask (str): Full hostmask of the caller.
            permission: Required permission name, or ``None`` when the
                command needs none.

        Returns:
            bool: ``False`` for ignored clients (checked first, so an ignore
            entry overrides any grant); otherwise whether the permission is
            granted.
        """
        if self._is_ignored(client_mask):
            return False
        return self._is_granted(client_mask, permission)

    def __call__(self, predicates, meth, client, target, args):
        """Enforce command permissions.

        Runs ``meth(client, target, args)`` and returns its result when the
        client is allowed. A client that is denied gets an "Access denied"
        private message, except for ignored clients, which are dropped
        without any reply so that the ignore list actually silences them.
        """
        client_hostmask = str(client)

        if self.has_permission(client_hostmask, predicates.get('permission')):
            return meth(client, target, args)

        # Replying to an ignored user would defeat the purpose of ignoring.
        if self._is_ignored(client_hostmask):
            return None

        cmd_name = predicates.get('name', meth.__name__)
        error_msg = style(
            f"Access denied for '{cmd_name}' command",
            fg='red', bold=True
        )
        self.bot.privmsg(client.nick, error_msg)