Throttle

Definition

class broqer.op.Throttle(duration: float, error_callback=<broqer.default_error_handler.DefaultErrorHandler object>, loop=None)

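Rate-limit emissions: after a value has been emitted, further values are held back for the given duration, and only the most recent of them is emitted once that time has elapsed.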

Parameters:
  • duration – time for throttling in seconds
  • error_callback – the error callback to be registered
  • loop – asyncio event loop to use

Usage

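The first value passes through immediately. Values emitted during the following 0.1 seconds are held back, and only the latest one is emitted once the interval has elapsed: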

>>> import asyncio
>>> from broqer import Subject, op
>>> s = Subject()
>>> throttle_publisher = s | op.Throttle(0.1)
>>> _d = throttle_publisher | op.Sink(print)
>>> s.emit(1)
1
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> s.emit(3)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.2))
3

Calling reset() ends the current throttling interval early; a value that is being held back is emitted immediately:

>>> s.emit(4)
4
>>> s.emit(5)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> throttle_publisher.reset()
5
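
The behaviour shown in the examples above can be approximated with plain asyncio. The following is a minimal sketch for illustration only, not broqer's actual implementation: the class name SimpleThrottle, its callback argument and the demo() coroutine are hypothetical, and the assumption that reset() starts a new throttling interval after flushing the held value is inferred from the example output rather than confirmed by the source.

```python
import asyncio

_NOTHING = object()  # sentinel: no value is currently being held back


class SimpleThrottle:
    """Hypothetical stand-in that mimics the throttling shown above."""

    def __init__(self, duration, callback, loop):
        self._duration = duration
        self._callback = callback
        self._loop = loop
        self._timer = None        # active call_later handle, if any
        self._pending = _NOTHING  # latest value held back during the interval

    def emit(self, value):
        if self._timer is None:
            # No interval running: pass the value through and start one.
            self._callback(value)
            self._timer = self._loop.call_later(self._duration, self._on_timeout)
        else:
            # Interval running: remember only the most recent value.
            self._pending = value

    def _on_timeout(self):
        self._timer = None
        if self._pending is not _NOTHING:
            value, self._pending = self._pending, _NOTHING
            self.emit(value)  # emits the held value and starts a new interval

    def reset(self):
        # End the current interval early and flush any held value.
        if self._timer is not None:
            self._timer.cancel()
        self._on_timeout()


async def demo():
    received = []
    throttle = SimpleThrottle(0.1, received.append, asyncio.get_running_loop())
    throttle.emit(1)           # passes through immediately
    throttle.emit(2)           # held back
    await asyncio.sleep(0.05)
    throttle.emit(3)           # replaces 2 as the held value
    await asyncio.sleep(0.2)   # interval elapses, 3 is emitted
    throttle.emit(4)           # passes through immediately
    throttle.emit(5)           # held back
    await asyncio.sleep(0.05)
    throttle.reset()           # 5 is emitted right away
    return received


print(asyncio.run(demo()))  # prints [1, 3, 4, 5]
```

The sketch keeps only the latest held value, which is why 2 never reaches the callback. In real code, use op.Throttle as documented above.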