Source code for broqer.op.filter_

"""
Filters values based on a ``predicate`` function

Usage:

>>> from broqer import Value, op, Sink
>>> s = Value()

>>> filtered_publisher = s | op.Filter(lambda v:v>0)
>>> _disposable = filtered_publisher.subscribe(Sink(print))

>>> s.emit(1)
1
>>> s.emit(-1)
>>> s.emit(0)
>>> _disposable.dispose()

Also possible with additional args and kwargs:

>>> import operator
>>> filtered_publisher = s | op.Filter(operator.and_, 0x01)
>>> _disposable = filtered_publisher.subscribe(Sink(print))
>>> s.emit(100)
>>> s.emit(101)
101

"""
from functools import partial, wraps
from typing import Any, Callable

from broqer import NONE, Publisher
from broqer.operator import Operator


[docs]class Filter(Operator): """ Filter object applied to publisher :param predicate: function to evaluate the filtering :param \\*args: variable arguments to be used for evaluating predicate :param unpack: value from emits will be unpacked (\\*value) :param \\*\\*kwargs: keyword arguments to be used for evaluating predicate """ def __init__(self, predicate: Callable[[Any], bool], *args, unpack: bool = False, **kwargs) -> None: Operator.__init__(self) self._predicate = partial(predicate, *args, **kwargs) # type: Callable self._unpack = unpack def get(self) -> Any: if self._originator is None: raise ValueError('Operator is missing originator') if self._subscriptions: return self._state value = self._originator.get() # type: Any if self._unpack: # assert isinstance(value, (list, tuple)) if self._predicate(*value): return value elif self._predicate(value): return value return NONE def emit(self, value: Any, who: Publisher) -> None: if who is not self._originator: raise ValueError('Emit from non assigned publisher') if self._unpack: if self._predicate(*value): return Publisher.notify(self, value) elif self._predicate(value): return Publisher.notify(self, value) return None
class EvalTrue(Operator): """ Emits all values which evaluates for True. This operator can be used in the pipline style (v | EvalTrue()) or as standalone operation (EvalTrue(v)). """ def __init__(self, publisher: Publisher = None) -> None: Operator.__init__(self) self._originator = publisher def get(self) -> Any: if self._subscriptions: return self._state assert isinstance(self._originator, Publisher) value = self._originator.get() # type: Any if bool(value): return value return NONE def emit(self, value: Any, who: Publisher) -> None: if who is not self._originator: raise ValueError('Emit from non assigned publisher') if bool(value): return Publisher.notify(self, value) return None class EvalFalse(Operator): """ Filters all emits which evaluates for False. This operator can be used in the pipline style (v | EvalFalse() or as standalone operation (EvalFalse(v)).""" def __init__(self, publisher: Publisher = None) -> None: Operator.__init__(self) self._originator = publisher def get(self) -> Any: if self._subscriptions: return self._state assert isinstance(self._originator, Publisher) value = self._originator.get() # type: Any if not bool(value): return value return NONE def emit(self, value: Any, who: Publisher) -> None: if who is not self._originator: raise ValueError('Emit from non assigned publisher') if not bool(value): return Publisher.notify(self, value) return None def build_filter(predicate: Callable[[Any], bool] = None, *, unpack: bool = False): """ Decorator to wrap a function to return a Filter operator. :param function: function to be wrapped :param unpack: value from emits will be unpacked (*value) """ def _build_filter(predicate): return Filter(predicate, unpack=unpack) if predicate: return _build_filter(predicate) return _build_filter def build_filter_factory(predicate: Callable[[Any], bool] = None, *, unpack: bool = False): """ Decorator to wrap a function to return a factory for Filter operators. :param predicate: function to be wrapped :param unpack: value from emits will be unpacked (*value) """ def _build_filter(predicate: Callable[[Any], bool]): @wraps(predicate) def _wrapper(*args, **kwargs) -> Filter: if 'unpack' in kwargs: raise TypeError('"unpack" has to be defined by decorator') return Filter(predicate, *args, unpack=unpack, **kwargs) return _wrapper if predicate: return _build_filter(predicate) return _build_filter