Python Broqer¶
Initial focus on embedded systems Broqer can be used wherever continuous streams of data have to be processed - and they are everywhere. Watch out!
Content¶
Introduction¶
What’s all the fuss about Reactive Programming?¶
Reactive Programming is all about asynchronous data streams. In history of programming a lot of architectures and paradigms had come up solving this issues. Terms like event driven, oberserver pattern and others are coming up describing the idea behind reactive programming.
In reactive programming you work on asynchronous data streams. And this can be nearly anything.
Examples of sources for asynchronous data streams:
- events in an user interface (clicks, mouse moves, …)
- requests from clients in a server environment
- sensor and input data from an embedded system
- variables, states or data structures in an application
- …
Reactive programming gives you operators to work on those streams:
- filter streams
- map functions to streams
- merge streams in various ways
- process contained data
- …
Publishers¶
In broqer a subscriber can subscribe to a publisher. After subscription the subscriber is notified about emitted values from the publisher ( starting with the current state). In other frameworks publisher/subscriber are referenced as observable/observer.
broqer.NONE is used as default initialisation. .get() will always return the internal state (even when it’s broqer.NONE). .subscribe() will emit the actual state to the new subscriber only if it is something else than broqer.NONE .
To receive information use following methods to interact with Publisher:
.subscribe(subscriber)
to subscribe for events on this publisher.unsubscribe(subscriber)
to unsubscribe.get()
to get the current state
When implementing a Publisher use the following methods:
.notify(value)
calls .emit(value) on all subscribers
param init: | the initial state. |
---|---|
ivar _state: | state of the publisher |
ivar _inherited_type: | |
type class for method lookup | |
ivar _subscriptions: | |
holding a list of subscribers | |
ivar _on_subscription_cb: | |
callback with boolean as argument, telling if at least one subscription exists | |
ivar _dependencies: | |
list with publishers this publisher is (directly or indirectly) dependent on. |
Publisher¶
-
class
broqer.
Publisher
(init=<class 'broqer.types.NONE'>, type_=None)[source]¶ In broqer a subscriber can subscribe to a publisher. After subscription the subscriber is notified about emitted values from the publisher ( starting with the current state). In other frameworks publisher/subscriber are referenced as observable/observer.
broqer.NONE is used as default initialisation. .get() will always return the internal state (even when it’s broqer.NONE). .subscribe() will emit the actual state to the new subscriber only if it is something else than broqer.NONE .
To receive information use following methods to interact with Publisher:
.subscribe(subscriber)
to subscribe for events on this publisher.unsubscribe(subscriber)
to unsubscribe.get()
to get the current state
When implementing a Publisher use the following methods:
.notify(value)
calls .emit(value) on all subscribers
Parameters: init – the initial state.
Variables: - _state – state of the publisher
- _inherited_type – type class for method lookup
- _subscriptions – holding a list of subscribers
- _on_subscription_cb – callback with boolean as argument, telling if at least one subscription exists
- _dependencies – list with publishers this publisher is (directly or indirectly) dependent on.
-
notify
(value: ValueT) → None[source]¶ Calling .emit(value) on all subscribers and store state.
Parameters: value – value to be emitted to subscribers
-
subscribe
(subscriber: Subscriber, prepend: bool = False) → SubscriptionDisposable[source]¶ Subscribing the given subscriber.
Parameters: - subscriber – subscriber to add
- prepend – For internal use - usually the subscribers will be added at the end of a list. When prepend is True, it will be added in front of the list. This will habe an effect in the order the subscribers are called.
Raises: SubscriptionError – if subscriber already subscribed
-
subscriptions
¶ Property returning a tuple with all current subscribers
StatefulPublisher¶
FromPolling¶
Operators¶
Operator | Description |
---|---|
Accumulate | Apply func(value, state) which is returning new state and value to emit |
Cache | Caching the emitted values (make a stateless publisher stateful) |
CatchException | Catching exceptions of following operators in the pipeline |
CombineLatest | Combine the latest emit of multiple publishers and emit the combination |
Debounce | Emit a value only after a given idle time (emits meanwhile are skipped). |
Delay | Emit every value delayed by the given time. |
Filter | Filters values based on a predicate function |
Map | Apply a function to each emitted value |
MapAsync | Apply a coroutine to each emitted value allowing async processing |
MapThreaded | Apply a blocking function to each emitted value allowing threaded processing |
Merge | Merge emits of multiple publishers into one stream |
Partition | Group size emits into one emit as tuple |
Reduce | Like Map but with additional previous result as argument to the function. |
Replace | Replace each received value by the given value |
Sample | Emit the last received value periodically |
SlidingWindow | Group size emitted values overlapping |
Switch | Emit selected source mapped by mapping |
Throttle | Rate limit emits by the given time |
Cache¶
Definition¶
Usage¶
Cache the latest emit - the result is suppressing multiple emits with the same value. Also initialization can be defined in the case the source publisher does not emit on subscription.
Usage:
>>> from broqer import Value, op, Sink
>>> s = Value(1)
>>> cached_publisher = s | op.Cache()
>>> _disposable = cached_publisher.subscribe(Sink(print))
1
>>> s.emit(2)
2
>>> s.emit(2)
>>> _disposable.dispose()
Using the initial value for cache:
>>> from broqer import Value, op, Sink
>>> s = Value()
>>> cached_publisher = s | op.Cache(1)
>>> _disposable = cached_publisher.subscribe(Sink(print))
1
>>> s.emit(1)
>>> s.emit(2)
2
>>> _disposable.dispose()
CombineLatest¶
Definition¶
-
class
broqer.op.
CombineLatest
(*publishers, map_: Callable[[...], Any] = None, emit_on=None)[source]¶ Combine the latest emit of multiple publishers and emit the combination
Parameters: - publishers – source publishers
- map – optional function to be called for evaluation of current state
- emit_on – publisher or list of publishers - only emitting result when emit comes from one of this list. If None, emit on any source publisher.
Usage¶
>>> from broqer import Value, Sink, op
>>> s1 = Value()
>>> s2 = Value()
>>> combination = op.CombineLatest(s1, s2)
>>> disposable = combination.subscribe(Sink(print))
CombineLatest is only emitting, when all values are collected:
>>> s1.emit(1)
>>> s2.emit(2)
(1, 2)
>>> s2.emit(3)
(1, 3)
Subscribing to a CombineLatest with all values available is emitting the values immediate on subscription:
>>> combination.subscribe(Sink(print, 'Second sink:'))
Second sink: (1, 3)
<...>
Filter¶
Definition¶
-
class
broqer.op.
Filter
(predicate: Callable[[Any], bool], *args, unpack: bool = False, **kwargs)[source]¶ Filter object applied to publisher
Parameters: - predicate – function to evaluate the filtering
- *args – variable arguments to be used for evaluating predicate
- unpack – value from emits will be unpacked (*value)
- **kwargs – keyword arguments to be used for evaluating predicate
Usage¶
Filters values based on a predicate
function
Usage:
>>> from broqer import Value, op, Sink
>>> s = Value()
>>> filtered_publisher = s | op.Filter(lambda v:v>0)
>>> _disposable = filtered_publisher.subscribe(Sink(print))
>>> s.emit(1)
1
>>> s.emit(-1)
>>> s.emit(0)
>>> _disposable.dispose()
Also possible with additional args and kwargs:
>>> import operator
>>> filtered_publisher = s | op.Filter(operator.and_, 0x01)
>>> _disposable = filtered_publisher.subscribe(Sink(print))
>>> s.emit(100)
>>> s.emit(101)
101
Map¶
Definition¶
-
class
broqer.op.
Map
(function: Callable[[Any], Any], *args, unpack: bool = False, **kwargs)[source]¶ Map object applied to publisher
Parameters: - function – function to be applied for each emit
- *args – variable arguments to be used for calling function
- unpack – value from emits will be unpacked (*value)
- **kwargs – keyword arguments to be used for calling function
Usage¶
Apply function(*args, value, **kwargs)
to each emitted value
Usage:
>>> from broqer import Value, op, Sink
>>> s = Value()
>>> mapped_publisher = s | op.Map(lambda v:v*2)
>>> _disposable = mapped_publisher.subscribe(Sink(print))
>>> s.emit(1)
2
>>> s.emit(-1)
-2
>>> s.emit(0)
0
>>> _disposable.dispose()
Also possible with additional args and kwargs:
>>> import operator
>>> mapped_publisher = s | op.Map(operator.add, 3)
>>> _disposable = mapped_publisher.subscribe(Sink(print))
3
>>> s.emit(100)
103
>>> _disposable.dispose()
>>> _disposable = (s | op.Map(print, 'Output:')).subscribe( Sink(print, 'EMITTED'))
Output: 100
EMITTED None
>>> s.emit(1)
Output: 1
EMITTED None
MapAsync¶
Definition¶
-
class
broqer.op.
MapAsync
(coro, *args, mode=<AsyncMode.CONCURRENT: 1>, error_callback=<broqer.error_handler.DefaultErrorHandler object>, unpack: bool = False, **kwargs)[source]¶ Apply
coro(*args, value, **kwargs)
to each emitted value allow async processing.Parameters: - coro – coroutine to be applied on emit
- *args – variable arguments to be used for calling coro
- mode – behavior when a value is currently processed
- error_callback – error callback to be registered
- unpack – value from emits will be unpacked as (*value)
- **kwargs – keyword arguments to be used for calling coro
Variables: scheduled – Publisher emitting the value when coroutine is actually started.
Usage¶
Apply coro
to each emitted value allowing async processing
Usage:
>>> import asyncio
>>> from broqer import Value, Sink, op
>>> s = Value()
>>> async def delay_add(a):
... print('Starting with argument', a)
... await asyncio.sleep(0.015)
... result = a + 1
... print('Finished with argument', a)
... return result
AsyncMode: CONCURRENT (is default)
>>> s.emit(0)
>>> _d = (s | op.MapAsync(delay_add)).subscribe(Sink())
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 0
Starting with argument 1
Finished with argument 0
Finished with argument 1
>>> _d.dispose()
AsyncMode: INTERRUPT
>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.INTERRUPT))
>>> _d = o.subscribe(Sink(print))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.005))
Starting with argument 0
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()
AsyncMode: QUEUE
>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.QUEUE))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 1
Finished with argument 1
2
>>> _d.dispose()
AsyncMode: LAST
>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.LAST))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
Starting with argument 2
Finished with argument 2
3
>>> _d.dispose()
AsyncMode: SKIP
>>> s.emit(0)
>>> o = (s | op.MapAsync(delay_add, mode=op.AsyncMode.SKIP))
>>> _d = o.subscribe(Sink(print))
>>> s.emit(1)
>>> s.emit(2)
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.04))
Starting with argument 0
Finished with argument 0
1
>>> _d.dispose()
Using error_callback:
>>> def cb(*e):
... print('Got error')
>>> s.emit('abc')
>>> _d = (s | op.MapAsync(delay_add, error_callback=cb)).subscribe(Sink(print))
>>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.02))
Starting with argument abc
Got error
>>> _d.dispose()
Throttle¶
Definition¶
-
class
broqer.op.
Throttle
(duration: float, error_callback=<broqer.error_handler.DefaultErrorHandler object>, loop=None)[source]¶ Rate limit emits by the given time.
Parameters: - duration – time for throttling in seconds
- error_callback – the error callback to be registered
- loop – asyncio event loop to use
Usage¶
Rate limit emits by the given time. Usage: >>> import asyncio >>> from broqer import Value, op, Sink >>> v = Value() >>> throttle_publisher = v | op.Throttle(0.1) >>> _d = throttle_publisher.subscribe(Sink(print)) >>> v.emit(1) 1 >>> v.emit(2) >>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05)) >>> v.emit(3) >>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.2)) 3 >>> # It’s also possible to reset the throttling duration: >>> v.emit(4) 4 >>> v.emit(5) >>> asyncio.get_event_loop().run_until_complete(asyncio.sleep(0.05)) >>> throttle_publisher.reset()
Hub¶
Synopsis¶
- Pure python implementation without dependencies
- Under MIT license (2018 Günther Jena)
- Source is hosted on GitHub.com
- Documentation is hosted on ReadTheDocs.com
- Tested on Python 3.7. 3.8, 3.9, 3.10 and 3.11
- Unit tested with pytest, coding style checked with Flake8, static type checked with mypy, static code checked with Pylint, documented with Sphinx
- Operators known from ReactiveX and other streaming frameworks (like Map, CombineLatest, …)
- Centralised object to keep track of publishers and subscribers
- Starting point to build applications with a microservice architecture
Showcase¶
In other frameworks a Publisher is sometimes called Oberservable. A Subscriber is able to observe changes the publisher is emitting. With these basics you’re able to use the observer pattern - let’s see!
Observer pattern¶
Subscribing to a publisher is done via the .subscribe() method.
A simple subscriber is Sink
which is calling a function with optional positional
and keyword arguments.
>>> from broqer import Publisher, Sink
>>> a = Publisher(5) # create a publisher with state `5`
>>> s = Sink(print, 'Change:') # create a subscriber
>>> disposable = a.subscribe(s) # subscribe subscriber to publisher
Change: 5
>>> a.notify(3) # change the state
Change: 3
>>> disposable.dispose() # unsubscribe
Combine publishers with arithmetic operators¶
You’re able to create publishers on the fly by combining two publishers with
the common operators (like +
, >
, <<
, …).
>>> a = Publisher(1)
>>> b = Publisher(3)
>>> c = a * 3 > b # create a new publisher via operator overloading
>>> disposable = c.subscribe(Sink(print, 'c:'))
c: False
>>> a.notify(2)
c: True
>>> b.notify(10)
c: False
Also fancy stuff like getting item by index or key is possible:
>>> i = Publisher('a')
>>> d = Publisher({'a':100, 'b':200, 'c':300})
>>> disposable = d[i].subscribe(Sink(print, 'r:'))
r: 100
>>> i.notify('c')
r: 300
>>> d.notify({'c':123})
r: 123
Some python built in functions can’t return Publishers (e.g. len()
needs to
return an integer). For these cases special functions are defined in broqer: Str
,
Int
, Float
, Len
and In
(for x in y
). Also other functions
for convenience are available: All
, Any
, BitwiseAnd
and BitwiseOr
.
Attribute access on a publisher is building a publisher where the actual attribute
access is done on emitting values. A publisher has to know, which type it should
mimic - this is done via .inherit_type(type)
.
>>> i = Publisher('Attribute access made REACTIVE')
>>> i.inherit_type(str)
>>> disposable = i.lower().split(sep=' ').subscribe(Sink(print))
['attribute', 'access', 'made', 'reactive']
>>> i.notify('Reactive and pythonic')
['reactive', 'and', 'pythonic']
Function decorators¶
Make your own operators on the fly with function decorators. Decorators are
available for Accumulate
, CombineLatest
, Filter
, Map
, MapAsync
,
MapThreaded
, Reduce
and Sink
.
>>> from broqer import op
>>> @op.build_map
... def count_vowels(s):
... return sum([s.count(v) for v in 'aeiou'])
>>> msg = Publisher('Hello World!')
>>> disposable = (msg | count_vowels).subscribe(Sink(print, 'Number of vowels:'))
Number of vowels: 3
>>> msg.notify('Wahuuu')
Number of vowels: 4
You can even make configurable Map
s and Filter
s:
>>> import re
>>> @op.build_filter_factory
... def filter_pattern(pattern, s):
... return re.search(pattern, s) is not None
>>> msg = Publisher('Cars passed: 135!')
>>> disposable = (msg | filter_pattern('[0-9]+')).subscribe(Sink(print))
Cars passed: 135!
>>> msg.notify('No cars have passed')
>>> msg.notify('Only 1 car has passed')
Only 1 car has passed
Install¶
pip install broqer
Credits¶
Broqer was inspired by:
- RxPY: Reactive Extension for Python (by Børge Lanes and Dag Brattli)
- aioreactive: Async/Await reactive tools for Python (by Dag Brattli)
- streamz: build pipelines to manage continuous streams of data (by Matthew Rocklin)
- MQTT: M2M connectivity protocol
- Florian Feurstein: spending hours of discussion, coming up with great ideas and help me understand the concepts!