g1mp/plugins/asynchronious.py

479 lines
14 KiB
Python
Raw Normal View History

2025-02-13 04:55:42 +00:00
# -*- coding: utf-8 -*-
from collections import OrderedDict
import re
from irc3.asynchronous import AsyncEvents # Corrected import path
from irc3 import utils
from irc3 import dec
__doc__ = """
======================================================
:mod:`irc3.plugins.asynchronious` Asynchronious events
======================================================
This module provides a way to catch data from various predefined events.
Usage
=====
You'll have to define a subclass of :class:`~irc3.asynchronous.AsyncEvents`:
.. literalinclude:: ../../irc3/plugins/asynchronious.py
:pyobject: Whois
Notice that the regexps and send_line contain some `{nick}` placeholders. These
will be substituted later with the keyword arguments passed to the instance.
Then you're able to use it in a plugin:
.. code-block:: py
class MyPlugin:
def __init__(self, bot):
self.bot = bot
self.whois = Whois(bot)
async def do_whois(self):
# remember {nick} in the regexp? Here it is
whois = await self.whois(nick='gawel')
if int(whois['idle']) / 60 > 10:
self.bot.privmsg('gawel', 'Wake up dude')
.. warning::
Your code should always check if the result has been set before timeout by
using `result['timeout']` which is True when the bot failed to get a result
before 30s (you can override the default value per call)
.. warning::
Do not overuse this feature. If you're making a lot of calls at the same
time you may experience some weird behavior, since IRC does not provide a way
to match responses to the command that triggered them. That's why the example
uses {nick} in the regexp to filter events efficiently. But two concurrent
calls for the same nick can still fail.
API
===
.. autoclass:: irc3.asynchronous.AsyncEvents
:members: process_results, __call__
.. autoclass:: Async
:members:
"""
class Whois(AsyncEvents):
    """Collect the server replies to a WHOIS query into one dictionary."""

    # Seconds to wait for the query to complete.
    timeout = 20

    # Line sent to the server to start the query.
    send_line = 'WHOIS {nick} {nick}'

    # Reply patterns.  ``{nick}`` is a placeholder that is substituted with
    # the keyword arguments given when the instance is called; ``(?i)``
    # makes every pattern case-insensitive.
    events = (
        # 301: away message.
        {'match': r"(?i)^:\S+ 301 \S+ {nick} :(?P<away>.*)"},
        # 311: username, host and real name.
        {
            'match': (
                r"(?i)^:\S+ 311 \S+ {nick} (?P<username>\S+) "
                r"(?P<host>\S+) . :(?P<realname>.*)"
            )
        },
        # 312: server name and its description.
        {
            'match': (
                r"(?i)^:\S+ 312 \S+ {nick} (?P<server>\S+) "
                r":(?P<server_desc>.*)"
            )
        },
        # 317: idle time (leading integer only).
        {'match': r"(?i)^:\S+ 317 \S+ {nick} (?P<idle>[0-9]+).*"},
        # 319: channel list; flagged ``multi``, so it can contribute more
        # than one result.
        {
            'match': r"(?i)^:\S+ 319 \S+ {nick} :(?P<channels>.*)",
            'multi': True
        },
        # 330: account name and accompanying text.
        {
            'match': (
                r"(?i)^:\S+ 330 \S+ {nick} (?P<account>\S+) "
                r":(?P<account_desc>.*)"
            )
        },
        # 671: connection information text.
        {'match': r"(?i)^:\S+ 671 \S+ {nick} :(?P<connection>.*)"},
        # 318 / 401: the reply that ends the query (flagged ``final``).
        {
            'match': (
                r"(?i)^:\S+ (?P<retcode>(318|401)) \S+ (?P<nick>{nick}) :.*"
            ),
            'final': True
        },
    )

    def process_results(self, results=None, **value):
        """Merge every captured reply into a single WHOIS dictionary.

        Args:
            results: Iterable of dicts, one per matched event.  Each dict
                is modified in place (its ``channels`` key is removed).
            **value: Initial content of the returned dictionary.

        Returns:
            dict: ``value`` extended with every captured field, plus
            ``channels`` (flat list of channel tokens) and ``success``
            (True only when the captured ``retcode`` is ``'318'``).
        """
        seen_channels = []
        for entry in results:
            # Channel lists are space separated; flatten them into one
            # list instead of letting later entries overwrite earlier ones.
            raw_channels = entry.pop('channels', '')
            seen_channels += raw_channels.split()
            value.update(entry)
        value['channels'] = seen_channels
        retcode = value.get('retcode')
        value['success'] = retcode == '318'
        return value
class WhoChannel(AsyncEvents):
    """Gather the replies to ``WHO <channel>`` into a list of users."""

    # Line sent to the server to start the query.
    send_line = 'WHO {channel}'

    events = (
        # 352: one line per user (flagged ``multi``).
        {
            'match': (
                r"(?i)^:\S+ 352 \S+ {channel} (?P<user>\S+) "
                r"(?P<host>\S+) (?P<server>\S+) (?P<nick>\S+) "
                r"(?P<modes>\S+) :(?P<hopcount>\S+) (?P<realname>.*)"
            ),
            'multi': True
        },
        # 315 / 401: the reply that ends the query (flagged ``final``).
        {
            'match': r"(?i)^:\S+ (?P<retcode>(315|401)) \S+ {channel} :.*",
            'final': True
        },
    )

    def process_results(self, results=None, **value):
        """Split the captured replies into user entries and the end marker.

        Args:
            results: Iterable of dicts, one per matched event.  User
                entries are modified in place (a ``mask`` key is added).
            **value: Initial content of the returned dictionary.

        Returns:
            dict: ``value`` with ``users`` (list of per-user dicts, each
            carrying a ``nick!user@host`` ``mask``), the fields of the
            final reply, and ``success`` (True only when ``retcode`` is
            ``'315'``).
        """
        members = []
        for entry in results:
            if 'retcode' in entry:
                # End-of-query marker: merge it instead of listing it.
                value.update(entry)
                continue
            hostmask = '{nick}!{user}@{host}'.format(**entry)
            entry['mask'] = utils.IrcString(hostmask)
            members.append(entry)
        value['users'] = members
        value['success'] = value.get('retcode') == '315'
        return value
class WhoChannelFlags(AsyncEvents):
    """Gather the replies to a flag-based ``WHO`` query on a channel."""

    # Supported field flags mapped to the regex fragment capturing each
    # field.  The insertion order is significant: ``Async`` iterates this
    # mapping to decide the order of the flags it sends and of the regex
    # fragments it joins.
    flags = OrderedDict([
        ("u", r"(?P<user>\S+)"),
        ("i", r"(?P<ip>\S+)"),
        ("h", r"(?P<host>\S+)"),
        ("s", r"(?P<server>\S+)"),
        ("n", r"(?P<nick>\S+)"),
        ("a", r"(?P<account>\S+)"),
        ("r", r":(?P<realname>.*)"),
    ])

    # Line sent to the server to start the query.
    send_line = "WHO {channel} c%{flags}"

    # Only the end-of-query reply is declared here; the per-user pattern
    # depends on the requested flags and is added by
    # ``Async.async_who_channel_flags`` on a dynamically created subclass.
    events = (
        {
            'match': r"(?i)^:\S+ (?P<retcode>(315|401)) \S+ {channel} :.*",
            'final': True
        },
    )

    def process_results(self, results=None, **value):
        """Split the captured replies into user entries and the end marker.

        Args:
            results: Iterable of dicts, one per matched event.  User
                entries may be modified in place.
            **value: Initial content of the returned dictionary.

        Returns:
            dict: ``value`` with ``users`` (list of per-user dicts), the
            fields of the final reply, and ``success`` (True only when
            ``retcode`` is ``'315'``).
        """
        members = []
        for entry in results:
            if 'retcode' in entry:
                # End-of-query marker: merge it instead of listing it.
                value.update(entry)
                continue
            # An account of '0' is replaced by None -- presumably the
            # server's marker for "not logged in"; confirm against the
            # target network.
            if entry.get('account') == '0':
                entry['account'] = None
            members.append(entry)
        value['users'] = members
        value['success'] = value.get('retcode') == '315'
        return value
class WhoNick(AsyncEvents):
    """Gather the replies to ``WHO <nick>`` into a single dictionary."""

    # Line sent to the server to start the query.
    send_line = 'WHO {nick}'

    events = (
        # 352: the user line for the requested nick.
        {
            'match': (
                r"(?i)^:\S+ 352 \S+ (?P<channel>\S+) (?P<user>\S+) "
                r"(?P<host>\S+) (?P<server>\S+) (?P<nick>{nick}) "
                r"(?P<modes>\S+) :(?P<hopcount>\S+)\s*(?P<realname>.*)"
            )
        },
        # 315 / 401: the reply that ends the query (flagged ``final``).
        {
            'match': r"(?i)^:\S+ (?P<retcode>(315|401)) \S+ {nick} :.*",
            'final': True
        },
    )

    def process_results(self, results=None, **value):
        """Merge the captured replies into one user dictionary.

        Args:
            results: Iterable of dicts, one per matched event.  The user
                entry is modified in place (a ``mask`` key is added).
            **value: Initial content of the returned dictionary.

        Returns:
            dict: ``value`` extended with every captured field, a
            ``nick!user@host`` ``mask`` when a user line was captured, and
            ``success`` (True only when ``retcode`` is ``'315'``).
        """
        for entry in results:
            is_end_marker = 'retcode' in entry
            if not is_end_marker:
                hostmask = '{nick}!{user}@{host}'.format(**entry)
                entry['mask'] = utils.IrcString(hostmask)
            value.update(entry)
        value['success'] = value.get('retcode') == '315'
        return value
class IsOn(AsyncEvents):
    """Collect the reply to an ISON query.

    No ``send_line`` is defined on this class; ``Async.ison`` sends the
    ISON line itself before calling the instance.
    """

    events = (
        # 303: space-separated list of the nicknames reported by the server
        # (possibly empty).  ``{nicknames_re}`` is supplied by the caller.
        {
            'match': (
                r"(?i)^:\S+ 303 \S+ :(?P<nicknames>({nicknames_re}.*|$))"
            ),
            'final': True
        },
    )

    def process_results(self, results=None, **value):
        """Flatten the captured nickname lists.

        Args:
            results: Iterable of dicts, one per matched event.  Each dict
                is modified in place (its ``nicknames`` key is removed).
            **value: Initial content of the returned dictionary.

        Returns:
            dict: ``value`` with ``names`` set to the list of nicknames.
        """
        value['names'] = [
            nick
            for entry in results
            for nick in entry.pop('nicknames', '').split()
        ]
        return value
class Topic(AsyncEvents):
    """Read or change a channel topic and capture the server's answer."""

    # ``{topic}`` is either empty (read) or ' <new topic>' (write); see
    # ``Async.topic`` for how it is built.
    send_line = 'TOPIC {channel}{topic}'

    events = (
        # 331, 332 or a TOPIC message for the channel ends the query.
        {
            'match': (
                r"(?i)^:\S+ (?P<retcode>(331|332|TOPIC))"
                r"(:?\s+\S+\s+|\s+){channel} :(?P<topic>.*)"
            ),
            'final': True
        },
    )

    def process_results(self, results=None, **value):
        """Extract the topic from the captured reply.

        Args:
            results: Iterable of dicts, one per matched event.
            **value: Initial content of the returned dictionary.

        Returns:
            dict: ``value`` with ``topic`` set to the captured text when
            the reply code is ``332`` or ``TOPIC`` (compared
            case-insensitively), or to None for any other code.
        """
        topic_codes = ('332', 'TOPIC')
        for entry in results:
            code = entry.get('retcode', '').upper()
            if code in topic_codes:
                value['topic'] = entry.get('topic')
            else:
                value['topic'] = None
        return value
class Names(AsyncEvents):
    """Gather the replies to ``NAMES <channel>`` into a list of nicknames."""

    # Line sent to the server to start the query.
    send_line = 'NAMES {channel}'

    events = (
        # 353: one chunk of the nickname list (flagged ``multi``).
        {
            'match': r"(?i)^:\S+ 353 .*{channel} :(?P<nicknames>.*)",
            'multi': True
        },
        # 366 / 401: the reply that ends the query (flagged ``final``).
        {
            'match': r"(?i)^:\S+ (?P<retcode>(366|401)) \S+ {channel} :.*",
            'final': True
        },
    )

    def process_results(self, results=None, **value):
        """Aggregate nicknames and record whether the query succeeded.

        Args:
            results: Iterable of dicts, one per matched event.  Each dict
                is modified in place (its ``nicknames`` key is removed).
            **value: Initial content of the returned dictionary.

        Returns:
            dict: ``value`` with ``names`` (flat list of nickname tokens),
            the fields of the final reply (``retcode``), and ``success``
            (True only when ``retcode`` is ``'366'``).
        """
        nicknames = []
        for res in results:
            nicknames.extend(res.pop('nicknames', '').split())
            # Merge what is left of the entry so the final reply's
            # ``retcode`` reaches ``value``.  Without this, ``success``
            # below could never be True.
            value.update(res)
        value['names'] = nicknames
        value['success'] = value.get('retcode') == '366'
        return value
class ChannelBans(AsyncEvents):
    """Gather the replies to ``MODE <channel> +b`` into a ban list."""

    # Line sent to the server to start the query.
    send_line = 'MODE {channel} +b'

    events = (
        # 367: one ban entry per line (flagged ``multi``).
        {
            'match': (
                r"(?i)^:\S+ 367 \S+ {channel} (?P<mask>\S+) "
                r"(?P<user>\S+) (?P<timestamp>\d+)"
            ),
            'multi': True
        },
        # 368: the reply that ends the list (flagged ``final``).  This
        # pattern has no named groups.
        {
            'match': r"(?i)^:\S+ 368 \S+ {channel} :.*",
            'final': True
        },
    )

    def process_results(self, results=None, **value):
        """Build the list of ban entries.

        Args:
            results: Iterable of dicts, one per matched event.  Ban
                entries are modified in place (``timestamp`` becomes an
                int).
            **value: Initial content of the returned dictionary.

        Returns:
            dict: ``value`` with ``bans`` set to the list of non-empty
            entries.
        """
        entries = []
        for ban in results:
            # Empty entries carry no ban data (presumably produced by the
            # group-less end-of-list pattern), so they are ignored.
            if ban:
                ban['timestamp'] = int(ban['timestamp'])
                entries.append(ban)
        value['bans'] = entries
        return value
class CTCP(AsyncEvents):
    """Send a CTCP request and capture the reply."""

    # The request is a PRIVMSG whose payload is wrapped in \x01 bytes.
    send_line = 'PRIVMSG {nick} :\x01{ctcp}\x01'

    events = (
        # A NOTICE carrying a \x01-delimited CTCP reply.
        {
            'match': (
                r"(?i):(?P<mask>\S+) NOTICE \S+ :\x01(?P<ctcp>\S+) "
                r"(?P<reply>.*)\x01"
            ),
            'final': True
        },
        # 486: treated as a failure by process_results below.
        {
            'match': r"(?i)^:\S+ (?P<retcode>486) \S+ :(?P<reply>.*)",
            'final': True
        }
    )

    def process_results(self, results=None, **value):
        """Merge the captured reply into the result dictionary.

        Args:
            results: Iterable of dicts, one per matched event.  Each dict
                is modified in place (``mask`` is converted, ``retcode``
                is removed).
            **value: Initial content of the returned dictionary.

        Returns:
            dict: ``value`` extended with the captured fields and
            ``success`` (False only when the reply code was ``'486'``).
            ``success`` is not set when ``results`` is empty.
        """
        for entry in results:
            if 'mask' in entry:
                entry['mask'] = utils.IrcString(entry['mask'])
            code = entry.pop('retcode', None)
            value['success'] = code != '486'
            value.update(entry)
        return value
@dec.plugin
class Async:
    """Plugin exposing common IRC queries as awaitable commands.

    One helper instance is created per query type (WHOIS, WHO, TOPIC, ISON,
    NAMES, ban list, CTCP).  The plugin also stores itself on the context
    as ``context.async_cmds``.
    """

    def __init__(self, context):
        """Attach the plugin to ``context`` and build the query helpers."""
        self.context = context
        self.context.async_cmds = self
        self.async_whois = Whois(context)
        self.async_who_channel = WhoChannel(context)
        self.async_who_nick = WhoNick(context)
        self.async_topic = Topic(context)
        self.async_ison = IsOn(context)
        self.async_names = Names(context)
        self.async_channel_bans = ChannelBans(context)
        self.async_ctcp = CTCP(context)

    async def send_message(self, target, message):
        """Send ``message`` to ``target`` through ``context.privmsg``."""
        self.context.privmsg(target, message)

    def async_who_channel_flags(self, channel, flags, timeout=20):
        """Run a flag-based WHO query on a channel.

        A subclass of ``WhoChannelFlags`` is created on the fly so that its
        ``events`` include a 354 pattern made of exactly the requested
        fields.

        Args:
            channel: Channel name; it is lowercased before use.
            flags: Iterable of single-letter field flags (for example the
                string ``'an'``).  Letters are matched case-insensitively
                and anything not in ``WhoChannelFlags.flags`` is ignored.
            timeout: Seconds to wait for the query to complete.

        Returns:
            The awaitable produced by calling the generated helper.
        """
        # Normalise the caller's flags so that 'AN' behaves like 'an'
        # instead of silently selecting no field at all.
        requested = {f.lower() for f in flags}
        # Keep the order of WhoChannelFlags.flags: the flags sent and the
        # regex fragments joined below must line up with each other.
        flags = ''.join(f for f in WhoChannelFlags.flags if f in requested)
        regex = [WhoChannelFlags.flags[f] for f in flags]
        channel = channel.lower()
        cls = type(
            WhoChannelFlags.__name__,
            (WhoChannelFlags,),
            {
                "events": WhoChannelFlags.events + (
                    {
                        "match": (
                            r"(?i)^:\S+ 354 \S+ {0}".format(' '.join(regex))
                        ),
                        "multi": True
                    },
                )
            }
        )
        return cls(self.context)(channel=channel, flags=flags, timeout=timeout)

    @dec.extend
    def whois(self, nick, timeout=20):
        """Send a WHOIS and return an awaitable yielding the received data.

        The nick is lowercased before the query is issued.

        Example:
            result = await bot.async_cmds.whois('gawel')
        """
        return self.async_whois(nick=nick.lower(), timeout=timeout)

    @dec.extend
    def who(self, target, flags=None, timeout=20):
        """Send a WHO and return an awaitable yielding the received data.

        Targets starting with ``#`` are queried as channels (with the
        flag-based variant when ``flags`` is given); anything else is
        queried as a nickname.  The target is lowercased first.

        Examples:
            result = await bot.async_cmds.who('gawel')
            result = await bot.async_cmds.who('#irc3', 'an')
        """
        target = target.lower()
        if target.startswith('#'):
            if flags:
                return self.async_who_channel_flags(
                    channel=target, flags=flags, timeout=timeout
                )
            return self.async_who_channel(channel=target, timeout=timeout)
        else:
            return self.async_who_nick(nick=target, timeout=timeout)

    # NOTE(review): unlike its siblings this method is not decorated with
    # @dec.extend -- confirm that this is intentional.
    def topic(self, channel, topic=None, timeout=20):
        """Get the topic of a channel, or set it when ``topic`` is given.

        A non-empty ``topic`` is stripped and prefixed with a space so it
        can be appended directly after the channel in the TOPIC line.
        """
        if not topic:
            topic = ''
        else:
            topic = ' ' + topic.strip()
        return self.async_topic(channel=channel, topic=topic, timeout=timeout)

    @dec.extend
    def ison(self, *nicknames, **kwargs):
        """Send ISON to check which of the given nicknames are online.

        The nicknames are lowercased.  The ISON line is sent here because
        the ``IsOn`` helper defines no ``send_line``.  Extra keyword
        arguments are forwarded to the helper.

        Example:
            result = await bot.async_cmds.ison('gawel', 'irc3')
        """
        nicknames = [n.lower() for n in nicknames]
        self.context.send_line(f'ISON :{" ".join(nicknames)}')
        # Escape each nick so regex metacharacters in it are taken
        # literally by the reply pattern.
        nicknames_re = '(%s)' % '|'.join(re.escape(n) for n in nicknames)
        return self.async_ison(nicknames_re=nicknames_re, **kwargs)

    @dec.extend
    def names(self, channel, timeout=20):
        """Send NAMES to list the users in a channel.

        The channel name is lowercased before the query is issued.

        Example:
            result = await bot.async_cmds.names('#irc3')
        """
        return self.async_names(channel=channel.lower(), timeout=timeout)

    @dec.extend
    def channel_bans(self, channel, timeout=20):
        """List the bans of a channel via ``MODE <channel> +b``.

        The channel name is lowercased before the query is issued.

        Example:
            result = await bot.async_cmds.channel_bans('#irc3')
        """
        return self.async_channel_bans(channel=channel.lower(), timeout=timeout)

    @dec.extend
    def ctcp_async(self, nick, ctcp, timeout=20):
        """Send a CTCP request and return an awaitable yielding the reply.

        The CTCP command is uppercased; the nick is used as given.

        Example:
            result = await bot.async_cmds.ctcp_async('irc3', 'VERSION')
        """
        return self.async_ctcp(nick=nick, ctcp=ctcp.upper(), timeout=timeout)