# g1mp/plugins/users.py

# 203 lines
# 8.6 KiB
# Python
# Raw Normal View History

# 2025-02-13 04:55:42 +00:00
from __future__ import annotations
import logging
from irc3 import plugin, utils, rfc
from irc3.dec import event
from irc3.utils import IrcString
from collections import defaultdict
from typing import Optional, Set, Dict, Any, cast
# Configure the root logger at import time, then create this module's logger.
# NOTE(review): calling basicConfig from a plugin module affects the whole
# process; confirm this is intended rather than left to the application.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class Channel(set):
    """Set of nicknames present in one IRC channel, with per-mode membership.

    The set itself holds the nicknames. ``modes`` maps a single-character key
    to the nicks recorded under it; ``Userlist`` uses status-prefix symbols
    (for example ``'@'`` and ``'+'``) as those keys.
    """

    def __init__(self) -> None:
        super().__init__()
        # Single-character mode key -> nicks currently recorded under it.
        self.modes: Dict[str, Set[str]] = defaultdict(set)
        # Initialised to None; nothing in this class updates it.
        self.topic: Optional[str] = None

    def add_member(self, nick: str, modes: str = '') -> None:
        """Add ``nick`` to the channel and record it under each key in ``modes``.

        Args:
            nick: Nickname to add.
            modes: String whose characters are each used as a key of
                ``self.modes``. Empty by default.
        """
        super().add(nick)
        self._update_modes(nick, modes, add=True)
        logger.debug("Added member %s with modes %s to channel", nick, modes)

    def remove_member(self, nick: str) -> None:
        """Remove ``nick`` from the channel and from every mode set.

        Removing a nick that is not present is a no-op.
        """
        super().discard(nick)
        # No explicit modes are given, so the nick is purged from all of them.
        self._update_modes(nick, remove=True)
        logger.debug("Removed member %s from channel", nick)

    def _update_modes(self, nick: str, modes: str = '',
                      add: bool = False, remove: bool = False) -> None:
        """Add ``nick`` to, or discard it from, the mode sets named by ``modes``.

        Args:
            nick: Nickname whose mode membership changes.
            modes: Characters naming the mode sets to touch. When removing,
                an empty string means every mode set currently tracked.
            add: Record the nick under each mode (takes precedence over
                ``remove``).
            remove: Discard the nick from each mode.

        Nothing happens, and nothing is logged, when neither flag is set.
        """
        if add:
            for mode in modes:
                self.modes[mode].add(nick)
        elif remove:
            # Snapshot the keys so the lookup below cannot disturb iteration,
            # and use .get() so the defaultdict does not grow empty entries.
            for mode in (modes or tuple(self.modes)):
                members = self.modes.get(mode)
                if members is not None:
                    members.discard(nick)
        else:
            return
        logger.debug("Updated modes for %s: %s %s",
                     nick, 'Added' if add else 'Removed', modes)

    def __repr__(self) -> str:
        return f"Channel({sorted(self)})"


@plugin
class Userlist:
    """Track channel membership, nick hostmasks and status modes.

    State lives in two containers that are also published on the context:
    ``channels`` (channel name -> ``Channel``) and ``nicks`` (nick -> mask).
    """

    def __init__(self, context: Any) -> None:
        self.context = context
        # Reuse the reset logic to create the initial, empty state.
        self.connection_lost()

    def connection_lost(self, client: Any = None) -> None:
        """Replace all tracked state with empty containers.

        Args:
            client: Accepted but unused.
        """
        self.channels: Dict[str, Channel] = defaultdict(Channel)
        self.nicks: Dict[str, IrcString] = {}
        self.context.channels = self.channels
        self.context.nicks = self.nicks
        logger.info("Connection lost, state reset.")

    @event(rfc.JOIN_PART_QUIT)
    def on_join_part_quit(self, mask: Optional[IrcString] = None,
                          event: Optional[str] = None, **kwargs: Any) -> None:
        """Dispatch a JOIN, PART or QUIT event to the matching method.

        Args:
            mask: Mask of the user the event is about.
            event: Event name; matched case-insensitively.
            **kwargs: Remaining event fields, forwarded to the handler.
        """
        if not (mask and event):
            return
        # Explicit table instead of getattr(): only these three names may be
        # dispatched, whatever string arrives in ``event``.
        handlers = {'join': self.join, 'part': self.part, 'quit': self.quit}
        handler = handlers.get(event.lower())
        if handler is None:
            logger.debug("Ignoring unexpected event %s", event)
            return
        handler(mask.nick, mask, **kwargs)
        logger.debug("Event %s for %s", event, mask.nick)

    @event(rfc.KICK)
    def on_kick(self, mask: Optional[IrcString] = None, target: Optional[IrcString] = None,
                **kwargs: Any) -> None:
        """Treat a kick as the kicked user parting the channel.

        Args:
            mask: Mask of the user who issued the kick.
            target: The user who was kicked.
            **kwargs: Remaining event fields, forwarded to ``part``.
        """
        if target:
            self.part(target.nick, mask, **kwargs)
            logger.debug("User %s was kicked from a channel", target.nick)

    def join(self, nick: str, mask: IrcString, channel: str, **kwargs: Any) -> None:
        """Record that ``nick`` joined ``channel``.

        The channel entry is created on first use. The bot's own nick
        (``context.nick``) is not added to the member set here.
        """
        channel_obj = self.channels[channel]
        if nick != self.context.nick:
            channel_obj.add_member(mask.nick)
            self.nicks[mask.nick] = mask
            self._broadcast(channel_obj, **kwargs)
        logger.debug("User %s joined channel %s", nick, channel)

    def part(self, nick: str, mask: IrcString, channel: str, **kwargs: Any) -> None:
        """Record that ``nick`` left ``channel``.

        When the bot itself leaves, the whole channel entry is dropped.
        Otherwise the nick is removed from the channel and, if it is in no
        other tracked channel, from ``nicks`` as well.
        """
        if nick == self.context.nick:
            self.channels.pop(channel, None)
            logger.debug("Left channel %s", channel)
            return
        # .get() avoids creating an entry for a channel we do not track.
        channel_obj = self.channels.get(channel)
        # Compare against None: an empty Channel is falsy but still tracked.
        if channel_obj is not None:
            self._broadcast(channel_obj, **kwargs)
            channel_obj.remove_member(nick)
            self._cleanup_nick(nick)
            logger.debug("User %s parted from channel %s", nick, channel)

    def quit(self, nick: str, mask: IrcString, **kwargs: Any) -> None:
        """Record that ``nick`` quit.

        If the bot itself quit, all state is reset. Otherwise the nick is
        removed from every channel it was in and from ``nicks``.
        """
        if nick == self.context.nick:
            self.connection_lost()
            logger.info("User %s quit, connection lost", nick)
            return
        # Remaining members of every channel the quitting user was in.
        affected: Set[str] = set()
        for channel in self.channels.values():
            if nick in channel:
                channel.remove_member(nick)
                affected.update(channel)
        self._broadcast(affected, **kwargs)
        self._cleanup_nick(nick)
        logger.debug("User %s quit from channels: %s", nick, affected)

    @event(rfc.NEW_NICK)
    def new_nick(self, nick: IrcString, new_nick: str, **kwargs: Any) -> None:
        """Handle a nickname change.

        Args:
            nick: Mask of the user before the change.
            new_nick: The nickname the user changed to.
            **kwargs: Remaining event fields, forwarded to ``_broadcast``.
        """
        old_nick = nick.nick
        host = nick.host or ''
        # Drop the old entry first: if old and new are the same string,
        # popping afterwards would delete the mask stored on the next line.
        self.nicks.pop(old_nick, None)
        self.nicks[new_nick] = IrcString(f"{new_nick}!{host}")
        self._update_nick_in_channels(old_nick, new_nick)
        self._broadcast({new_nick}, **kwargs)
        logger.debug("Nickname change: %s -> %s", old_nick, new_nick)

    def _update_nick_in_channels(self, old_nick: str, new_nick: str) -> None:
        """Rename ``old_nick`` to ``new_nick`` in every channel, keeping modes."""
        for channel in self.channels.values():
            if old_nick not in channel:
                continue
            # Capture the modes before removal, because remove_member purges
            # the nick from every mode set. Mode keys are single characters,
            # so they can be joined into the string add_member expects.
            held = ''.join(mode for mode, members in channel.modes.items()
                           if old_nick in members)
            channel.remove_member(old_nick)
            channel.add_member(new_nick, held)
        logger.debug("Updated nickname %s to %s in channels", old_nick, new_nick)

    @event(rfc.RPL_NAMREPLY)
    def names(self, channel: str, data: str, **kwargs: Any) -> None:
        """Add the members listed in a NAMES reply to ``channel``.

        Each whitespace-separated item is a nick optionally preceded by
        status characters. Leading characters that are PREFIX symbols are
        recorded as the member's modes, keyed by the symbol itself, which is
        the same keying the ``mode`` handler uses.

        Args:
            channel: Channel the reply is for.
            data: Space-separated list of (possibly prefixed) nicknames.
        """
        statusmsg = self.context.server_config.get('STATUSMSG', '@+')
        prefix_config = self.context.server_config.get('PREFIX', '(ov)@+')
        _, prefixes = self._parse_prefix_config(prefix_config)
        # STATUSMSG may list fewer symbols than PREFIX, so strip both.
        leading_chars = statusmsg + prefixes
        channel_obj = self.channels[channel]
        for item in (data or '').split():
            nick = item.lstrip(leading_chars)
            if not nick:
                # Item consisted only of status characters; nothing to add.
                continue
            decoration = item[:len(item) - len(nick)]
            modes = ''.join(symbol for symbol in decoration if symbol in prefixes)
            channel_obj.add_member(nick, modes)
        logger.debug("Processed NAMES reply for channel %s", channel)

    def _parse_prefix_config(self, config: str) -> tuple[str, str]:
        """Split a PREFIX value such as ``'(ov)@+'`` into ``('ov', '@+')``.

        Returns:
            ``(mode_letters, prefix_symbols)``. Falls back to
            ``('ov', '@+')`` when the value does not split into exactly two
            parts at ``')'``.
        """
        parts = config.strip('()').split(')')
        return (parts[0], parts[1]) if len(parts) == 2 else ('ov', '@+')

    @event(rfc.RPL_WHOREPLY)
    def who(self, channel: str, nick: str, username: str,
            server: str, **kwargs: Any) -> None:
        """Record the member and hostmask carried by a WHO reply.

        Args:
            channel: Channel the reply line refers to.
            nick: Nickname of the listed user.
            username: Username (ident) of the listed user.
            server: Server field of the reply; used as the host part of the
                mask only when the event supplies no ``host`` field.
            **kwargs: Remaining event fields; ``host`` is read if present.
        """
        # NOTE(review): a WHO reply carries both the user's host and the
        # server they are connected to. Prefer ``host`` when the event
        # pattern exposes it; confirm the field name against rfc.RPL_WHOREPLY.
        host = kwargs.get('host') or server
        mask = IrcString(f"{nick}!{username}@{host}")
        self.channels[channel].add_member(nick)
        self.nicks[nick] = mask
        logger.debug("Processed WHO reply for %s in %s", nick, channel)

    @event(rfc.MODE)
    def mode(self, target: str, modes: str, data: Any, **kwargs: Any) -> None:
        """Apply status-mode changes (e.g. ``+o nick``) to a channel.

        Only targets whose first character is in CHANTYPES are handled.
        Status modes listed in PREFIX are recorded in ``Channel.modes`` under
        their prefix symbol; every other mode is ignored.

        Args:
            target: Target of the MODE message.
            modes: Mode string such as ``'+o-v'``.
            data: Mode parameters, as a space-separated string or a list.
        """
        config = self.context.server_config
        chantypes = config.get('CHANTYPES', '#&')
        if not target or target[0] not in chantypes or not modes:
            return
        mode_chars, prefixes = self._parse_prefix_config(
            config.get('PREFIX', '(ov)@+'))
        mode_to_prefix = dict(zip(mode_chars, prefixes))
        chanmodes = config.get('CHANMODES', '').split(',')
        list_modes = chanmodes[0]
        # Last CHANMODES group: modes that never take a parameter.
        noargs = chanmodes[-1]
        # Always hand parse_modes a fresh list so the caller's is not consumed.
        params = data.split() if isinstance(data, str) else list(data or [])
        if not modes.startswith(('+', '-')):
            # A mode string with no leading sign is treated as an addition.
            modes = '+' + modes
        try:
            parsed = utils.parse_modes(modes, params, noargs=noargs)
        except IndexError:
            # parse_modes takes one parameter per mode not listed in
            # ``noargs`` and can run out when the server sent fewer.
            logger.warning("Could not parse mode change in %s: %s %s",
                           target, modes, data)
            return
        channel = self.channels[target]
        for sign, mode, param in parsed:
            if mode in list_modes and param:
                # List-type modes (first CHANMODES group) are not tracked.
                continue
            if param is None or mode not in mode_to_prefix:
                continue
            holders = channel.modes[mode_to_prefix[mode]]
            if sign == '+':
                holders.add(param)
            else:
                holders.discard(param)
        logger.debug("Mode change in %s: %s", target, modes)

    def _cleanup_nick(self, nick: str) -> None:
        """Forget ``nick``'s mask once it is in none of the tracked channels."""
        if not any(nick in channel for channel in self.channels.values()):
            self.nicks.pop(nick, None)
            logger.debug("Cleaned up nick %s as no longer in any channels", nick)

    def _broadcast(self, targets: Any, **kwargs: Any) -> None:
        """Placeholder for broadcast functionality; only logs the targets."""
        logger.debug("Broadcasting to targets: %s", targets)