203 lines
8.6 KiB
Python
203 lines
8.6 KiB
Python
from __future__ import annotations
|
|
import logging
|
|
from irc3 import plugin, utils, rfc
|
|
from irc3.dec import event
|
|
from irc3.utils import IrcString
|
|
from collections import defaultdict
|
|
from typing import Optional, Set, Dict, Any, cast
|
|
|
|
# Set up logging
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class Channel(set):
|
|
"""Represents an IRC channel with member management and mode tracking."""
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
self.modes: Dict[str, Set[str]] = defaultdict(set)
|
|
self.topic: Optional[str] = None
|
|
|
|
def add_member(self, nick: str, modes: str = '') -> None:
|
|
"""Add member with optional modes."""
|
|
super().add(nick)
|
|
self._update_modes(nick, modes, add=True)
|
|
logger.debug(f"Added member {nick} with modes {modes} to channel")
|
|
|
|
def remove_member(self, nick: str) -> None:
|
|
"""Remove member and all associated modes."""
|
|
super().discard(nick)
|
|
self._update_modes(nick, remove=True)
|
|
logger.debug(f"Removed member {nick} from channel")
|
|
|
|
def _update_modes(self, nick: str, modes: str = '',
|
|
add: bool = False, remove: bool = False) -> None:
|
|
"""Update mode tracking for a member."""
|
|
for mode in modes:
|
|
if add:
|
|
self.modes[mode].add(nick)
|
|
elif remove:
|
|
self.modes[mode].discard(nick)
|
|
logger.debug(f"Updated modes for {nick}: {'Added' if add else 'Removed'} {modes}")
|
|
|
|
def __repr__(self) -> str:
|
|
return f"Channel({sorted(self)})"
|
|
|
|
@plugin
|
|
class Userlist:
|
|
"""Enhanced user tracking with proper hostmask handling and mode management."""
|
|
|
|
def __init__(self, context: Any) -> None:
|
|
self.context = context
|
|
self.connection_lost()
|
|
|
|
def connection_lost(self, client: Any = None) -> None:
|
|
"""Reset state on connection loss."""
|
|
self.channels: Dict[str, Channel] = defaultdict(Channel)
|
|
self.nicks: Dict[str, IrcString] = {}
|
|
self.context.channels = self.channels
|
|
self.context.nicks = self.nicks
|
|
logger.info("Connection lost, state reset.")
|
|
|
|
@event(rfc.JOIN_PART_QUIT)
|
|
def on_join_part_quit(self, mask: Optional[IrcString] = None,
|
|
event: Optional[str] = None, **kwargs: Any) -> None:
|
|
"""Handle join/part/quit events."""
|
|
if mask and event:
|
|
getattr(self, event.lower())(mask.nick, mask, **kwargs)
|
|
logger.debug(f"Event {event} for {mask.nick}")
|
|
|
|
@event(rfc.KICK)
|
|
def on_kick(self, mask: Optional[IrcString] = None, target: Optional[IrcString] = None,
|
|
**kwargs: Any) -> None:
|
|
"""Handle kick events."""
|
|
if target:
|
|
self.part(target.nick, mask, **kwargs) # Note: Added mask here for consistency
|
|
logger.debug(f"User {target.nick} was kicked from a channel")
|
|
|
|
def join(self, nick: str, mask: IrcString, channel: str, **kwargs: Any) -> None:
|
|
"""Process user join."""
|
|
channel_obj = self.channels[channel]
|
|
if nick != self.context.nick:
|
|
channel_obj.add_member(mask.nick)
|
|
self.nicks[mask.nick] = mask
|
|
self._broadcast(channel_obj, **kwargs)
|
|
logger.debug(f"User {nick} joined channel {channel}")
|
|
|
|
def part(self, nick: str, mask: IrcString, channel: str, **kwargs: Any) -> None:
|
|
"""Process user part."""
|
|
if nick == self.context.nick:
|
|
self.channels.pop(channel, None)
|
|
logger.debug(f"Left channel {channel}")
|
|
else:
|
|
channel_obj = self.channels.get(channel)
|
|
if channel_obj:
|
|
self._broadcast(channel_obj, **kwargs)
|
|
channel_obj.remove_member(nick)
|
|
self._cleanup_nick(nick)
|
|
logger.debug(f"User {nick} parted from channel {channel}")
|
|
|
|
def quit(self, nick: str, mask: IrcString, **kwargs: Any) -> None:
|
|
"""Process user quit."""
|
|
if nick == self.context.nick:
|
|
self.connection_lost()
|
|
logger.info(f"User {nick} quit, connection lost")
|
|
else:
|
|
affected = set()
|
|
for channel in self.channels.values():
|
|
if nick in channel:
|
|
channel.remove_member(nick)
|
|
affected.update(channel)
|
|
self._broadcast(affected, **kwargs)
|
|
self._cleanup_nick(nick)
|
|
logger.debug(f"User {nick} quit from channels: {affected}")
|
|
|
|
@event(rfc.NEW_NICK)
|
|
def new_nick(self, nick: IrcString, new_nick: str, **kwargs: Any) -> None:
|
|
"""Handle nickname changes."""
|
|
host = nick.host or ''
|
|
self.nicks[new_nick] = IrcString(f"{new_nick}!{host}")
|
|
old_nick = nick.nick
|
|
self._update_nick_in_channels(old_nick, new_nick)
|
|
self.nicks.pop(old_nick, None)
|
|
self._broadcast({new_nick}, **kwargs)
|
|
logger.debug(f"Nickname change: {old_nick} -> {new_nick}")
|
|
|
|
def _update_nick_in_channels(self, old_nick: str, new_nick: str) -> None:
|
|
"""Update nickname across all channels."""
|
|
for channel in self.channels.values():
|
|
if old_nick in channel:
|
|
channel.remove_member(old_nick)
|
|
channel.add_member(new_nick)
|
|
for mode_set in channel.modes.values():
|
|
if old_nick in mode_set:
|
|
mode_set.add(new_nick)
|
|
mode_set.discard(old_nick)
|
|
logger.debug(f"Updated nickname {old_nick} to {new_nick} in channels")
|
|
|
|
@event(rfc.RPL_NAMREPLY)
|
|
def names(self, channel: str, data: str, **kwargs: Any) -> None:
|
|
"""Process NAMES reply with proper mode handling."""
|
|
statusmsg = self.context.server_config.get('STATUSMSG', '@+')
|
|
prefix_config = self.context.server_config.get('PREFIX', '(ov)@+')
|
|
mode_chars, prefixes = self._parse_prefix_config(prefix_config)
|
|
prefix_map = dict(zip(prefixes, mode_chars))
|
|
|
|
channel_obj = self.channels[channel]
|
|
for item in data.split():
|
|
nick = item.lstrip(statusmsg)
|
|
modes = ''.join([prefix_map[p] for p in item[:len(item)-len(nick)]
|
|
if p in prefix_map])
|
|
channel_obj.add_member(nick, modes)
|
|
logger.debug(f"Processed NAMES reply for channel {channel}")
|
|
|
|
def _parse_prefix_config(self, config: str) -> tuple[str, str]:
|
|
"""Parse PREFIX config into mode characters and symbols."""
|
|
parts = config.strip('()').split(')')
|
|
return (parts[0], parts[1]) if len(parts) == 2 else ('ov', '@+')
|
|
|
|
@event(rfc.RPL_WHOREPLY)
|
|
def who(self, channel: str, nick: str, username: str,
|
|
server: str, **kwargs: Any) -> None:
|
|
"""Process WHO reply with hostmask."""
|
|
mask = IrcString(f"{nick}!{username}@{server}")
|
|
self.channels[channel].add_member(nick)
|
|
self.nicks[nick] = mask
|
|
logger.debug(f"Processed WHO reply for {nick} in {channel}")
|
|
|
|
@event(rfc.MODE)
|
|
def mode(self, target: str, modes: str, data: Any, **kwargs: Any) -> None:
|
|
"""Handle mode changes with proper parameter handling."""
|
|
chantypes = self.context.server_config.get('CHANTYPES', '#&')
|
|
if not target or target[0] not in chantypes:
|
|
return
|
|
|
|
prefix_config = self.context.server_config.get('PREFIX', '(ov)@+')
|
|
mode_chars, prefixes = self._parse_prefix_config(prefix_config)
|
|
param_modes = self.context.server_config.get('CHANMODES', '').split(',')[0]
|
|
|
|
parsed = utils.parse_modes(modes, data.split() if isinstance(data, str) else data)
|
|
channel = self.channels[target]
|
|
|
|
for sign, mode, param in parsed:
|
|
if mode in param_modes and param:
|
|
# Handle parameterized modes (e.g., +b, +k)
|
|
pass
|
|
elif mode in mode_chars:
|
|
prefix = dict(zip(mode_chars, prefixes))
|
|
mode_key = prefix.get(mode, '')
|
|
if sign == '+':
|
|
channel.modes[mode_key].add(param)
|
|
else:
|
|
channel.modes[mode_key].discard(param)
|
|
logger.debug(f"Mode change in {target}: {modes}")
|
|
|
|
def _cleanup_nick(self, nick: str) -> None:
|
|
"""Remove nick if not in any channels."""
|
|
if not any(nick in channel for channel in self.channels.values()):
|
|
self.nicks.pop(nick, None)
|
|
logger.debug(f"Cleaned up nick {nick} as no longer in any channels")
|
|
|
|
def _broadcast(self, targets: Any, **kwargs: Any) -> None:
|
|
"""Placeholder for broadcast functionality."""
|
|
logger.debug("Broadcasting to targets: %s", targets) |