Python Broqer


Initially focused on embedded systems, Broqer can be used wherever continuous streams of data have to be processed - and they are everywhere. Watch out!


Content

Introduction

What’s all the fuss about Reactive Programming?

Reactive Programming is all about asynchronous data streams. Over the history of programming, many architectures and paradigms have emerged to address this problem. Terms like event-driven, observer pattern and others describe the idea behind reactive programming.

In reactive programming you work on asynchronous data streams. And this can be nearly anything.

Examples of sources for asynchronous data streams:

  • events in a user interface (clicks, mouse moves, …)
  • requests from clients in a server environment
  • sensor and input data from an embedded system
  • variables, states or data structures in an application

Reactive programming gives you operators to work on those streams:

  • filter streams
  • map functions to streams
  • merge streams in various ways
  • process contained data
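
As a rough illustration of these operators, the following sketch pushes values through plain Python callbacks. It does not use broqer; the helper names `make_filter` and `make_map` are hypothetical and only show the idea of filtering, mapping and merging pushed values.

```python
def make_filter(predicate, downstream):
    """Forward a value to downstream only if predicate(value) is true."""
    def on_value(value):
        if predicate(value):
            downstream(value)
    return on_value


def make_map(function, downstream):
    """Forward function(value) to downstream."""
    def on_value(value):
        downstream(function(value))
    return on_value


collected = []

# Pipeline: filter out negatives, double the rest, collect the result.
pipeline = make_filter(lambda v: v >= 0,
                       make_map(lambda v: v * 2, collected.append))

# Merging: two independent sources push into the same pipeline.
sensor_a = [3, -1]
sensor_b = [5]
for value in sensor_a + sensor_b:
    pipeline(value)

print(collected)  # [6, 10]
```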

Publishers

In broqer a subscriber can subscribe to a publisher. After subscription the subscriber is notified about values emitted by the publisher (starting with the current state). In other frameworks publisher/subscriber are referred to as observable/observer.

broqer.NONE is used as the default initialisation. .get() always returns the internal state (even when it is broqer.NONE). .subscribe() emits the current state to the new subscriber only if it is something other than broqer.NONE.

To receive information, use the following methods to interact with a Publisher:

  • .subscribe(subscriber) to subscribe for events on this publisher
  • .unsubscribe(subscriber) to unsubscribe
  • .get() to get the current state

When implementing a Publisher use the following methods:

  • .notify(value) calls .emit(value) on all subscribers
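
The behaviour described above can be pictured with a minimal sketch. This is not broqer's implementation: `MiniPublisher` and the local `NONE` sentinel are hypothetical stand-ins, and subscribers are plain callables instead of objects with an `.emit()` method.

```python
class NONE:
    """Sentinel for 'no state yet' (stand-in for broqer.NONE)."""


class MiniPublisher:
    def __init__(self, init=NONE):
        self._state = init
        self._subscriptions = []

    def subscribe(self, subscriber):
        self._subscriptions.append(subscriber)
        if self._state is not NONE:  # only a real state is emitted
            subscriber(self._state)

    def unsubscribe(self, subscriber):
        self._subscriptions.remove(subscriber)

    def get(self):
        return self._state  # may be NONE

    def notify(self, value):
        self._state = value  # store the state, then inform subscribers
        for subscriber in tuple(self._subscriptions):
            subscriber(value)


received, late = [], []
first = received.append

publisher = MiniPublisher()       # state is NONE
publisher.subscribe(first)        # nothing is emitted on subscription
publisher.notify(1)               # first subscriber receives 1
publisher.subscribe(late.append)  # late subscriber receives current state 1
publisher.unsubscribe(first)
publisher.notify(2)               # only the late subscriber receives 2

print(received, late)  # [1] [1, 2]
```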

Publisher

class broqer.Publisher(init=<class 'broqer.types.NONE'>, type_=None)[source]

Parameters:

init – the initial state.

Variables:
  • _state – state of the publisher
  • _inherited_type – type class for method lookup
  • _subscriptions – holding a list of subscribers
  • _on_subscription_cb – callback with boolean as argument, telling if at least one subscription exists
  • _dependencies – list with publishers this publisher is (directly or indirectly) dependent on.
get() → ValueT[source]

Return the state of the publisher.

notify(value: ValueT) → None[source]

Calls .emit(value) on all subscribers and stores the state.

Parameters:value – value to be emitted to subscribers
subscribe(subscriber: Subscriber, prepend: bool = False) → SubscriptionDisposable[source]

Subscribe the given subscriber.

Parameters:
  • subscriber – subscriber to add
  • prepend – For internal use - usually subscribers are added at the end of a list. When prepend is True, the subscriber is added at the front of the list. This affects the order in which subscribers are called.
Raises:

SubscriptionError – if subscriber already subscribed

subscriptions

Property returning a tuple with all current subscribers

unsubscribe(subscriber: Subscriber) → None[source]

Unsubscribe the given subscriber

Parameters:subscriber – subscriber to unsubscribe
Raises:SubscriptionError – if subscriber is not subscribed (anymore)
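
To make the effect of prepend and the two error cases concrete, here is a small sketch. It is not taken from broqer: `SubscriberList` is a hypothetical class, the local `SubscriptionError` is only a stand-in for the exception named above, and subscribers are plain callables.

```python
class SubscriptionError(Exception):
    """Stand-in for the SubscriptionError mentioned in the documentation."""


class SubscriberList:
    def __init__(self):
        self._subscriptions = []

    def subscribe(self, subscriber, prepend=False):
        if subscriber in self._subscriptions:
            raise SubscriptionError('Subscriber already registered')
        if prepend:
            self._subscriptions.insert(0, subscriber)  # will be called first
        else:
            self._subscriptions.append(subscriber)     # will be called last

    def unsubscribe(self, subscriber):
        if subscriber not in self._subscriptions:
            raise SubscriptionError('Subscriber is not registered')
        self._subscriptions.remove(subscriber)

    def notify(self, value):
        for subscriber in tuple(self._subscriptions):
            subscriber(value)


calls = []


def normal(value):
    calls.append(('normal', value))


def urgent(value):
    calls.append(('urgent', value))


subs = SubscriberList()
subs.subscribe(normal)
subs.subscribe(urgent, prepend=True)  # jumps ahead of `normal`
subs.notify(1)
print(calls)  # [('urgent', 1), ('normal', 1)]

try:
    subs.subscribe(normal)            # second subscription is rejected
    duplicate_rejected = False
except SubscriptionError:
    duplicate_rejected = True
```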

StatefulPublisher

FromPolling

Subscribers

Subscriber

class broqer.Subscriber[source]

A Subscriber listens to changes of a publisher. As soon as the publisher emits a value, .emit(value) is called.

Sink

SinkAsync

OnEmitFuture

Trace

TopicMapper

Operators

Operator        Description
Accumulate      Apply func(value, state) which is returning new state and value to emit
Cache           Caching the emitted values (make a stateless publisher stateful)
CatchException  Catching exceptions of following operators in the pipeline
CombineLatest   Combine the latest emit of multiple publishers and emit the combination
Debounce        Emit a value only after a given idle time (emits meanwhile are skipped).
Delay           Emit every value delayed by the given time.
Filter          Filters values based on a predicate function
Map             Apply a function to each emitted value
MapAsync        Apply a coroutine to each emitted value allowing async processing
MapThreaded     Apply a blocking function to each emitted value allowing threaded processing
Merge           Merge emits of multiple publishers into one stream
Partition       Group size emits into one emit as tuple
Reduce          Like Map but with additional previous result as argument to the function.
Replace         Replace each received value by the given value
Sample          Emit the last received value periodically
SlidingWindow   Group size emitted values overlapping
Switch          Emit selected source mapped by mapping
Throttle        Rate limit emits by the given time

Accumulate

Definition

Usage

Cache

Definition

class broqer.op.Cache(init: Any = <class 'broqer.types.NONE'>)[source]

Cache object applied to publisher (see Map)

Usage

Cache the latest emit. As a result, repeated emits with the same value are suppressed. An initial value can also be defined for the case where the source publisher does not emit on subscription.

Usage:

>>> from broqer import Value, op, Sink
>>> s = Value(1)
>>> cached_publisher = s | op.Cache()
>>> _disposable = cached_publisher.subscribe(Sink(print))
1
>>> s.emit(2)
2
>>> s.emit(2)
>>> _disposable.dispose()

Using the initial value for cache:

>>> from broqer import Value, op, Sink
>>> s = Value()
>>> cached_publisher = s | op.Cache(1)
>>> _disposable = cached_publisher.subscribe(Sink(print))
1
>>> s.emit(1)
>>> s.emit(2)
2
>>> _disposable.dispose()

CatchException

Definition

Usage

CombineLatest

Definition

class broqer.op.CombineLatest(*publishers, map_: Callable[[...], Any] = None, emit_on=None)[source]

Combine the latest emit of multiple publishers and emit the combination

Parameters:
  • publishers – source publishers
  • map_ – optional function to be called for evaluation of current state
  • emit_on – publisher or list of publishers - only emitting a result when the emit comes from one of these publishers. If None, emit on any source publisher.
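
How these parameters might interact can be sketched in plain Python. This is an illustration under assumptions, not broqer's code: `CombineLatestSketch` is hypothetical, sources are identified by index instead of publisher objects, and it is assumed that map_ receives the latest values as positional arguments.

```python
_MISSING = object()  # marks a source that has not emitted yet


class CombineLatestSketch:
    def __init__(self, source_count, sink, map_=None, emit_on=None):
        self._latest = [_MISSING] * source_count
        self._sink = sink
        self._map = map_
        self._emit_on = emit_on  # indices allowed to trigger, None = all

    def emit(self, index, value):
        self._latest[index] = value
        if any(v is _MISSING for v in self._latest):
            return  # still waiting for at least one source
        if self._emit_on is not None and index not in self._emit_on:
            return  # value is stored, but this source does not trigger
        state = tuple(self._latest)
        self._sink(self._map(*state) if self._map is not None else state)


# Without map_ the combination is emitted as a tuple.
pairs = []
plain = CombineLatestSketch(2, pairs.append)
plain.emit(0, 1)   # nothing yet, source 1 is missing
plain.emit(1, 2)
plain.emit(1, 3)
print(pairs)  # [(1, 2), (1, 3)]

# With map_ and emit_on only source 0 triggers, and the sum is emitted.
sums = []
summed = CombineLatestSketch(2, sums.append,
                             map_=lambda a, b: a + b, emit_on={0})
summed.emit(0, 1)   # source 1 is missing
summed.emit(1, 2)   # stored, but source 1 may not trigger
summed.emit(0, 10)  # triggers with latest values (10, 2)
print(sums)  # [12]
```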

Usage

>>> from broqer import Value, Sink, op
>>> s1 = Value()
>>> s2 = Value()
>>> combination = op.CombineLatest(s1, s2)
>>> disposable = combination.subscribe(Sink(print))

CombineLatest only emits once a value has been collected from every source:

>>> s1.emit(1)
>>> s2.emit(2)
(1, 2)
>>> s2.emit(3)
(1, 3)

Subscribing to a CombineLatest that already has all values available emits the combination immediately on subscription:

>>> combination.subscribe(Sink(print, 'Second sink:'))
Second sink: (1, 3)
<...>

Debounce

Definition

Usage

Delay

Definition

Usage

Filter

Definition

class broqer.op.Filter(predicate: Callable[[Any], bool], *args, unpack: bool = False, **kwargs)[source]

Filter object applied to publisher

Parameters:
  • predicate – function to evaluate the filtering
  • *args – variable arguments to be used for evaluating predicate
  • unpack – value from emits will be unpacked (*value)
  • **kwargs – keyword arguments to be used for evaluating predicate

Usage

Filters values based on a predicate function

Usage:

>>> from broqer import Value, op, Sink
>>> s = Value()
>>> filtered_publisher = s | op.Filter(lambda v:v>0)
>>> _disposable = filtered_publisher.subscribe(Sink(print))
>>> s.emit(1)
1
>>> s.emit(-1)
>>> s.emit(0)
>>> _disposable.dispose()

Also possible with additional args and kwargs:

>>> import operator
>>> filtered_publisher = s | op.Filter(operator.and_, 0x01)
>>> _disposable = filtered_publisher.subscribe(Sink(print))
>>> s.emit(100)
>>> s.emit(101)
101
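
Why 100 is dropped and 101 passes in the last example can be checked with plain Python. The sketch assumes the extra argument 0x01 is bound before the emitted value, following the call form documented for Map; since a bitwise AND is commutative, the order does not change the result here.

```python
import operator
from functools import partial

# Bind 0x01 as the first argument, the emitted value follows.
predicate = partial(operator.and_, 0x01)

# 100 is even: 0x01 & 100 == 0, which is falsy, so the value is filtered out.
# 101 is odd:  0x01 & 101 == 1, which is truthy, so the value passes.
results = {value: bool(predicate(value)) for value in (100, 101)}
print(results)  # {100: False, 101: True}
```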

Map

Definition

class broqer.op.Map(function: Callable[[Any], Any], *args, unpack: bool = False, **kwargs)[source]

Map object applied to publisher

Parameters:
  • function – function to be applied for each emit
  • *args – variable arguments to be used for calling function
  • unpack – value from emits will be unpacked (*value)
  • **kwargs – keyword arguments to be used for calling function

Usage

Apply function(*args, value, **kwargs) to each emitted value

Usage:

>>> from broqer import Value, op, Sink
>>> s = Value()
>>> mapped_publisher = s | op.Map(lambda v:v*2)
>>> _disposable = mapped_publisher.subscribe(Sink(print))
>>> s.emit(1)
2
>>> s.emit(-1)
-2
>>> s.emit(0)
0
>>> _disposable.dispose()

Also possible with additional args and kwargs:

>>> import operator
>>> mapped_publisher = s | op.Map(operator.add, 3)
>>> _disposable = mapped_publisher.subscribe(Sink(print))
3
>>> s.emit(100)
103
>>> _disposable.dispose()
>>> _disposable = (s | op.Map(print, 'Output:')).subscribe(Sink(print, 'EMITTED'))
Output: 100
EMITTED None
>>> s.emit(1)
Output: 1
EMITTED None
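
The argument handling can be summarised in a few lines of plain Python. `apply_map` is a hypothetical helper, not part of broqer. It follows the documented call form function(*args, value, **kwargs) and assumes that unpack=True expands the emitted value after the bound arguments.

```python
import operator
from functools import partial


def apply_map(function, value, *args, unpack=False, **kwargs):
    """Call function with bound args first, then the emitted value."""
    bound = partial(function, *args, **kwargs)
    return bound(*value) if unpack else bound(value)


print(apply_map(operator.add, 100, 3))      # 103, same as operator.add(3, 100)
print(apply_map(operator.sub, 1, 10))       # 9, same as operator.sub(10, 1)
print(apply_map(pow, (2, 5), unpack=True))  # 32, same as pow(2, 5)
```

The second call shows that the order matters for non-commutative functions: the bound argument comes first, the emitted value second.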

MapAsync

Definition

class broqer.op.MapAsync(coro, *args, mode=<AsyncMode.CONCURRENT: 1>, error_callback=<broqer.error_handler.DefaultErrorHandler object>, unpack: bool = False, **kwargs)[source]

Apply coro(*args, value, **kwargs) to each emitted value, allowing async processing.

Parameters:
  • coro – coroutine to be applied on emit
  • *args – variable arguments to be used for calling coro
  • mode – behavior when a value is currently processed
  • error_callback – error callback to be registered
  • unpack – value from emits will be unpacked as (*value)
  • **kwargs – keyword arguments to be used for calling coro
Variables:

scheduled – Publisher emitting the value when coroutine is actually started.

Usage

Apply coro to each emitted value allowing async processing

Usage:

>>> import asyncio
>>> from broqer import Value, Sink, op
>>> s = Value()
>>> async def delay_add(a):
...     print('Starting with argument', a)
...     await asyncio.sleep(0.015)
...     result = a + 1
...     print('Finished with argument', a)
...     return result

AsyncMode: CONCURRENT (is default)

>>> s.emit(0)
>>> _d = (s | op.MapAsync(delay_add)).subscribe(Sink())
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 0
Starting with argument 1
Finished with argument 0
Finished with argument 1
>>> _d.dispose()

AsyncMode: INTERRUPT

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.INTERRUPT))
>>> _d = o.subscribe(Sink(print))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.005))
Starting with argument 0
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()

AsyncMode: QUEUE

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.QUEUE))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()

AsyncMode: LAST

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.LAST))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 2
Finished with argument 2
3
>>> _d.dispose()

AsyncMode: SKIP

>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.SKIP))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
>>> _d.dispose()

Using error_callback:

>>> def cb(*e):
...     print('Got error')
>>> s.emit('abc')
>>> _d = (s | op.MapAsync(delay_add, error_callback=cb)).subscribe(Sink(print))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument abc
Got error
>>> _d.dispose()
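
To give an idea of what a mode such as INTERRUPT means in terms of asyncio, here is a minimal sketch that cancels the running coroutine when a new value arrives. It is not how broqer implements MapAsync; `InterruptingMapper` is a hypothetical helper and the sink is a plain callable.

```python
import asyncio


async def delay_add(a):
    await asyncio.sleep(0.01)
    return a + 1


class InterruptingMapper:
    """Hypothetical helper: a new value cancels the coroutine still running."""

    def __init__(self, coro, sink):
        self._coro = coro
        self._sink = sink
        self.task = None

    def emit(self, value):
        if self.task is not None and not self.task.done():
            self.task.cancel()  # drop the unfinished computation
        self.task = asyncio.create_task(self._run(value))

    async def _run(self, value):
        self._sink(await self._coro(value))


async def main():
    results = []
    mapper = InterruptingMapper(delay_add, results.append)
    mapper.emit(0)
    await asyncio.sleep(0)  # give the first coroutine a chance to start
    mapper.emit(1)          # cancels the run for 0
    await mapper.task       # wait for the run for 1
    return results


results = asyncio.run(main())
print(results)  # [2]
```

The other modes differ only in what happens inside `emit` while a task is running: start another task (CONCURRENT), remember values for later (QUEUE, LAST) or ignore the new value (SKIP).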

MapThreaded

Definition

Usage

Merge

Definition

Usage

Partition

Definition

Usage

Reduce

Definition

Usage

Replace

Definition

Usage

Sample

Definition

Usage

SlidingWindow

Definition

Usage

Switch

Definition

Usage

Throttle

Definition

class broqer.op.Throttle(duration: float, error_callback=<broqer.error_handler.DefaultErrorHandler object>, loop=None)[source]

Rate limit emits by the given time.

Parameters:
  • duration – time for throttling in seconds
  • error_callback – the error callback to be registered
  • loop – asyncio event loop to use

Usage

Rate limit emits by the given time.

Usage:

>>> import asyncio
>>> from broqer import Value, op, Sink
>>> v = Value()
>>> throttle_publisher = v | op.Throttle(0.1)
>>> _d = throttle_publisher.subscribe(Sink(print))
>>> v.emit(1)
1
>>> v.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> v.emit(3)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.2))
3
>>> # It’s also possible to reset the throttling duration:
>>> v.emit(4)
4
>>> v.emit(5)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05))
>>> throttle_publisher.reset()
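
The throttling behaviour shown in the usage example can be approximated with a timer from the asyncio event loop. The following is only a sketch: `ThrottleSketch` is hypothetical, resetting is left out, and it is an assumption that a value emitted late opens a new throttling window.

```python
import asyncio

_EMPTY = object()  # marks "no value waiting"


class ThrottleSketch:
    def __init__(self, duration, sink):
        self._duration = duration
        self._sink = sink
        self._pending = _EMPTY
        self._timer = None

    def emit(self, value):
        if self._timer is None:
            self._sink(value)      # idle: pass the value through
            self._start_timer()
        else:
            self._pending = value  # busy: remember only the latest value

    def _start_timer(self):
        loop = asyncio.get_running_loop()
        self._timer = loop.call_later(self._duration, self._on_timer)

    def _on_timer(self):
        self._timer = None
        if self._pending is not _EMPTY:
            value, self._pending = self._pending, _EMPTY
            self._sink(value)
            self._start_timer()    # assumed: the late emit opens a new window


async def main():
    results = []
    throttle = ThrottleSketch(0.05, results.append)
    throttle.emit(1)  # emitted immediately
    throttle.emit(2)  # held back
    throttle.emit(3)  # replaces 2
    await asyncio.sleep(0.2)
    return results


results = asyncio.run(main())
print(results)  # [1, 3]
```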

Subjects

Subject

Value

class broqer.Value(init=<class 'broqer.types.NONE'>)[source]

Value is a publisher and subscriber.

>>> from broqer import Sink
>>> s = Value(0)
>>> _d = s.subscribe(Sink(print))
0
>>> s.emit(1)
1

Hub

Synopsis

  • Pure python implementation without dependencies
  • Under MIT license (2018 Günther Jena)
  • Source is hosted on GitHub.com
  • Documentation is hosted on ReadTheDocs.com
  • Tested on Python 3.7, 3.8, 3.9, 3.10 and 3.11
  • Unit tested with pytest, coding style checked with Flake8, static type checked with mypy, static code checked with Pylint, documented with Sphinx
  • Operators known from ReactiveX and other streaming frameworks (like Map, CombineLatest, …)
    • Centralised object to keep track of publishers and subscribers
    • Starting point to build applications with a microservice architecture

Showcase

In other frameworks a Publisher is sometimes called an Observable. A Subscriber is able to observe the changes the publisher is emitting. With these basics you’re able to use the observer pattern - let’s see!

Observer pattern

Subscribing to a publisher is done via the .subscribe() method. A simple subscriber is Sink, which calls a function with optional positional and keyword arguments.

>>> from broqer import Publisher, Sink
>>> a = Publisher(5)  # create a publisher with state `5`
>>> s = Sink(print, 'Change:')  # create a subscriber
>>> disposable = a.subscribe(s)  # subscribe subscriber to publisher
Change: 5

>>> a.notify(3)  # change the state
Change: 3

>>> disposable.dispose()  # unsubscribe

Combine publishers with arithmetic operators

You’re able to create publishers on the fly by combining two publishers with the common operators (like +, >, <<, …).

>>> a = Publisher(1)
>>> b = Publisher(3)

>>> c = a * 3 > b  # create a new publisher via operator overloading
>>> disposable = c.subscribe(Sink(print, 'c:'))
c: False

>>> a.notify(2)
c: True

>>> b.notify(10)
c: False
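
How an expression like a * 3 > b can yield a new reactive object is easiest to see in a reduced sketch based on Python's operator overloading. `Cell` is a hypothetical class, not broqer's Publisher, and it updates derived values eagerly, which is a simplification.

```python
import operator


class Cell:
    """Hypothetical stand-in for a publisher holding one value."""

    def __init__(self, value):
        self._value = value
        self._listeners = []

    def get(self):
        return self._value

    def notify(self, value):
        self._value = value
        for listener in tuple(self._listeners):
            listener()

    def _derive(self, function, other):
        def current():
            rhs = other.get() if isinstance(other, Cell) else other
            return function(self.get(), rhs)

        derived = Cell(current())

        def refresh():
            derived.notify(current())

        # Recompute the derived cell whenever an operand changes.
        self._listeners.append(refresh)
        if isinstance(other, Cell):
            other._listeners.append(refresh)
        return derived

    def __mul__(self, other):
        return self._derive(operator.mul, other)

    def __gt__(self, other):
        return self._derive(operator.gt, other)


a = Cell(1)
b = Cell(3)
c = a * 3 > b         # evaluated as (a * 3) > b, result is a new Cell

states = [c.get()]    # 3 > 3
a.notify(2)
states.append(c.get())  # 6 > 3
b.notify(10)
states.append(c.get())  # 6 > 10
print(states)  # [False, True, False]
```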

Also fancy stuff like getting item by index or key is possible:

>>> i = Publisher('a')
>>> d = Publisher({'a':100, 'b':200, 'c':300})

>>> disposable = d[i].subscribe(Sink(print, 'r:'))
r: 100

>>> i.notify('c')
r: 300
>>> d.notify({'c':123})
r: 123

Some Python built-in functions can’t return Publishers (e.g. len() needs to return an integer). For these cases special functions are defined in broqer: Str, Int, Float, Len and In (for x in y). Other convenience functions are also available: All, Any, BitwiseAnd and BitwiseOr.
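
The restriction comes from Python itself and can be shown without broqer. In the sketch below `BadLength` and `len_of` are hypothetical names; `len_of` only hints at why a separate function is needed when the result should not be a plain integer.

```python
class BadLength:
    def __len__(self):
        return 'three'  # len() insists on an int


try:
    len(BadLength())
    rejected = False
except TypeError:
    rejected = True

print(rejected)  # True


def len_of(get_value):
    """Hypothetical function-style alternative: returns a callable, not an int."""
    return lambda: len(get_value())


items = [1, 2, 3]
current_length = len_of(lambda: items)
items.append(4)
print(current_length())  # 4
```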

Attribute access on a publisher builds a new publisher on which the actual attribute access is performed for each emitted value. A publisher has to know which type it should mimic - this is done via .inherit_type(type).

>>> i = Publisher('Attribute access made REACTIVE')
>>> i.inherit_type(str)
>>> disposable = i.lower().split(sep=' ').subscribe(Sink(print))
['attribute', 'access', 'made', 'reactive']

>>> i.notify('Reactive and pythonic')
['reactive', 'and', 'pythonic']
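
One way to picture this mechanism is a wrapper that records method calls through `__getattr__` and replays them on every value. `Deferred` is a hypothetical class and not broqer's implementation; carrying the mimicked type over unchanged after each call is a simplification.

```python
class Deferred:
    """Hypothetical wrapper: records method calls and replays them per value."""

    def __init__(self, type_, steps=()):
        self._type = type_
        self._steps = tuple(steps)

    def __getattr__(self, name):
        # Only reached for names that are not found on Deferred itself.
        if name.startswith('_') or not hasattr(self._type, name):
            raise AttributeError(name)

        def record(*args, **kwargs):
            step = (name, args, kwargs)
            # Simplification: the mimicked type stays the same.
            return Deferred(self._type, self._steps + (step,))

        return record

    def apply(self, value):
        for name, args, kwargs in self._steps:
            value = getattr(value, name)(*args, **kwargs)
        return value


pipeline = Deferred(str).lower().split(sep=' ')

print(pipeline.apply('Attribute access made REACTIVE'))
# ['attribute', 'access', 'made', 'reactive']
print(pipeline.apply('Reactive and pythonic'))
# ['reactive', 'and', 'pythonic']
```

Without the type, the wrapper could not tell a valid method name from a typo: `Deferred(str).no_such_method` raises AttributeError because `str` has no such attribute.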

Function decorators

Make your own operators on the fly with function decorators. Decorators are available for Accumulate, CombineLatest, Filter, Map, MapAsync, MapThreaded, Reduce and Sink.

>>> from broqer import op
>>> @op.build_map
... def count_vowels(s):
...     return sum([s.count(v) for v in 'aeiou'])

>>> msg = Publisher('Hello World!')
>>> disposable = (msg | count_vowels).subscribe(Sink(print, 'Number of vowels:'))
Number of vowels: 3
>>> msg.notify('Wahuuu')
Number of vowels: 4

You can even make configurable Maps and Filters:

>>> import re

>>> @op.build_filter_factory
... def filter_pattern(pattern, s):
...     return re.search(pattern, s) is not None

>>> msg = Publisher('Cars passed: 135!')
>>> disposable = (msg | filter_pattern('[0-9]+')).subscribe(Sink(print))
Cars passed: 135!
>>> msg.notify('No cars have passed')
>>> msg.notify('Only 1 car has passed')
Only 1 car has passed
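
The idea behind such a factory decorator can be sketched with functools.partial. `predicate_factory` is a hypothetical decorator written for this illustration; it returns plain callables rather than broqer operators.

```python
import re
from functools import partial, wraps


def predicate_factory(function):
    """Hypothetical decorator: calling the result binds the leading arguments."""
    @wraps(function)
    def factory(*args, **kwargs):
        return partial(function, *args, **kwargs)
    return factory


@predicate_factory
def filter_pattern(pattern, s):
    return re.search(pattern, s) is not None


has_digits = filter_pattern('[0-9]+')  # configuration happens here

messages = ['Cars passed: 135!', 'No cars have passed', 'Only 1 car has passed']
passed = [message for message in messages if has_digits(message)]
print(passed)  # ['Cars passed: 135!', 'Only 1 car has passed']
```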

Install

pip install broqer

Credits

Broqer was inspired by:

  • RxPY: Reactive Extension for Python (by Børge Lanes and Dag Brattli)
  • aioreactive: Async/Await reactive tools for Python (by Dag Brattli)
  • streamz: build pipelines to manage continuous streams of data (by Matthew Rocklin)
  • MQTT: M2M connectivity protocol
  • Florian Feurstein: spending hours in discussion, coming up with great ideas and helping me understand the concepts!