Source code for broqer.publisher

""" Implementing Publisher """
import sys
from typing import (TYPE_CHECKING, TypeVar, Type, Tuple, Callable, Optional,

from broqer import NONE, Disposable, default_error_handler
import broqer

    # pylint: disable=cyclic-import
    from typing import List
    from broqer import Subscriber
    from broqer.operator import Operator

class SubscriptionError(ValueError):
    """ Special exception raised when subscription is failing (subscriber
    already subscribed) or on unsubscribe when subscriber is not subscribed

ValueT = TypeVar('ValueT')  # Type of publisher state and emitted value
SubscriptionCBT = Callable[[bool], None]

[docs]class Publisher: """ In broqer a subscriber can subscribe to a publisher. After subscription the subscriber is notified about emitted values from the publisher ( starting with the current state). In other frameworks *publisher*/*subscriber* are referenced as *observable*/*observer*. broqer.NONE is used as default initialisation. .get() will always return the internal state (even when it's broqer.NONE). .subscribe() will emit the actual state to the new subscriber only if it is something else than broqer.NONE . To receive information use following methods to interact with Publisher: - ``.subscribe(subscriber)`` to subscribe for events on this publisher - ``.unsubscribe(subscriber)`` to unsubscribe - ``.get()`` to get the current state When implementing a Publisher use the following methods: - ``.notify(value)`` calls .emit(value) on all subscribers :param init: the initial state. :ivar _state: state of the publisher :ivar _inherited_type: type class for method lookup :ivar _subscriptions: holding a list of subscribers :ivar _on_subscription_cb: callback with boolean as argument, telling if at least one subscription exists :ivar _dependencies: list with publishers this publisher is (directly or indirectly) dependent on. """ @overload # noqa: F811 def __init__(self, *, type_: Type[ValueT] = None): pass @overload # noqa: F811 def __init__(self, init: ValueT, type_: Type[ValueT] = None): # noqa: F811 pass def __init__(self, init=NONE, type_=None): # noqa: F811 self._state = init if type_: self._inherited_type = type_ # type: Optional[Type] elif init is not NONE: self._inherited_type = type(init) else: self._inherited_type = None self._subscriptions = [] # type: List[Subscriber] self._on_subscription_cb = None # type: Optional[SubscriptionCBT] self._dependencies = () # type: Tuple[Publisher, ...]
[docs] def subscribe(self, subscriber: 'Subscriber', prepend: bool = False) -> 'SubscriptionDisposable': """ Subscribing the given subscriber. :param subscriber: subscriber to add :param prepend: For internal use - usually the subscribers will be added at the end of a list. When prepend is True, it will be added in front of the list. This will habe an effect in the order the subscribers are called. :raises SubscriptionError: if subscriber already subscribed """ # `subscriber in self._subscriptions` is not working because # tuple.__contains__ is using __eq__ which is overwritten and returns # a new publisher - not helpful here if any(subscriber is s for s in self._subscriptions): raise SubscriptionError('Subscriber already registered') if not self._subscriptions and self._on_subscription_cb: self._on_subscription_cb(True) if prepend: self._subscriptions.insert(0, subscriber) else: self._subscriptions.append(subscriber) disposable_obj = SubscriptionDisposable(self, subscriber) if self._state is not NONE: subscriber.emit(self._state, who=self) return disposable_obj
[docs] def unsubscribe(self, subscriber: 'Subscriber') -> None: """ Unsubscribe the given subscriber :param subscriber: subscriber to unsubscribe :raises SubscriptionError: if subscriber is not subscribed (anymore) """ # here is a special implementation which is replacing the more # obvious one: self._subscriptions.remove(subscriber) - this will not # work because list.remove(x) is doing comparison for equality. # Applied to publishers this will return another publisher instead of # a boolean result for i, _s in enumerate(self._subscriptions): if _s is subscriber: self._subscriptions.pop(i) if not self._subscriptions and self._on_subscription_cb: self._on_subscription_cb(False) return raise SubscriptionError('Subscriber is not registered')
[docs] def get(self) -> ValueT: """ Return the state of the publisher. """ return self._state
[docs] def notify(self, value: ValueT) -> None: """ Calling .emit(value) on all subscribers and store state. :param value: value to be emitted to subscribers """ self._state = value for subscriber in tuple(self._subscriptions): try: subscriber.emit(value, who=self) except Exception: # pylint: disable=broad-except default_error_handler(*sys.exc_info())
def reset_state(self) -> None: """ Resets the state. Calling this method will not trigger a notification, but will call .reset_state for all subscribers """ self._state = NONE for subscriber in tuple(self._subscriptions): try: subscriber.reset_state() except Exception: # pylint: disable=broad-except default_error_handler(*sys.exc_info()) @property def subscriptions(self) -> Tuple['Subscriber', ...]: """ Property returning a tuple with all current subscribers """ return tuple(self._subscriptions) def register_on_subscription_callback(self, callback: SubscriptionCBT) -> None: """ This callback will be called, when the subscriptions are changing. When a subscription is done and no subscription was present the callback is called with True as argument. When after unsubscribe no subscription is left, it will be called with False. :param callback: callback(subscription: bool) to be called. when `callback` is None the callback will be reset :raises ValueError: when a callback is already registered """ if callback is None: self._on_subscription_cb = None return if self._on_subscription_cb is not None: raise ValueError('A callback is already registered') self._on_subscription_cb = callback if self._subscriptions: callback(True) def __await__(self): """ Makes publisher awaitable. When publisher has a state it will immediatly return its state as result. Otherwise it will wait forever until it will change its state. """ future = self.as_future(timeout=None, omit_subscription=False) return future.__await__() def as_future(self, timeout: float, omit_subscription: bool = True, loop=None): """ Returns a asyncio.Future which will be done on first change of this publisher. :param timeout: timeout in seconds. Use None for infinite waiting :param omit_subscription: if True the first emit (which can be on the subscription) will be ignored. :param loop: asyncio loop to be used :returns: a future returning the emitted value """ return broqer.OnEmitFuture(self, timeout, omit_subscription, loop) def __bool__(self): """ A new Publisher is the result of a comparision between a publisher and something else (may also be a second publisher). This result should never be used in a boolean sense (e.g. in `if p1 == p2:`). To prevent this __bool__ is overwritten to raise a ValueError. """ raise ValueError('Evaluation of comparison of publishers is not ' 'supported') def __iter__(self): """ To prevent iterating over a publisher this method is implemented to throw an exception. Otherwise it will fallback to __getitem__. """ raise ValueError('Iteration over a publisher is not possible') def inherit_type(self, type_cls: Optional[Type]) -> None: """ Enables the usage of method and attribute overloading for this publisher. """ self._inherited_type = type_cls @property def inherited_type(self) -> Optional[Type]: """ Property inherited_type returns used type class (or None) """ return self._inherited_type @property def dependencies(self) -> Tuple['Publisher', ...]: """ Returning a list of publishers this publisher is dependent on. """ return self._dependencies def add_dependencies(self, *publishers: 'Publisher') -> None: """ Add publishers which are directly or indirectly controlling the behavior of this publisher :param *publishers: variable argument list with publishers """ self._dependencies = self._dependencies + publishers def __or__(self, operator: 'Operator'): operator.originator = self return operator def __dir__(self): """ Extending __dir__ with inherited type """ attrs = set(super().__dir__()) if self._inherited_type: attrs.update(set(dir(self._inherited_type))) return tuple(attrs)
class SubscriptionDisposable(Disposable): """ This disposable is returned on Publisher.subscribe(subscriber). :param publisher: publisher the subscription is made to :param subscriber: subscriber used for subscription """ def __init__(self, publisher: 'Publisher', subscriber: 'Subscriber') \ -> None: self._publisher = publisher self._subscriber = subscriber def dispose(self) -> None: self._publisher.unsubscribe(self._subscriber) @property def publisher(self) -> 'Publisher': """ Subscripted publisher """ return self._publisher @property def subscriber(self) -> 'Subscriber': """ Subscriber used in this subscription """ return self._subscriber