g1mp/asynchronous.py
2025-02-12 20:55:42 -08:00

123 lines
3.7 KiB
Python

# -*- coding: utf-8 -*-
from irc3.compat import asyncio
import re
class event:
iotype = 'in'
iscoroutine = True
def __init__(self, **kwargs):
# kwargs get interpolated into the regex.
# Any kwargs not ending in _re get escaped
self.meta = kwargs.get('meta')
regexp = self.meta['match'].format(**{
k: v if k.endswith('_re') else re.escape(v)
for (k, v) in kwargs.items()
if k != 'meta'
})
self.regexp = regexp
regexp = getattr(self.regexp, 're', self.regexp)
self.cregexp = re.compile(regexp).match
def compile(self, *args, **kwargs):
return self.cregexp
def __repr__(self):
s = getattr(self.regexp, 'name', self.regexp)
name = self.__class__.__name__
return '<temp_event {0} {1}>'.format(name, s)
def __call__(self, callback):
async def wrapper(*args, **kwargs):
# Ensure callback is awaited if it's an async function
if asyncio.iscoroutinefunction(callback):
return await callback(self, *args, **kwargs)
else:
return callback(self, *args, **kwargs)
self.callback = wrapper
return self
def default_result_processor(self, results=None, **value): # pragma: no cover
value['results'] = results
if len(results) == 1:
value.update(results[0])
return value
def async_events(context, events, send_line=None,
process_results=default_result_processor,
timeout=30, **params):
loop = context.loop
task = loop.create_future() # async result
results = [] # store events results
events_ = [] # reference registered events
# async timeout
timeout = asyncio.ensure_future(
asyncio.sleep(timeout, loop=loop), loop=loop)
def end(t=None):
"""t can be a future (timeout done) or False (result success)"""
if not task.done():
# cancel timeout if needed
if t is False:
timeout.cancel()
# detach events
context.detach_events(*events_)
# clean refs
events_[:] = []
# set results
task.set_result(process_results(results=results, timeout=bool(t)))
# end on timeout
timeout.add_done_callback(end)
def callback(e, **kw):
"""common callback for all events"""
results.append(kw)
if e.meta.get('multi') is not True:
context.detach_events(e)
if e in events_:
events_.remove(e)
if e.meta.get('final') is True:
# end on success
end(False)
events_.extend([event(meta=kw, **params)(callback) for kw in events])
context.attach_events(*events_, insert=True)
if send_line:
context.send_line(send_line.format(**params))
return task
class AsyncEvents:
"""Asynchronious events"""
timeout = 30
send_line = None
events = []
def __init__(self, context):
self.context = context
def process_results(self, results=None, **value): # pragma: no cover
"""Process results.
results is a list of dict catched during event.
value is a dict containing some metadata (like timeout=(True/False).
"""
return default_result_processor(results=results, **value)
def __call__(self, **kwargs):
"""Register events; and callbacks then return a `asyncio.Future`.
Events regexp are compiled with `params`"""
kwargs.setdefault('timeout', self.timeout)
kwargs.setdefault('send_line', self.send_line)
kwargs['process_results'] = self.process_results
return async_events(self.context, self.events, **kwargs)