Source code for broqer.value

""" Implementing Value """

from typing import Any

# pylint: disable=cyclic-import
from broqer import Publisher, NONE
from broqer.operator import Operator


[docs]class Value(Operator): """ Value is a publisher and subscriber. >>> from broqer import Sink >>> s = Value(0) >>> _d = s.subscribe(Sink(print)) 0 >>> s.emit(1) 1 """ def __init__(self, init=NONE): Operator.__init__(self) self._state = init def emit(self, value: Any, who: Publisher = None) -> None: # pylint: disable=unused-argument if self._originator is not None and self._originator is not who: raise ValueError('Emit from non assigned publisher') return Publisher.notify(self, value) notify = Publisher.notify
def dependent_subscribe(publisher: Publisher, value: Value): """ Let `value` subscribe to `publisher` only when `value` itself is subscribed :param publisher: publisher to be subscribed, when value is subscribed :param value: value, which will receive .emit calls from `publisher` """ def _on_subscription(existing_subscription: bool): if existing_subscription: publisher.subscribe(value) else: publisher.unsubscribe(value) value.register_on_subscription_callback(_on_subscription)