Debounce

Definition

class broqer.op.Debounce(duetime: float, retrigger_value: Any = <class 'broqer.types.NONE'>, error_callback=<broqer.default_error_handler.DefaultErrorHandler object>, *, loop=None)[source]

Emit a value only after a given idle time (emits meanwhile are skipped). Debounce can also be used for a timeout functionality.

Parameters:
  • duetime – time in seconds to be waited for debounce
  • retrigger_value – value used to emit when value has changed
  • error_callback – error callback to be registered
  • loop – asyncio loop to be used

Usage

Emit a value only after a given idle time (emits meanwhile are skipped). Debounce can also be used for a timeout functionality.

Usage:

>>> import asyncio
>>> from broqer import Subject, op
>>> s = Subject()
>>> _d = s | op.Debounce(0.1) | op.Sink(print)
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> s.emit(3)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.15))
3
>>> _d.dispose()

When debounce is retriggered you can specify a value to emit:

>>> debounce_publisher = s | op.Debounce(0.1, False)
>>> _d = debounce_publisher | op.Sink(print)
>>> s.emit(False)
False
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.15))
>>> s.emit(True)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> s.emit(False)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> s.emit(True)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.15))
True

Reseting is also possible:

>>> s.emit(False)
False
>>> s.emit(True)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.15))
True
>>> debounce_publisher.reset()
False
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.15))
>>> _d.dispose()