Accumulate

Definition

class broqer.op.Accumulate(function: Callable[[Any, Any], Tuple[Any, Any]], init: Any)

Each time the source publisher emits, the function is called with the current state and the received value as arguments. It returns a tuple containing the new state and the value to emit.

Parameters:
  • function – Function taking two arguments: current state and new value. The return value is a tuple with (new state, result) where new state will be used for the next call and result will be emitted to subscribers.
  • init – initial value of the state
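
The (new state, result) contract described above can be illustrated without the library. The following is a minimal sketch in plain Python, not broqer's implementation: delta and fold are hypothetical names introduced only for this illustration, and fold merely mimics how state is threaded from one call to the next.

```python
from typing import Any, Callable, Iterable, List, Tuple


def delta(state: int, value: int) -> Tuple[int, int]:
    """Accumulate-style function (hypothetical example).

    state holds the previously received value; the result is the
    difference between the new value and the previous one.
    """
    return value, value - state


def fold(function: Callable[[Any, Any], Tuple[Any, Any]],
         init: Any,
         values: Iterable[Any]) -> List[Any]:
    """Plain-Python stand-in: call function once per value, carrying state."""
    state = init
    results = []
    for value in values:
        # The first tuple element becomes the state for the next call,
        # the second is what would be emitted to subscribers.
        state, result = function(state, value)
        results.append(result)
    return results


results = fold(delta, 0, [1, 4, 9])
```

Note that the state (the previous value) and the emitted result (the difference) are separate, which is why the function returns two values rather than one.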

reset(state: Any) → None

Resets (or sets) the internal state.

Parameters:
  • state – new state to be set
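
To show what replacing the state means for later results, here is a small sketch using a stand-in class. AccumulateSketch and running_sum are hypothetical names, not part of broqer, and the sketch assumes that reset only replaces the stored state without emitting anything.

```python
class AccumulateSketch:
    """Hypothetical stand-in that mirrors the documented state handling."""

    def __init__(self, function, init):
        self._function = function
        self._state = init

    def emit(self, value):
        # Call the function with the current state, keep the new state,
        # and hand back the result that would go to subscribers.
        self._state, result = self._function(self._state, value)
        return result

    def reset(self, state):
        # Assumption: resetting only overwrites the stored state.
        self._state = state


def running_sum(state, value):
    total = state + value
    return total, total


acc = AccumulateSketch(running_sum, init=0)
first = acc.emit(5)    # state becomes 5
second = acc.emit(5)   # state becomes 10
acc.reset(0)           # start over from 0
third = acc.emit(5)    # state becomes 5 again
```

After the reset, the next result is computed from the new state rather than from the accumulated one.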

Usage

>>> from broqer import Subject, op
>>> s = Subject()
>>> def moving_average(state, value):
...     # drop the oldest sample and append the newest one
...     state = state[1:] + [value]
...     return state, sum(state) / len(state)
>>> lowpass = s | op.Accumulate(moving_average, init=[0]*3)
>>> lowpass | op.Sink(print)
<...>
>>> s.emit(3)
1.0
>>> s.emit(3)
2.0
>>> s.emit(3)
3.0
>>> s.emit(3)
3.0