MapThreaded

Definition

class broqer.op.MapThreaded(function: Callable[[Any], Any], *args, mode: broqer.op.map_async.MODE = <MODE.CONCURRENT: 1>, error_callback=<broqer.default_error_handler.DefaultErrorHandler object>, unpack: bool = False, loop=None, **kwargs)[source]

Apply function to each emitted value allowing threaded processing.

Parameters:
  • function – function called to apply
  • *args – variable arguments to be used for calling function
  • mode – behavior when a value is currently processed
  • error_callback – error callback to be registered
  • unpack – value from emits will be unpacked as (*value)
  • **kwargs – keyword arguments to be used for calling function

Usage

Apply function to each emitted value allowing threaded processing

Usage:

>>> import time
>>> from broqer import Subject, op
>>> s = Subject()
>>> def delay_add(a):
...     print('Starting with argument', a)
...     time.sleep(0.015)
...     result = a + 1
...     print('Finished with argument', a)
...     return result

Mode: CONCURRENT (is default)

>>> _d = s | op.MapThreaded(delay_add) | op.Sink()
>>> s.emit(0)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
Starting with argument 0
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.03))
Starting with argument 1
Finished with argument ...
Finished with argument ...
>>> _d.dispose()

Mode: QUEUE

>>> _d = s | op.MapThreaded(delay_add, mode=op.MODE.QUEUE) | op.Sink(print)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
>>> s.emit(0)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
Starting with argument 0
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Finished with argument 0
1
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()

Mode: LAST

>>> _d = s | op.MapThreaded(delay_add, mode=op.MODE.LAST) | op.Sink(print)
>>> s.emit(0)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
Starting with argument 0
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Finished with argument 0
1
Starting with argument 2
Finished with argument 2
3
>>> _d.dispose()

Mode: SKIP

>>> _d = s | op.MapThreaded(delay_add, mode=op.MODE.SKIP) | op.Sink(print)
>>> s.emit(0)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
Starting with argument 0
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Finished with argument 0
1
>>> _d.dispose()

Using error_callback:

>>> def cb(*e):
...     print('Got error')
>>> _d = s | op.MapThreaded(delay_add, error_callback=cb) | op.Sink(print)
>>> s.emit('abc')
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.001))
Starting with argument abc
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Got error
>>> _d.dispose()