MapAsync

Definition

class broqer.op.MapAsync(coro, *args, mode=<AsyncMode.CONCURRENT: 1>, error_callback=<broqer.error_handler.DefaultErrorHandler object>, unpack: bool = False, **kwargs)

Apply coro(*args, value, **kwargs) to each emitted value, allowing asynchronous processing.

Parameters:
  • coro – coroutine to be applied on emit
  • *args – variable arguments to be used for calling coro
  • mode – AsyncMode member selecting the behavior when a new value is emitted while a previous one is still being processed
  • error_callback – error callback to be registered
  • unpack – if True, the emitted value is unpacked into positional arguments as (*value)
  • **kwargs – keyword arguments to be used for calling coro
Variables:
  • scheduled – Publisher emitting the value when the coroutine is actually started
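
The call convention described above can be sketched in plain asyncio, without broqer. This is only an illustration of how coro(*args, value, **kwargs) and the unpack flag are read here, not the library's implementation; call_coro and weighted_sum are hypothetical names introduced for the example.

```python
import asyncio


async def call_coro(coro, value, *args, unpack=False, **kwargs):
    """Hypothetical helper: call coro as the signature above describes."""
    if unpack:
        # Assumption: the emitted value is a sequence that is spread
        # into positional arguments after *args.
        return await coro(*args, *value, **kwargs)
    # Default: the emitted value is passed as one positional argument.
    return await coro(*args, value, **kwargs)


async def weighted_sum(weight, a, b=0, *, offset=0):
    """Hypothetical coroutine used only to make the argument order visible."""
    return weight * (a + b) + offset


# Equivalent to weighted_sum(10, 3, offset=1)
single = asyncio.run(call_coro(weighted_sum, 3, 10, offset=1))

# Equivalent to weighted_sum(10, 3, 4, offset=1)
pair = asyncio.run(call_coro(weighted_sum, (3, 4), 10, unpack=True, offset=1))
```

Here single evaluates to 31 and pair to 71: the extra positional argument 10 always comes first, followed by the emitted value (or its items when unpack is set).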

Usage

The examples below share the publisher s and the coroutine delay_add defined here:

>>> import asyncio
>>> from broqer import Value, Sink, op
>>> s = Value()
>>> async def delay_add(a):
...     print('Starting with argument', a)
...     await asyncio.sleep(0.015)
...     result = a + 1
...     print('Finished with argument', a)
...     return result

AsyncMode: CONCURRENT (default)

>>> s.emit(0)
>>> _d = (s | op.MapAsync(delay_add)).subscribe(Sink())
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 0
Starting with argument 1
Finished with argument 0
Finished with argument 1
>>> _d.dispose()

AsyncMode: INTERRUPT

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.INTERRUPT))
>>> _d = o.subscribe(Sink(print))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.005))
Starting with argument 0
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()
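
The INTERRUPT output above (the call for 0 starts but never finishes) matches what happens when a running task is cancelled as soon as a newer value arrives. A minimal plain-asyncio sketch of that idea follows; InterruptingRunner is a hypothetical class written for this example and is not how broqer is implemented.

```python
import asyncio


class InterruptingRunner:
    """Hypothetical helper: at most one running call, cancelled by a newer value."""

    def __init__(self, coro, on_result):
        self._coro = coro
        self._on_result = on_result
        self._task = None

    def emit(self, value):
        # Cancel the call that is still running, then start a new one.
        if self._task is not None and not self._task.done():
            self._task.cancel()
        self._task = asyncio.ensure_future(self._run(value))

    async def _run(self, value):
        result = await self._coro(value)
        self._on_result(result)


async def delay_add(a):
    await asyncio.sleep(0.015)
    return a + 1


async def demo():
    results = []
    runner = InterruptingRunner(delay_add, results.append)
    runner.emit(0)
    await asyncio.sleep(0.005)  # the call for 0 is still sleeping
    runner.emit(1)              # cancels the call for 0
    await asyncio.sleep(0.05)   # enough time for the call for 1 to finish
    return results


interrupt_results = asyncio.run(demo())
```

Only the result for the second value is delivered, so interrupt_results is [2], mirroring the single 2 printed in the doctest.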

AsyncMode: QUEUE

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.QUEUE))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()

AsyncMode: LAST

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.LAST))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 2
Finished with argument 2
3
>>> _d.dispose()

AsyncMode: SKIP

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.SKIP))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
>>> _d.dispose()
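
The QUEUE, LAST and SKIP outputs above differ only in what happens to values that arrive while a call is still running. The sketch below models that with a small buffer policy in plain asyncio. BufferedRunner and its policy strings are hypothetical, chosen for illustration, and say nothing about broqer's internals.

```python
import asyncio
from collections import deque


class BufferedRunner:
    """Hypothetical helper: one call at a time, with a policy for late values."""

    def __init__(self, coro, on_result, policy):
        self._coro = coro
        self._on_result = on_result
        self._policy = policy  # 'queue', 'last' or 'skip'
        self._pending = deque()
        self._task = None
        self._busy = False

    def emit(self, value):
        if not self._busy:
            self._busy = True
            self._task = asyncio.ensure_future(self._drain(value))
        elif self._policy == 'queue':
            self._pending.append(value)  # keep every value, in order
        elif self._policy == 'last':
            self._pending.clear()        # keep only the newest value
            self._pending.append(value)
        # 'skip': values arriving during a running call are dropped

    async def _drain(self, value):
        while True:
            self._on_result(await self._coro(value))
            if not self._pending:
                break
            value = self._pending.popleft()
        self._busy = False


async def delay_add(a):
    await asyncio.sleep(0.005)
    return a + 1


async def demo(policy):
    results = []
    runner = BufferedRunner(delay_add, results.append, policy)
    for value in (0, 1, 2):
        runner.emit(value)
    await asyncio.sleep(0.1)  # generous margin for all calls to finish
    return results


outcomes = {p: asyncio.run(demo(p)) for p in ('queue', 'last', 'skip')}
```

With the values 0, 1, 2 emitted back to back, outcomes is {'queue': [1, 2, 3], 'last': [1, 3], 'skip': [1]}, which lines up with the LAST and SKIP doctests above and extends the QUEUE doctest by one value.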

Using error_callback:

>>> def cb(*e):
...     print('Got error')
>>> s.emit('abc')
>>> _d = (s | op.MapAsync(delay_add, error_callback=cb)).subscribe(Sink(print))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument abc
Got error
>>> _d.dispose()