MapAsync¶
Definition¶
-
class
broqer.op.
MapAsync
(coro, *args, mode=<AsyncMode.CONCURRENT: 1>, error_callback=<broqer.error_handler.DefaultErrorHandler object>, unpack: bool = False, **kwargs)[source]¶ Apply
coro(*args, value, **kwargs)
to each emitted value allow async processing.Parameters: - coro – coroutine to be applied on emit
- *args – variable arguments to be used for calling coro
- mode – behavior when a value is currently processed
- error_callback – error callback to be registered
- unpack – value from emits will be unpacked as (*value)
- **kwargs – keyword arguments to be used for calling coro
Variables: scheduled – Publisher emitting the value when coroutine is actually started.
Usage¶
Apply coro
to each emitted value allowing async processing
Usage:
>>> import asyncio
>>> from broqer import Value, Sink, op
>>> s = Value()
>>> async def delay_add(a):
... print('Starting with argument', a)
... await asyncio.sleep(0.015)
... result = a + 1
... print('Finished with argument', a)
... return result
AsyncMode: CONCURRENT (is default)
>>> s.emit(0)
>>> _d = (s | op.MapAsync(delay_add)).subscribe(Sink())
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 0
Starting with argument 1
Finished with argument 0
Finished with argument 1
>>> _d.dispose()
AsyncMode: INTERRUPT
>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.INTERRUPT))
>>> _d = o.subscribe(Sink(print))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.005))
Starting with argument 0
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()
AsyncMode: QUEUE
>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.QUEUE))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()
AsyncMode: LAST
>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.LAST))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 2
Finished with argument 2
3
>>> _d.dispose()
AsyncMode: SKIP
>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.SKIP))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
>>> _d.dispose()
Using error_callback:
>>> def cb(*e):
... print('Got error')
>>> s.emit('abc')
>>> _d = (s | op.MapAsync(delay_add, error_callback=cb)).subscribe(Sink(print))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument abc
Got error
>>> _d.dispose()