MapAsync

Definition

class broqer.op.MapAsync(coro, *args, mode=<MODE.CONCURRENT: 1>, error_callback=<broqer.default_error_handler.DefaultErrorHandler object>, unpack: bool = False, **kwargs)

Applies coro to each emitted value, allowing asynchronous processing.

Parameters:
  • coro – coroutine function applied to each emitted value
  • *args – additional positional arguments passed to coro
  • mode – behavior when a new value is emitted while a previous value is still being processed (see the MODE examples under Usage)
  • error_callback – callback invoked when coro raises an exception
  • unpack – if True, each emitted value is unpacked as (*value) when coro is called
  • **kwargs – additional keyword arguments passed to coro
Variables:
  • scheduled – publisher that emits the value when the coroutine is actually started
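
The effect of unpack can be illustrated without broqer. The following is a minimal plain-asyncio sketch, not the library's implementation: call, add and describe are hypothetical names used only to show the difference between passing an emitted value as one argument and spreading it as (*value).

```python
import asyncio


async def add(a, b):
    """Hypothetical coroutine that expects two positional arguments."""
    return a + b


async def describe(pair):
    """Hypothetical coroutine that expects the emitted value as one argument."""
    return f"got {pair!r}"


async def call(coro, value, unpack=False):
    """Illustrative stand-in for how an emitted value reaches coro."""
    if unpack:
        # unpack=True: the emitted tuple is spread into positional arguments
        return await coro(*value)
    # unpack=False: the emitted value is passed through as a single argument
    return await coro(value)


async def main():
    packed = await call(describe, (1, 2))
    unpacked = await call(add, (1, 2), unpack=True)
    return packed, unpacked


packed, unpacked = asyncio.run(main())
print(packed)    # got (1, 2)
print(unpacked)  # 3
```

With unpack left at False, a coroutine such as add would receive the whole tuple as its first argument and fail for lack of a second one.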

Usage

>>> import asyncio
>>> from broqer import Subject, op
>>> s = Subject()
>>> async def delay_add(a):
...     print('Starting with argument', a)
...     await asyncio.sleep(0.015)
...     result = a + 1
...     print('Finished with argument', a)
...     return result

MODE: CONCURRENT (the default)

>>> _d = s | op.MapAsync(delay_add) | op.Sink()
>>> s.emit(0)
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 0
Starting with argument 1
Finished with argument 0
Finished with argument 1
>>> _d.dispose()
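
For comparison, the same interleaving can be reproduced with plain asyncio tasks. This is only a sketch of the behavior seen in the output above, assuming each emitted value starts its own task right away; it does not use broqer, and the log list is a hypothetical stand-in for the print calls.

```python
import asyncio


async def delay_add(a, log):
    log.append(f"start {a}")
    await asyncio.sleep(0.01)
    log.append(f"finish {a}")
    return a + 1


async def main():
    log = []
    # One task per value, created immediately: nothing waits for a
    # previous value to finish before the next one starts.
    tasks = [asyncio.create_task(delay_add(v, log)) for v in (0, 1)]
    results = await asyncio.gather(*tasks)
    return log, results


log, results = asyncio.run(main())
print(log)
print(results)
```

Both start entries are logged before either finish entry, which mirrors the doctest output for this mode.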

MODE: INTERRUPT

>>> _d = s | op.MapAsync(delay_add, mode=op.MODE.INTERRUPT) | op.Sink(print)
>>> s.emit(0)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.005))
Starting with argument 0
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()
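
The output above shows that the coroutine for 0 never finishes once 1 is emitted. A rough plain-asyncio equivalent, assuming the running task is cancelled when a new value arrives, might look as follows; it is a sketch of the observed behavior rather than broqer's code, and log is a hypothetical stand-in for the print calls.

```python
import asyncio


async def delay_add(a, log):
    log.append(f"start {a}")
    await asyncio.sleep(0.02)
    log.append(f"finish {a}")
    return a + 1


async def main():
    log = []
    first = asyncio.create_task(delay_add(0, log))
    await asyncio.sleep(0.005)   # the first task is now waiting in its sleep
    first.cancel()               # a new value arrives: cancel the running task
    second = asyncio.create_task(delay_add(1, log))
    result = await second
    return log, first.cancelled(), result


log, was_cancelled, result = asyncio.run(main())
print(log)            # ['start 0', 'start 1', 'finish 1']
print(was_cancelled)  # True
print(result)         # 2
```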

MODE: QUEUE

>>> _d = s | op.MapAsync(delay_add, mode=op.MODE.QUEUE) | op.Sink(print)
>>> s.emit(0)
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()
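
The strictly sequential order in the output above is what a single worker draining a queue would produce. The sketch below shows that idea with asyncio.Queue; it is an illustration under that assumption, not broqer's implementation, and worker, log and results are hypothetical names.

```python
import asyncio


async def delay_add(a, log):
    log.append(f"start {a}")
    await asyncio.sleep(0.01)
    log.append(f"finish {a}")
    return a + 1


async def worker(queue, log, results):
    # Process one value at a time, in arrival order.
    while True:
        value = await queue.get()
        results.append(await delay_add(value, log))
        queue.task_done()


async def main():
    log, results = [], []
    queue = asyncio.Queue()
    worker_task = asyncio.create_task(worker(queue, log, results))
    for value in (0, 1):
        queue.put_nowait(value)   # values emitted while busy are kept
    await queue.join()            # wait until every queued value is processed
    worker_task.cancel()
    await asyncio.gather(worker_task, return_exceptions=True)
    return log, results


log, results = asyncio.run(main())
print(log)      # ['start 0', 'finish 0', 'start 1', 'finish 1']
print(results)  # [1, 2]
```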

MODE: LAST

>>> _d = s | op.MapAsync(delay_add, mode=op.MODE.LAST) | op.Sink(print)
>>> s.emit(0)
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 2
Finished with argument 2
3
>>> _d.dispose()
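
In the output above, the value 1 is never processed: only the most recent value emitted during a run is picked up afterwards. One way to sketch this in plain asyncio is a single pending slot that later values overwrite. LastRunner is a hypothetical helper written for this illustration and is not part of broqer.

```python
import asyncio

_EMPTY = object()  # sentinel: no value is waiting


async def delay_add(a, log):
    log.append(f"start {a}")
    await asyncio.sleep(0.01)
    log.append(f"finish {a}")
    return a + 1


class LastRunner:
    """Hypothetical helper: one run at a time, newest pending value wins."""

    def __init__(self, coro, log):
        self._coro = coro
        self._log = log
        self._pending = _EMPTY
        self._task = None
        self.results = []

    def emit(self, value):
        if self._task is None:
            self._task = asyncio.create_task(self._run(value))
        else:
            self._pending = value  # overwrite any older pending value

    async def _run(self, value):
        while True:
            self.results.append(await self._coro(value, self._log))
            if self._pending is _EMPTY:
                break
            value, self._pending = self._pending, _EMPTY
        self._task = None

    async def wait(self):
        if self._task is not None:
            await self._task


async def main():
    log = []
    runner = LastRunner(delay_add, log)
    for value in (0, 1, 2):
        runner.emit(value)
    await runner.wait()
    return log, runner.results


log, results = asyncio.run(main())
print(log)      # ['start 0', 'finish 0', 'start 2', 'finish 2']
print(results)  # [1, 3]
```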

MODE: SKIP

>>> _d = s | op.MapAsync(delay_add, mode=op.MODE.SKIP) | op.Sink(print)
>>> s.emit(0)
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
>>> _d.dispose()
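
Here the values 1 and 2 are dropped because they arrive while the coroutine for 0 is still running. A small plain-asyncio sketch of that rule, assuming a value is simply ignored whenever a task is in flight, is shown below. SkipRunner is a hypothetical helper, not broqer's implementation.

```python
import asyncio


async def delay_add(a, log):
    log.append(f"start {a}")
    await asyncio.sleep(0.01)
    log.append(f"finish {a}")
    return a + 1


class SkipRunner:
    """Hypothetical helper: ignore values that arrive during a run."""

    def __init__(self, coro, log):
        self._coro = coro
        self._log = log
        self._task = None
        self.results = []

    def emit(self, value):
        if self._task is not None and not self._task.done():
            return  # busy: the value is dropped, not stored
        self._task = asyncio.create_task(self._run(value))

    async def _run(self, value):
        self.results.append(await self._coro(value, self._log))

    async def wait(self):
        if self._task is not None:
            await self._task


async def main():
    log = []
    runner = SkipRunner(delay_add, log)
    for value in (0, 1, 2):
        runner.emit(value)
    await runner.wait()
    return log, runner.results


log, results = asyncio.run(main())
print(log)      # ['start 0', 'finish 0']
print(results)  # [1]
```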

Using error_callback:

>>> def cb(*e):
...     print('Got error')
>>> _d = s | op.MapAsync(delay_add, error_callback=cb) | op.Sink(print)
>>> s.emit('abc')
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument abc
Got error
>>> _d.dispose()
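
In the example above, 'abc' + 1 raises a TypeError inside the coroutine, and cb is called instead of the exception propagating. The sketch below illustrates that pattern in plain asyncio. It assumes the callback receives the (type, value, traceback) triple from sys.exc_info(); the source only shows cb accepting *e, so treat that argument layout as an assumption. run_guarded and seen are hypothetical names.

```python
import asyncio
import sys


async def delay_add(a):
    await asyncio.sleep(0)
    return a + 1


async def run_guarded(coro, value, error_callback):
    """Hypothetical wrapper: route exceptions from coro to a callback."""
    try:
        return await coro(value)
    except Exception:
        # Assumption: the callback is given the sys.exc_info() triple.
        error_callback(*sys.exc_info())
        return None


seen = []


def cb(*e):
    # e[0] is the exception type under the assumption above
    seen.append(e[0].__name__)


async def main():
    ok = await run_guarded(delay_add, 1, cb)
    failed = await run_guarded(delay_add, "abc", cb)
    return ok, failed


ok, failed = asyncio.run(main())
print(ok)      # 2
print(failed)  # None
print(seen)    # ['TypeError']
```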