Source code for broqer.op.throttle

"""
Rate limit emits by the given time.
Usage:
>>> import asyncio
>>> from broqer import Value, op, Sink
>>> v = Value()
>>> throttle_publisher = v | op.Throttle(0.1)
>>> _d = throttle_publisher.subscribe(Sink(print))
>>> v.emit(1)
1
>>> v.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> v.emit(3)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.2))
3
>>> # It's also possible to reset the throttling duration:
>>> v.emit(4)
4
>>> v.emit(5)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> throttle_publisher.reset()
"""
import asyncio
import sys
from typing import Any  # noqa: F401

from broqer import Publisher, default_error_handler, NONE

from broqer.operator import Operator
from broqer.timer import Timer


[docs]class Throttle(Operator): """ Rate limit emits by the given time. :param duration: time for throttling in seconds :param error_callback: the error callback to be registered :param loop: asyncio event loop to use """ def __init__(self, duration: float, error_callback=default_error_handler, loop=None) -> None: Operator.__init__(self) if duration < 0: raise ValueError('Duration has to be bigger than zero') self._duration = duration self._loop = loop or asyncio.get_event_loop() self._timer = Timer(self._delayed_emit_cb, loop=loop) self._error_callback = error_callback def get(self): return Publisher.get(self) def emit(self, value: Any, who: Publisher) -> None: if who is not self._originator: raise ValueError('Emit from non assigned publisher') if not self._timer.is_running(): self._timer.start(timeout=0, args=(value,)) else: self._timer.change_arguments(args=(value,)) def _delayed_emit_cb(self, value=NONE): if value is NONE: # since the last emit the given duration has passed without another # emit return try: Publisher.notify(self, value) except Exception: # pylint: disable=broad-except self._error_callback(*sys.exc_info()) self._timer.start(self._duration) def reset(self): """ Reseting duration for throttling """ self._timer.cancel()