# Source code for broqer.op.map_async

"""
Apply ``coro`` to each emitted value allowing async processing

Usage:

>>> import asyncio
>>> from broqer import Value, Sink, op
>>> s = Value()

>>> async def delay_add(a):
...     print('Starting with argument', a)
...     await asyncio.sleep(0.015)
...     result = a + 1
...     print('Finished with argument', a)
...     return result

AsyncMode: CONCURRENT (is default)

>>> s.emit(0)
>>> _d = (s | op.MapAsync(delay_add)).subscribe(Sink())
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 0
Starting with argument 1
Finished with argument 0
Finished with argument 1
>>> _d.dispose()

AsyncMode: INTERRUPT

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.INTERRUPT))
>>> _d = o.subscribe(Sink(print))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.005))
Starting with argument 0
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()

AsyncMode: QUEUE

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.QUEUE))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()

AsyncMode: LAST

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.LAST))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 2
Finished with argument 2
3
>>> _d.dispose()

AsyncMode: SKIP

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.SKIP))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
>>> _d.dispose()

Using error_callback:

>>> def cb(*e):
...     print('Got error')

>>> s.emit('abc')
>>> _d = (s | op.MapAsync(delay_add, error_callback=cb)).subscribe(Sink(print))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument abc
Got error
>>> _d.dispose()
"""
import asyncio
import sys
from functools import wraps
from typing import Any  # noqa: F401

# pylint: disable=cyclic-import
from broqer.operator import Operator
from broqer.coro_queue import CoroQueue, AsyncMode, wrap_coro
from broqer import Publisher, default_error_handler, NONE


class MapAsync(Operator):  # pylint: disable=too-many-instance-attributes
    """ Apply ``coro(*args, value, **kwargs)`` to each emitted value, allowing
    asynchronous processing.

    :param coro: coroutine to be applied on emit
    :param \\*args: variable arguments to be used for calling coro
    :param mode: behavior when a value is currently processed
    :param error_callback: error callback to be registered
    :param unpack: value from emits will be unpacked as (\\*value)
    :param \\*\\*kwargs: keyword arguments to be used for calling coro

    :ivar scheduled: Publisher emitting the value when coroutine is actually
        started.
    """
    # NOTE(review): the ``scheduled`` attribute documented above is not
    # assigned anywhere in this class body - confirm whether it is still
    # provided (e.g. by a base class) or whether the docstring is stale.

    def __init__(self,
                 coro, *args, mode=AsyncMode.CONCURRENT,
                 error_callback=default_error_handler,
                 unpack: bool = False, **kwargs) -> None:
        """ Wrap ``coro`` and create the queue used to schedule it.

        ``coro``, ``unpack``, ``*args`` and ``**kwargs`` are handed to
        ``wrap_coro``; the wrapped coroutine and ``mode`` are handed to
        ``CoroQueue``.
        """
        Operator.__init__(self)
        _coro = wrap_coro(coro, unpack, *args, **kwargs)
        self._coro_queue = CoroQueue(_coro, mode=mode)
        self._error_callback = error_callback

    def emit(self, value: Any, who: Publisher) -> None:
        """ Schedule the wrapped coroutine for ``value``.

        :param value: value emitted by the originating publisher
        :param who: publisher emitting the value
        :raises ValueError: when ``who`` is not the assigned originator
        """
        if who is not self._originator:
            raise ValueError('Emit from non assigned publisher')

        future = self._coro_queue.schedule(value)
        # the result (or exception) is processed once the future is done
        future.add_done_callback(self._done)

    def _done(self, future: asyncio.Future) -> None:
        """ Done-callback of a scheduled future.

        An exception raised while retrieving the result is passed to the
        error callback as ``(type, value, traceback)``. A result is forwarded
        to the subscribers unless it is the ``NONE`` sentinel.
        """
        try:
            result = future.result()
        except Exception:  # pylint: disable=broad-except
            self._error_callback(*sys.exc_info())
        else:
            # Identity check: ``!=`` would invoke the result's own ``__ne__``,
            # which may return a non-boolean or raise for some result types
            # (e.g. array-like objects).
            if result is not NONE:
                Publisher.notify(self, result)
def build_map_async(coro=None, *,
                    mode: AsyncMode = AsyncMode.CONCURRENT,
                    error_callback=default_error_handler,
                    unpack: bool = False):
    """ Decorator to wrap a coroutine to return a MapAsync operator.

    Usable both as ``@build_map_async`` and as ``@build_map_async(...)`` with
    keyword arguments.

    :param coro: coroutine to be wrapped
    :param mode: behavior when a value is currently processed
    :param error_callback: error callback to be registered
    :param unpack: value from emits will be unpacked (*value)
    :return: a ``MapAsync`` operator when ``coro`` is given, otherwise a
        decorator expecting the coroutine
    """
    def _build_map_async(coro):
        return MapAsync(coro, mode=mode, error_callback=error_callback,
                        unpack=unpack)

    # Explicit ``None`` check: a callable object that evaluates as falsy must
    # not be mistaken for "decorator used with arguments only".
    if coro is not None:
        return _build_map_async(coro)

    return _build_map_async


def build_map_async_factory(coro=None, *,
                            mode: AsyncMode = AsyncMode.CONCURRENT,
                            error_callback=default_error_handler,
                            unpack: bool = False):
    """ Decorator to wrap a coroutine to return a factory for MapAsync
    operators.

    The decorated name becomes a function; calling it with ``*args`` and
    ``**kwargs`` builds a ``MapAsync`` operator which passes those arguments
    on. ``mode`` may be overridden per call, while ``unpack`` and
    ``error_callback`` can only be set via the decorator.

    :param coro: coroutine to be wrapped
    :param mode: behavior when a value is currently processed
    :param error_callback: error callback to be registered
    :param unpack: value from emits will be unpacked (*value)
    :return: the factory function when ``coro`` is given, otherwise a
        decorator expecting the coroutine
    """
    # keep the decorator-level default; ``mode`` is shadowed in ``_wrapper``
    _mode = mode

    def _build_map_async(coro):
        @wraps(coro)
        def _wrapper(*args, mode=None, **kwargs) -> MapAsync:
            if ('unpack' in kwargs) or ('error_callback' in kwargs):
                raise TypeError('"unpack" and "error_callback" has to '
                                'be defined by decorator')
            if mode is None:
                # no per-call override: fall back to the decorator's mode
                mode = _mode
            return MapAsync(coro, *args, mode=mode, unpack=unpack,
                            error_callback=error_callback, **kwargs)
        return _wrapper

    # Explicit ``None`` check: a callable object that evaluates as falsy must
    # not be mistaken for "decorator used with arguments only".
    if coro is not None:
        return _build_map_async(coro)

    return _build_map_async