Source code for broqer.op.map_

"""
Apply ``function(*args, value, **kwargs)`` to each emitted value

Usage:

>>> from broqer import Value, op, Sink
>>> s = Value()

>>> mapped_publisher = s | op.Map(lambda v:v*2)
>>> _disposable = mapped_publisher.subscribe(Sink(print))

>>> s.emit(1)
2
>>> s.emit(-1)
-2
>>> s.emit(0)
0
>>> _disposable.dispose()

Also possible with additional args and kwargs:

>>> import operator
>>> mapped_publisher = s | op.Map(operator.add, 3)
>>> _disposable = mapped_publisher.subscribe(Sink(print))
3
>>> s.emit(100)
103
>>> _disposable.dispose()

>>> _disposable = (s | op.Map(print, 'Output:')).subscribe(\
                                                Sink(print, 'EMITTED'))
Output: 100
EMITTED None
>>> s.emit(1)
Output: 1
EMITTED None
"""
from functools import partial, wraps
from typing import Any, Callable

from broqer import Publisher, NONE
from broqer.publisher import ValueT
from broqer.operator import Operator


[docs]class Map(Operator): """ Map object applied to publisher :param function: function to be applied for each emit :param \\*args: variable arguments to be used for calling function :param unpack: value from emits will be unpacked (\\*value) :param \\*\\*kwargs: keyword arguments to be used for calling function """ def __init__(self, function: Callable[[Any], Any], *args, unpack: bool = False, **kwargs) -> None: """ Special care for return values: - return `None` (or nothing) if you don't want to return a result - return `None, ` if you want to return `None` - return `(a, b), ` to return a tuple as value - every other return value will be unpacked """ Operator.__init__(self) self._function = partial(function, *args, **kwargs) self._unpack = unpack def get(self) -> ValueT: if self._subscriptions: return self._state if self._originator is None: raise ValueError('Operator is missing originator') value = self._originator.get() # type: ValueT if value is NONE: return value if self._unpack: assert isinstance(value, (list, tuple)) return self._function(*value) return self._function(value) def emit(self, value: ValueT, who: Publisher) -> None: if who is not self._originator: raise ValueError('Emit from non assigned publisher') if self._unpack: assert isinstance(value, (list, tuple)) result = self._function(*value) else: result = self._function(value) if result is not NONE: return Publisher.notify(self, result) return None
def build_map(function: Callable[..., None] = None, *, unpack: bool = False): """ Decorator to wrap a function to return a Map operator. :param function: function to be wrapped :param unpack: value from emits will be unpacked (*value) """ def _build_map(function): return Map(function, unpack=unpack) if function: return _build_map(function) return _build_map def build_map_factory(function: Callable[[Any], Any] = None, unpack: bool = False): """ Decorator to wrap a function to return a factory for Map operators. :param function: function to be wrapped :param unpack: value from emits will be unpacked (*value) """ def _build_map(function: Callable[[Any], Any]): @wraps(function) def _wrapper(*args, **kwargs) -> Map: if 'unpack' in kwargs: raise TypeError('"unpack" has to be defined by decorator') return Map(function, *args, unpack=unpack, **kwargs) return _wrapper if function: return _build_map(function) return _build_map