updates
This commit is contained in:
parent
345d60dfbd
commit
430eb73d82
@ -1 +1 @@
|
||||
{"_default": {"2": {"mask": "*!uid677411@2266CC43:5D1E0B1F:A5B0466C:IP", "permission": "admin"}}}
|
||||
{"_default": {"2": {"mask": "*!uid677411@2266CC43:5D1E0B1F:A5B0466C:IP", "permission": "admin"}, "3": {"mask": "sad!*@*", "permission": "", "timestamp": "2025-02-14T16:05:44.177666"}, "4": {"mask": "Day!~Day@194.44.50.9", "permission": "ignore", "timestamp": "2025-02-14T16:05:53.633575"}}}
|
@ -51,7 +51,7 @@ class GoatPlugin:
|
||||
self.bot = bot
|
||||
self.goat_tasks = {}
|
||||
|
||||
@command
|
||||
@command(permission='admin')
|
||||
def goat(self, mask, target, args):
|
||||
"""
|
||||
Send the contents of goat.txt line by line to the channel and resend when reaching the end.
|
||||
@ -80,7 +80,7 @@ class GoatPlugin:
|
||||
task = self.bot.loop.create_task(self.send_lines(target, nick, lines))
|
||||
self.goat_tasks[target] = task
|
||||
|
||||
@command
|
||||
@command(permission='admin')
|
||||
def goatstop(self, mask, target, args):
|
||||
"""
|
||||
Stop the goat command.
|
||||
|
@ -34,6 +34,7 @@ import googleapiclient.discovery
|
||||
import re
|
||||
import datetime
|
||||
from irc3.plugins.command import command
|
||||
from plugins.services.permissions import check_ignore
|
||||
|
||||
# Constants for YouTube API
|
||||
API_SERVICE_NAME = "youtube"
|
||||
@ -229,6 +230,7 @@ class YouTubePlugin:
|
||||
self.bot.privmsg(target, f"Search error: {e}")
|
||||
|
||||
@irc3.event(irc3.rfc.PRIVMSG)
|
||||
@check_ignore
|
||||
async def on_message(self, mask, event, target, data):
|
||||
"""
|
||||
Event handler for messages in the channel to detect and respond to YouTube links.
|
||||
|
@ -1,13 +1,44 @@
|
||||
# permissions.py
|
||||
# -*- coding: utf-8 -*-
|
||||
"""A plugin for irc3 that provides a permission system using TinyDB."""
|
||||
|
||||
import irc3
|
||||
from irc3.plugins.command import command
|
||||
from tinydb import Query, TinyDB
|
||||
import fnmatch
|
||||
from ircstyle import style
|
||||
from datetime import datetime
|
||||
|
||||
def check_ignore(func):
|
||||
"""Decorator to block processing for ignored users in event handlers.
|
||||
|
||||
Args:
|
||||
func (callable): The function to decorate
|
||||
|
||||
Returns:
|
||||
callable: Wrapped function with ignore checks
|
||||
|
||||
The decorator performs these actions in order:
|
||||
1. Converts user mask to hostmask string
|
||||
2. Checks against all ignore entries in permission DB
|
||||
3. Uses fnmatch for wildcard pattern matching
|
||||
4. Blocks processing if any match found
|
||||
"""
|
||||
async def wrapper(self, mask, *args, **kwargs):
|
||||
"""Execute wrapped function only if user is not ignored."""
|
||||
hostmask = str(mask)
|
||||
User = Query()
|
||||
ignored_entries = self.bot.permission_db.search(
|
||||
User.permission == 'ignore'
|
||||
)
|
||||
|
||||
for entry in ignored_entries:
|
||||
if fnmatch.fnmatch(hostmask, entry['mask']):
|
||||
self.bot.log.debug(f"Blocking processing for {hostmask}")
|
||||
return True # Block further processing
|
||||
return await func(self, mask, *args, **kwargs)
|
||||
|
||||
# Preserve the original function's attributes for irc3
|
||||
wrapper.__name__ = func.__name__
|
||||
wrapper.__module__ = func.__module__
|
||||
wrapper.__doc__ = func.__doc__
|
||||
return wrapper
|
||||
|
||||
@irc3.plugin
|
||||
class TinyDBPermissions:
|
||||
@ -21,7 +52,7 @@ class TinyDBPermissions:
|
||||
self.bot.log.info("TinyDB permissions plugin initialized")
|
||||
|
||||
@command(permission='admin')
|
||||
def perm(self, mask, target, args):
|
||||
async def perm(self, mask, target, args):
|
||||
"""Manage permissions through command interface.
|
||||
|
||||
Usage:
|
||||
@ -30,9 +61,9 @@ class TinyDBPermissions:
|
||||
%%perm --list [<mask>]
|
||||
"""
|
||||
if args['--add']:
|
||||
self._add_permission(target, args['<mask>'], args['<permission>'])
|
||||
await self._add_permission(target, args['<mask>'], args['<permission>'])
|
||||
elif args['--del']:
|
||||
self._del_permission(target, args['<mask>'], args['<permission>'])
|
||||
await self._del_permission(target, args['<mask>'], args['<permission>'])
|
||||
elif args['--list']:
|
||||
self._list_permissions(target, args['<mask>'])
|
||||
else:
|
||||
@ -43,40 +74,110 @@ class TinyDBPermissions:
|
||||
self.bot.privmsg(target, error_msg)
|
||||
|
||||
@command(permission='admin')
|
||||
def ignore(self, mask, target, args):
|
||||
async def ignore(self, mask, target, args):
|
||||
"""Manage user ignore list.
|
||||
|
||||
Usage:
|
||||
%%ignore --add <nick>
|
||||
%%ignore --del <nick>
|
||||
%%ignore --add <nick_or_mask>
|
||||
%%ignore --del <nick_or_mask>
|
||||
%%ignore --list
|
||||
"""
|
||||
nick = args['<nick>']
|
||||
user_mask = f"{nick}!*@*"
|
||||
if args['--list']:
|
||||
ignored = self.permission_db.search(
|
||||
self.User.permission == 'ignore'
|
||||
)
|
||||
if not ignored:
|
||||
msg = style("No ignored users found", fg='yellow', bold=True)
|
||||
self.bot.privmsg(target, msg)
|
||||
return
|
||||
|
||||
self.bot.privmsg(target, style("Ignored users:", fg='blue', bold=True))
|
||||
for entry in ignored:
|
||||
msg = style(
|
||||
f"{entry['mask']} (since {entry.get('timestamp', 'unknown')}",
|
||||
fg='cyan'
|
||||
)
|
||||
self.bot.privmsg(target, msg)
|
||||
return
|
||||
|
||||
mask_or_nick = args['<nick_or_mask>']
|
||||
if '!' in mask_or_nick or '@' in mask_or_nick:
|
||||
user_mask = mask_or_nick
|
||||
else:
|
||||
try:
|
||||
user_mask = await self._get_hostmask(mask_or_nick)
|
||||
except ValueError as e:
|
||||
# Fallback to nick!*@* pattern if WHOIS fails
|
||||
user_mask = f"{mask_or_nick}!*@*"
|
||||
warning_msg = style(
|
||||
f"Using fallback hostmask {user_mask} ({str(e)})",
|
||||
fg='yellow', bold=True
|
||||
)
|
||||
self.bot.privmsg(target, warning_msg)
|
||||
|
||||
if args['--add']:
|
||||
if self.permission_db.contains(
|
||||
(self.User.mask == user_mask) &
|
||||
(self.User.permission == 'ignore')
|
||||
):
|
||||
msg = style(f"{nick} already ignored", fg='yellow', bold=True)
|
||||
msg = style(f"{user_mask} already ignored", fg='yellow', bold=True)
|
||||
else:
|
||||
self.permission_db.insert({'mask': user_mask, 'permission': 'ignore'})
|
||||
msg = style(f"Ignored {nick}", fg='green', bold=True)
|
||||
self.permission_db.insert({
|
||||
'mask': user_mask,
|
||||
'permission': 'ignore',
|
||||
'timestamp': datetime.now().isoformat()
|
||||
})
|
||||
msg = style(f"Ignored {user_mask}", fg='green', bold=True)
|
||||
|
||||
elif args['--del']:
|
||||
removed = self.permission_db.remove(
|
||||
(self.User.mask == user_mask) &
|
||||
(self.User.permission == 'ignore')
|
||||
)
|
||||
msg = style(f"Unignored {nick} ({len(removed)} entries)", fg='green', bold=True)
|
||||
msg = style(f"Unignored {user_mask} ({len(removed)} entries)", fg='green', bold=True)
|
||||
|
||||
else:
|
||||
msg = style("Invalid syntax", fg='red', bold=True)
|
||||
|
||||
self.bot.privmsg(target, msg)
|
||||
|
||||
def _add_permission(self, target, user_mask, perm):
|
||||
async def _get_hostmask(self, nick):
|
||||
"""Get the hostmask for a given nickname using WHOIS."""
|
||||
try:
|
||||
whois_info = await self.bot.async_cmds.whois(nick)
|
||||
|
||||
# Validate required fields exist
|
||||
required_fields = {'username', 'host', 'success'}
|
||||
missing = required_fields - whois_info.keys()
|
||||
if missing:
|
||||
raise ValueError(f"WHOIS response missing fields: {', '.join(missing)}")
|
||||
|
||||
if not whois_info['success']:
|
||||
raise ValueError(f"WHOIS failed for {nick} (server error)")
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError(f"WHOIS error: {str(e)}") from e
|
||||
|
||||
# Use .get() with fallback for optional fields
|
||||
username = whois_info.get('username', 'unknown')
|
||||
host = whois_info.get('host', 'unknown.host')
|
||||
return f"{nick}!{username}@{host}"
|
||||
|
||||
async def _add_permission(self, target, user_mask, perm):
|
||||
"""Add a permission to the database."""
|
||||
original_mask = user_mask
|
||||
try:
|
||||
# Check if user_mask is a nickname (no ! or @)
|
||||
if '!' not in user_mask and '@' not in user_mask:
|
||||
user_mask = await self._get_hostmask(user_mask)
|
||||
except ValueError as e:
|
||||
error_msg = style(
|
||||
f"Failed to get hostmask for {original_mask}: {e}",
|
||||
fg='red', bold=True
|
||||
)
|
||||
self.bot.privmsg(target, error_msg)
|
||||
return
|
||||
|
||||
existing = self.permission_db.search(
|
||||
(self.User.mask == user_mask) &
|
||||
(self.User.permission == perm)
|
||||
@ -94,8 +195,21 @@ class TinyDBPermissions:
|
||||
)
|
||||
self.bot.privmsg(target, msg)
|
||||
|
||||
def _del_permission(self, target, user_mask, perm):
|
||||
async def _del_permission(self, target, user_mask, perm):
|
||||
"""Remove a permission from the database."""
|
||||
original_mask = user_mask
|
||||
try:
|
||||
# Check if user_mask is a nickname (no ! or @)
|
||||
if '!' not in user_mask and '@' not in user_mask:
|
||||
user_mask = await self._get_hostmask(user_mask)
|
||||
except ValueError as e:
|
||||
error_msg = style(
|
||||
f"Failed to get hostmask for {original_mask}: {e}",
|
||||
fg='red', bold=True
|
||||
)
|
||||
self.bot.privmsg(target, error_msg)
|
||||
return
|
||||
|
||||
removed = self.permission_db.remove(
|
||||
(self.User.mask == user_mask) &
|
||||
(self.User.permission == perm)
|
||||
@ -132,6 +246,24 @@ class TinyDBPermissions:
|
||||
)
|
||||
self.bot.privmsg(target, msg)
|
||||
|
||||
# @irc3.event(irc3.rfc.PRIVMSG)
|
||||
# @check_ignore
|
||||
# async def on_privmsg(self, mask, event, target, data):
|
||||
# """Handle PRIVMSG events with integrated ignore checks.
|
||||
|
||||
# Args:
|
||||
# mask: User's hostmask
|
||||
# event: IRC event type
|
||||
# target: Message target (channel or user)
|
||||
# data: Message content
|
||||
|
||||
# Returns:
|
||||
# bool: True to block processing, False to continue
|
||||
# """
|
||||
# return True
|
||||
# Continue processing if not ignored
|
||||
#return False
|
||||
|
||||
class TinyDBPolicy:
|
||||
"""Authorization system for command access control."""
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user