Publishers

In broqer, a subscriber can subscribe to a publisher. Once subscribed, the subscriber is notified of every value the publisher emits. Other frameworks refer to publisher and subscriber as observable and observer.

As a receiver of information, use the following methods to interact with a Publisher:

  • .subscribe(subscriber) to subscribe for events on this publisher
  • .unsubscribe(subscriber) to unsubscribe
  • .get() to get the current state (will raise ValueError if not stateful)

When implementing a Publisher use the following methods:

  • .notify(value) calls .emit(value) on all subscribers
Variables:
  • _subscriptions – holding a list of subscribers
  • _inherited_type – type class for method lookup
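
The interaction described above can be illustrated with a minimal, self-contained sketch. It does not import broqer and is not the library's implementation: `SketchPublisher`, `SketchSubscriptionError`, and `Collector` are hypothetical names that only mimic the documented behavior of `.subscribe()`, `.unsubscribe()`, `.get()`, and `.notify()` for a stateless publisher.

```python
from typing import Any, List


class SketchSubscriptionError(Exception):
    """Hypothetical stand-in for broqer's SubscriptionError."""


class SketchPublisher:
    """Toy stateless publisher (an assumption, not broqer's code)."""

    def __init__(self) -> None:
        self._subscriptions: List[Any] = []

    def subscribe(self, subscriber: Any) -> None:
        if subscriber in self._subscriptions:
            raise SketchSubscriptionError("subscriber already subscribed")
        self._subscriptions.append(subscriber)

    def unsubscribe(self, subscriber: Any) -> None:
        if subscriber not in self._subscriptions:
            raise SketchSubscriptionError("subscriber is not subscribed")
        self._subscriptions.remove(subscriber)

    def get(self) -> Any:
        # A stateless publisher has no current state to return.
        raise ValueError("publisher is stateless")

    def notify(self, value: Any) -> None:
        # Iterate over a copy so the list may change during notification.
        for subscriber in tuple(self._subscriptions):
            subscriber.emit(value)


class Collector:
    """Toy subscriber that records every emitted value."""

    def __init__(self) -> None:
        self.values: List[Any] = []

    def emit(self, value: Any) -> None:
        self.values.append(value)


publisher = SketchPublisher()
collector = Collector()
publisher.subscribe(collector)
publisher.notify(1)
publisher.notify(2)
publisher.unsubscribe(collector)
publisher.notify(3)  # not delivered, the collector has unsubscribed
```

After running the sketch, `collector.values` holds only the values emitted while the collector was subscribed.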

Publisher

class broqer.Publisher

get()

Return the current value of the publisher. This works only for stateful publishers; a stateless publisher raises a ValueError.

Raises: ValueError – when the publisher is stateless.
notify(value: Any) → asyncio.futures.Future

Call .emit(value) on all subscribers. A synchronous subscriber just returns None; an asynchronous one may return a future. The futures are collected: if no subscriber returned a future, this method returns None; if exactly one future was returned, that future is returned; if multiple futures were returned, a gathered future is returned.

Parameters: value – value to be emitted to subscribers
Returns: a future if at least one subscriber has returned a future, otherwise None
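
The three return cases of notify can be sketched with plain asyncio. This is only an illustration under assumptions, not broqer's code: `notify_sketch`, `sync_emit`, and `make_async_emit` are hypothetical helpers, and subscribers are modeled as bare callables rather than Subscriber objects.

```python
import asyncio
from typing import Any, Callable, List, Optional


def sync_emit(value: Any) -> None:
    """Synchronous subscriber: handles the value and returns None."""
    return None


def make_async_emit(log: List[Any]) -> Callable[[Any], "asyncio.Future[Any]"]:
    """Build an asynchronous subscriber that returns a future."""

    async def handle(value: Any) -> None:
        await asyncio.sleep(0)
        log.append(value)

    def emit(value: Any) -> "asyncio.Future[Any]":
        # Must be called while an event loop is running.
        return asyncio.ensure_future(handle(value))

    return emit


def notify_sketch(emitters: List[Callable[[Any], Any]],
                  value: Any) -> Optional["asyncio.Future[Any]"]:
    # Call every subscriber and keep only the returned futures.
    futures = [f for f in (emit(value) for emit in emitters) if f is not None]
    if not futures:
        return None            # only synchronous subscribers
    if len(futures) == 1:
        return futures[0]      # exactly one future: return it directly
    return asyncio.gather(*futures)  # several futures: gather them


async def demo():
    log: List[Any] = []
    async_a = make_async_emit(log)
    async_b = make_async_emit(log)
    none_case = notify_sketch([sync_emit], 1)
    single = notify_sketch([sync_emit, async_a], 2)
    gathered = notify_sketch([async_a, async_b], 3)
    await single
    await gathered
    return none_case, log


none_case, log = asyncio.run(demo())
```

Returning None in the purely synchronous case lets callers skip awaiting entirely, while the gathered future lets them await all asynchronous subscribers at once.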
subscribe(subscriber: Subscriber, prepend: bool = False) → broqer.disposable.SubscriptionDisposable

Subscribe the given subscriber.

Parameters:
  • subscriber – subscriber to add
  • prepend – For internal use. Subscribers are usually added at the end of the list; when prepend is True, the subscriber is added at the front instead. This affects the order in which subscribers are called.
Raises: SubscriptionError – if subscriber already subscribed
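
The effect of prepend on call order can be shown with a small sketch. It assumes subscribers are kept in a plain list and called in list order; `add_subscriber` is a hypothetical helper, and a ValueError stands in for broqer's SubscriptionError.

```python
from typing import Callable, List


def add_subscriber(subscriptions: List[Callable[[int], None]],
                   subscriber: Callable[[int], None],
                   prepend: bool = False) -> None:
    if subscriber in subscriptions:
        # Stand-in for SubscriptionError in this sketch.
        raise ValueError("subscriber already subscribed")
    if prepend:
        subscriptions.insert(0, subscriber)  # called before all others
    else:
        subscriptions.append(subscriber)     # called after all others


order: List[str] = []


def first(value: int) -> None:
    order.append("first")


def second(value: int) -> None:
    order.append("second")


def internal(value: int) -> None:
    order.append("internal")


subscriptions: List[Callable[[int], None]] = []
add_subscriber(subscriptions, first)
add_subscriber(subscriptions, second)
add_subscriber(subscriptions, internal, prepend=True)

# Notify in list order: the prepended subscriber runs first.
for subscriber in subscriptions:
    subscriber(0)
```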

subscriptions

Property returning a tuple of all current subscribers.

unsubscribe(subscriber: Subscriber) → None

Unsubscribe the given subscriber.

Parameters: subscriber – subscriber to unsubscribe
Raises: SubscriptionError – if subscriber is not subscribed (anymore)
wait_for(timeout=None)

Use this method when a timeout should be applied while awaiting the publisher.

Parameters: timeout – optional timeout in seconds
Returns: a future returning the emitted value
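
The idea of awaiting an emitted value with a timeout can be sketched with asyncio.wait_for. This is an assumption about the general pattern, not broqer's implementation: `wait_for_value` and its `register` callback are hypothetical, and the publisher is simulated by a list of listener callables.

```python
import asyncio
from typing import Any, Callable, List, Optional

Listener = Callable[[Any], None]


async def wait_for_value(register: Callable[[Listener], None],
                         timeout: Optional[float] = None) -> Any:
    """Await the next value passed to the registered listener."""
    future = asyncio.get_running_loop().create_future()

    def on_emit(value: Any) -> None:
        # Only the first emitted value resolves the future.
        if not future.done():
            future.set_result(value)

    register(on_emit)
    # asyncio.wait_for raises asyncio.TimeoutError when the timeout expires.
    return await asyncio.wait_for(future, timeout)


async def demo():
    listeners: List[Listener] = []
    loop = asyncio.get_running_loop()
    # Simulate a publisher emitting 42 shortly after the wait starts.
    loop.call_later(0.01, lambda: [listener(42) for listener in listeners])
    received = await wait_for_value(listeners.append, timeout=1.0)

    try:
        # Nothing is ever emitted here, so the timeout fires.
        await wait_for_value(lambda listener: None, timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return received, timed_out


received, timed_out = asyncio.run(demo())
```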

StatefulPublisher

class broqer.StatefulPublisher(init=<class 'broqer.types.NONE'>)

A StatefulPublisher keeps its state. This changes the behavior compared to a non-stateful Publisher:

  • when subscribing, the subscriber is notified with the current state
  • .get() returns the current state

Parameters: init – the initial state. As long as the state is NONE, the behavior is the same as that of a stateless Publisher.
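
The difference from a stateless publisher can be sketched as follows. The sketch models only the behavior described above and is not broqer's implementation: `SketchStatefulPublisher`, `Recorder`, and the module-level `NONE` sentinel are hypothetical stand-ins.

```python
from typing import Any, List

NONE = object()  # hypothetical sentinel standing in for broqer.types.NONE


class SketchStatefulPublisher:
    """Toy stateful publisher (an assumption, not broqer's code)."""

    def __init__(self, init: Any = NONE) -> None:
        self._state = init
        self._subscriptions: List[Any] = []

    def subscribe(self, subscriber: Any) -> None:
        self._subscriptions.append(subscriber)
        # A new subscriber is immediately notified with the current state.
        if self._state is not NONE:
            subscriber.emit(self._state)

    def get(self) -> Any:
        # Without a state the publisher behaves like a stateless one.
        if self._state is NONE:
            raise ValueError("no state available")
        return self._state

    def notify(self, value: Any) -> None:
        self._state = value
        for subscriber in tuple(self._subscriptions):
            subscriber.emit(value)


class Recorder:
    """Toy subscriber that records every emitted value."""

    def __init__(self) -> None:
        self.values: List[Any] = []

    def emit(self, value: Any) -> None:
        self.values.append(value)


stateful = SketchStatefulPublisher(init=10)
recorder = Recorder()
stateful.subscribe(recorder)   # recorder receives the current state, 10
stateful.notify(20)
current = stateful.get()

empty = SketchStatefulPublisher()  # state is NONE: behaves as stateless
late = Recorder()
empty.subscribe(late)              # nothing is emitted on subscription
```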

FromPolling

class broqer.op.FromPolling(interval, poll_func: Callable[[Any], Any], *args, error_callback=<broqer.default_error_handler.DefaultErrorHandler object>, loop=None, **kwargs)

Call poll_func(*args, **kwargs) periodically and emit the returned values.

Parameters:
  • interval – periodic interval in seconds. Use None if it should poll only once on first subscription
  • poll_func – function to be called
  • *args – variable arguments to be used for calling poll_func
  • error_callback – error callback to be registered
  • loop – asyncio event loop to use
  • **kwargs – keyword arguments to be used for calling poll_func
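
The polling behavior can be approximated with a plain asyncio loop. This sketch is an assumption about the general technique and does not reproduce broqer's FromPolling: `poll_loop` and its `emit` callback are hypothetical, and error_callback, loop selection, and subscription handling are left out.

```python
import asyncio
import contextlib
import itertools
from typing import Any, Callable, List, Optional


async def poll_loop(interval: Optional[float],
                    poll_func: Callable[..., Any],
                    emit: Callable[[Any], None],
                    *args: Any, **kwargs: Any) -> None:
    """Call poll_func(*args, **kwargs) and pass each result to emit."""
    while True:
        emit(poll_func(*args, **kwargs))
        if interval is None:
            return  # interval None: poll only once
        await asyncio.sleep(interval)


async def demo():
    counter = itertools.count()
    values: List[int] = []
    done = asyncio.Event()

    def emit(value: int) -> None:
        values.append(value)
        if len(values) == 3:
            done.set()  # stop the demo after three polled values

    # Periodic polling: next(counter) is called every 10 ms.
    task = asyncio.ensure_future(poll_loop(0.01, next, emit, counter))
    await done.wait()
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task

    # One-shot polling with positional and keyword arguments.
    once: List[int] = []
    await poll_loop(None, lambda a, b=0: a + b, once.append, 1, b=2)
    return values, once


values, once = asyncio.run(demo())
```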