Module trainlog.ops
Defines operations that transform log event sequences.
Includes a generic abstraction BaseOperation, for any transformation
of Iterable[Event] -> Iterable[Event]
.
Most operations are derived from Operation
, which defines an
operation that starts with an initial value and reduces over a log,
updating events as it goes.
For example, this operation can count the number of events that have {"kind": "step"}:
op = ops.Sum(ops.kind("step"), "step")
events_out = op(events_in)
There is also a "functional" API that provides some convenient defaults
and conversions. Since kind("...")
is such a common case, there is an
automatic conversion from string. Also, kind()
provides a default name
for the output key, so the above op
is equivalent to:
op = ops.sum("step")
Expand source code
"""Defines operations that transform log event sequences.
Includes a generic abstraction BaseOperation, for any transformation
of `Iterable[Event] -> Iterable[Event]`.
Most operations are derived from `Operation`, which defines an
operation that starts with an initial value and reduces over a log,
updating events as it goes.
For example, this operation can count the number of events that have
{"kind": "step"}:
op = ops.Sum(ops.kind("step"), "step")
events_out = op(events_in)
There is also a "functional" API that provides some convenient defaults
and conversions. Since `kind("...")` is such a common case, there is an
automatic conversion from string. Also, `kind()` provides a default name
for the output key, so the above `op` is equivalent to:
op = ops.sum("step")
"""
import abc
import builtins
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
TypeVar,
Union,
)
# pylint:disable=redefined-builtin
T = TypeVar("T")
Event = Dict[str, Any]
Mapping = Callable[[Event], Any]
Predicate = Callable[[Event], bool]
AutoPredicate = Union[str, Predicate]
ScalarFn = Callable[[Event], Union[bool, float]]
AutoScalarFn = Union[str, ScalarFn]
Reduction = Callable[[Iterable[Event]], Any]
BaseOperation = Callable[[Iterator[Event]], Iterator[Event]]
# Operations
class Operation(abc.ABC):
"""Base class for log denormalization operations.
An operation is a small program that is run over a sequence of events from a log:
value = operation.initial()
for event in events:
operation.update(event, value)
value = operation.accumulate(event, value)
"""
# pylint:disable=no-self-use
def initial(self) -> Any:
"""[Optional] An initial value for the reduction."""
return None
def accumulate(
self, event: Event, value: Any # pylint:disable=unused-argument
) -> Any:
"""[Optional] A reducing function to generate a new value."""
return None
@abc.abstractmethod
def update(self, event: Event, value: Any) -> None:
"""Applies the operation to update the given event.
This operation is run before the event is `accumulate`d into `value`.
The default implementation of `__call__` makes a shallow copy of `event`
externally, so that it's safe to add or remove toplevel keys, however the
`update()` implementation is responsible for deep copies, if required.
Note that this may not be called for every event that accumulate() is called on
(see `When`).
"""
raise NotImplementedError
def __call__(self, events: Iterator[Event]) -> Iterator[Event]:
"""Run the operation over an event stream, yielding a new stream."""
# pylint:disable=assignment-from-none
value = self.initial()
for event in events:
event = event.copy()
self.update(event, value)
yield event
value = self.accumulate(event, value)
class Map(Operation):
"""Apply an elementwise mapping function."""
def __init__(self, mapping: Mapping, name: str):
self.mapping = mapping
self.name = name
def update(self, event: Event, value: None) -> None:
event[self.name] = self.mapping(event)
class Copy(Operation):
"""Copy a value from a previous event."""
def __init__(self, predicate: Predicate, src: str, dest: str):
self.predicate = predicate
self.src = src
self.dest = dest
def accumulate(self, event: Event, value: Any) -> Any:
return event[self.src] if self.predicate(event) else value
def update(self, event: Event, value: Any) -> None:
if not self.predicate(event):
event[self.dest] = value
class Sum(Operation):
"""Compute a scalar sum of previous events.
E.g. counting events of a specific kind.
"""
def __init__(self, scalarfn: ScalarFn, name: str):
self.scalarfn = scalarfn
self.name = name
def initial(self) -> float:
return 0
def accumulate(self, event: Event, value: float) -> float:
return value + self.scalarfn(event)
def update(self, event: Event, value: float) -> None:
event[self.name] = value
class Window(Operation):
"""Aggregate over a sliding window of preceding events matching a predicate."""
def __init__(
self, predicate: Predicate, size: Optional[int], reduction: Reduction, name: str
):
self.predicate = predicate
self.size = size
self.reduction = reduction
self.name = name
def initial(self) -> List[Event]:
return []
def accumulate(self, event: Event, value: List[Event]) -> List[Event]:
if self.predicate(event):
value.append(event)
if self.size is not None and len(value) > self.size:
value = value[-self.size :]
return value
def update(self, event: Event, value: List[Event]) -> None:
event[self.name] = self.reduction(value)
class Group(Operation):
"""Create a single operation that applies multiple operations in series."""
def __init__(self, operations: Iterable[Operation]):
self.operations = operations
def initial(self) -> List[Any]:
return [op.initial() for op in self.operations]
def accumulate(self, event: Event, value: List[Any]) -> List[Any]:
return [
op.accumulate(event, opval) for op, opval in zip(self.operations, value)
]
def update(self, event: Event, value: List[Any]) -> None:
for op, opval in zip(self.operations, value):
op.update(event, opval)
class When(Operation):
"""[Wrapper] Conditionally run update() for an operation.
E.g. with `kind("step")` to only update step events
"""
def __init__(self, predicate: Predicate, body: Operation):
self.predicate = predicate
self.body = body
def initial(self) -> Any:
return self.body.initial()
def accumulate(self, event: Event, value: Any) -> Any:
return self.body.accumulate(event, value)
def update(self, event: Event, value: Any) -> None:
if self.predicate(event):
self.body.update(event, value)
class Duck(Operation):
"""Swallow key errors to make an operation "duck typed" on the event.
Note: if the wrapped event raises a `KeyError`, it must leave `event`
unchanged.
"""
def __init__(self, body: Operation):
self.body = body
def initial(self) -> Any:
return self.body.initial()
def accumulate(self, event: Event, value: Any) -> Any:
try:
return self.body.accumulate(event, value)
except KeyError:
return value
def update(self, event: Event, value: Any) -> None:
try:
self.body.update(event, value)
except KeyError:
pass
def _auto_name(
name: Optional[str], fn: Callable[[Any], Any], prefix: str, context: str
) -> str:
if name is not None:
return name
if fn.__name__ in {None, "<lambda>"}:
raise ValueError(
f"If `{context}()` using a lambda, you must provide a `name=...`"
)
return prefix + fn.__name__
def filter(predicate: AutoPredicate) -> BaseOperation:
"""An operation to filter an event stream by predicate or kind.
For example:
filter("valid")
filter(lambda event: event["step"] >= 100)
"""
pred = to_predicate(predicate)
return lambda events: builtins.filter(pred, events)
def map(mapping: Mapping, name: Optional[str] = None) -> Operation:
"""An operation to apply a "pointwise" mapping function to events.
The default `name` is `mapping.__name__`, unless None or "<lambda>".
For example:
map(lambda event: 1 - event["error_rate"], "accuracy")
"""
return Map(mapping, _auto_name(name, mapping, prefix="", context="map"))
def copy(predicate: AutoPredicate, src: str, dest: Optional[str] = None) -> Operation:
"""An operation to copy a field from a previous event.
The default `dest` is `src`.
For example:
copy("valid", "loss", "last_valid_loss")
Equivalent to:
value = None
for event in events:
if predicate(event):
value = event[src]
else:
event[dest] = value
"""
return Copy(to_predicate(predicate), src, dest or src)
def header(src: str, dest: Optional[str] = None) -> Operation:
"""An operation to copy a field from the header.
Equivalent to `copy("header", src, dest)`.
For example:
header("id")
header("learning_rate", "lr")
"""
return Copy(kind("header"), src, dest or src)
def sum(scalarfn: AutoScalarFn, name: Optional[str] = None) -> Operation:
"""An operation to sum values from previous events.
The default `name` is `sum_{scalarfn.__name__}`, unless None or "<lambda>".
For example:
sum(ops.get("examples"))
sum(lambda event: event["examples"], "sum_examples")
"""
scalarfn = to_predicate(scalarfn)
return Sum(scalarfn, _auto_name(name, scalarfn, prefix="sum_", context="sum"))
def count(predicate: AutoPredicate, name: Optional[str] = None) -> Operation:
"""An operation to count occurrences of previous events.
The default `name` is `predicate.__name__`, unless None or "<lambda>".
For example:
count("step")
count(lambda event: event["loss"] > 10, "n_large_losses")
"""
predicate = to_predicate(predicate)
return Sum(predicate, _auto_name(name, predicate, prefix="", context="count"))
def window(
predicate: AutoPredicate,
size: Optional[int],
reduction: Reduction,
name: Optional[str] = None,
) -> Operation:
"""An operation to compute a statistic over a window of previous events.
The default `name` is `reduction.__name__`, unless None or "<lambda>".
For example:
window("step", 10, ops.reduce_mean("loss"), "train_loss")
window("step", 10,
lambda events: np.mean([e["loss"] for e in events]),
"train_loss")
Equivalent to:
previous = []
for event in events:
event[name] = reduction(previous)
if predicate(event):
previous = (previous + [event])[-size:]
"""
return Window(
to_predicate(predicate),
size,
reduction,
_auto_name(name, reduction, prefix="", context="window"),
)
def when(predicate: AutoPredicate, body: Operation) -> Operation:
"""An operation wrapper that only updates events when `predicate(event)`.
For example:
when("valid", ops.map(lambda event: 1 - event["error_rate"], "accuracy"))
"""
return When(to_predicate(predicate), body)
def group(*operations: Operation) -> Operation:
"""An operation wrapper to apply multiple operations in a single pass.
All `op.update()` are run first, followed by `op.accumulate()`.
For example:
group(ops.header("id"), ops.count("step"))
"""
return operations[0] if len(operations) == 1 else Group(operations)
def duck(operation: Operation) -> Operation:
"""An operation wrapper to catch and ignore any KeyError from the operation.
This can be used to "duck type" an operation & only apply it to events with
the expected keys.
For example:
duck(ops.map(lambda event: 1 - event["error_rate"], "accuracy"))
"""
return Duck(operation)
# Predicate/Scalarfn
def kind(value: str) -> Predicate:
"""A predicate to match events with `{"kind": value}`."""
def predicate(event: Event) -> bool:
return event.get("kind") == value
predicate.__name__ = value
return predicate
def to_predicate(
kind_or_fn: Union[str, Callable[[Event], T]]
) -> Callable[[Event], Union[bool, T]]:
"""Automatic conversion from string to predicate `kind(kind_or_fn)`."""
if isinstance(kind_or_fn, str):
return kind(kind_or_fn)
return kind_or_fn
def get(key: str, required: bool = False) -> ScalarFn:
"""A scalarfn for use with `sum` that gets a key from the event.
If `required`, raise a KeyError if the key is missing (maybe useful with `duck`),
otherwise returns `None`.
Functionally equivalent to `operators.itemgetter(key)` when required is True,
or `lambda e: e.get(key)` if required is False.
Unlike lambdas or `operators`, this sets `__name__` so that there is a sensible
default name for the generated event.
For example:
ops.sum(get("examples")) # sums "examples" => "sum_examples"
"""
def scalarfn(event: Event) -> Any:
return event[key] if required else event.get(key)
scalarfn.__name__ = key
return scalarfn
# Reductions
def reduce_mean(key: str) -> Callable[[Iterable[Event]], Optional[float]]:
"""A reduction for use with `window`, that computes the mean of a scalar key.
If passed an empty list, returns `None`.
"""
def reduce(events: Iterable[Event]) -> Optional[float]:
total = 0
n = 0
for event in events:
total += event[key]
n += 1
return total / n if n else None
reduce.__name__ = f"mean_{key}"
return reduce
Functions
def copy(predicate: Union[str, Callable[[Dict[str, Any]], bool]], src: str, dest: Union[str, NoneType] = None) ‑> Operation
-
An operation to copy a field from a previous event.
The default
dest
issrc
.For example:
copy("valid", "loss", "last_valid_loss")
Equivalent to:
value = None for event in events: if predicate(event): value = event[src] else: event[dest] = value
Expand source code
def copy(predicate: AutoPredicate, src: str, dest: Optional[str] = None) -> Operation: """An operation to copy a field from a previous event. The default `dest` is `src`. For example: copy("valid", "loss", "last_valid_loss") Equivalent to: value = None for event in events: if predicate(event): value = event[src] else: event[dest] = value """ return Copy(to_predicate(predicate), src, dest or src)
def count(predicate: Union[str, Callable[[Dict[str, Any]], bool]], name: Union[str, NoneType] = None) ‑> Operation
-
An operation to count occurrences of previous events.
The default
name
ispredicate.__name__
, unless None or "". For example:
count("step") count(lambda event: event["loss"] > 10, "n_large_losses")
Expand source code
def count(predicate: AutoPredicate, name: Optional[str] = None) -> Operation: """An operation to count occurrences of previous events. The default `name` is `predicate.__name__`, unless None or "<lambda>". For example: count("step") count(lambda event: event["loss"] > 10, "n_large_losses") """ predicate = to_predicate(predicate) return Sum(predicate, _auto_name(name, predicate, prefix="", context="count"))
def duck(operation: Operation) ‑> Operation
-
An operation wrapper to catch and ignore any KeyError from the operation.
This can be used to "duck type" an operation & only apply it to events with the expected keys.
For example:
duck(ops.map(lambda event: 1 - event["error_rate"], "accuracy"))
Expand source code
def duck(operation: Operation) -> Operation: """An operation wrapper to catch and ignore any KeyError from the operation. This can be used to "duck type" an operation & only apply it to events with the expected keys. For example: duck(ops.map(lambda event: 1 - event["error_rate"], "accuracy")) """ return Duck(operation)
def filter(predicate: Union[str, Callable[[Dict[str, Any]], bool]]) ‑> Callable[[Iterator[Dict[str, Any]]], Iterator[Dict[str, Any]]]
-
An operation to filter an event stream by predicate or kind.
For example:
filter("valid") filter(lambda event: event["step"] >= 100)
Expand source code
def filter(predicate: AutoPredicate) -> BaseOperation: """An operation to filter an event stream by predicate or kind. For example: filter("valid") filter(lambda event: event["step"] >= 100) """ pred = to_predicate(predicate) return lambda events: builtins.filter(pred, events)
def get(key: str, required: bool = False) ‑> Callable[[Dict[str, Any]], Union[bool, float]]
-
A scalarfn for use with
sum()
that gets a key from the event.If
required
, raise a KeyError if the key is missing (maybe useful withduck()
), otherwise returnsNone
.Functionally equivalent to
operators.itemgetter(key)
when required is True, orlambda e: e.get(key)
if required is False.Unlike lambdas or
operators
, this sets__name__
so that there is a sensible default name for the generated event.For example:
ops.sum(get("examples")) # sums "examples" => "sum_examples"
Expand source code
def get(key: str, required: bool = False) -> ScalarFn: """A scalarfn for use with `sum` that gets a key from the event. If `required`, raise a KeyError if the key is missing (maybe useful with `duck`), otherwise returns `None`. Functionally equivalent to `operators.itemgetter(key)` when required is True, or `lambda e: e.get(key)` if required is False. Unlike lambdas or `operators`, this sets `__name__` so that there is a sensible default name for the generated event. For example: ops.sum(get("examples")) # sums "examples" => "sum_examples" """ def scalarfn(event: Event) -> Any: return event[key] if required else event.get(key) scalarfn.__name__ = key return scalarfn
def group(*operations: Operation) ‑> Operation
-
An operation wrapper to apply multiple operations in a single pass.
All
op.update()
are run first, followed byop.accumulate()
.For example:
group(ops.header("id"), ops.count("step"))
Expand source code
def group(*operations: Operation) -> Operation: """An operation wrapper to apply multiple operations in a single pass. All `op.update()` are run first, followed by `op.accumulate()`. For example: group(ops.header("id"), ops.count("step")) """ return operations[0] if len(operations) == 1 else Group(operations)
def header(src: str, dest: Union[str, NoneType] = None) ‑> Operation
-
An operation to copy a field from the header.
Equivalent to
copy("header", src, dest)
.For example:
header("id") header("learning_rate", "lr")
Expand source code
def header(src: str, dest: Optional[str] = None) -> Operation: """An operation to copy a field from the header. Equivalent to `copy("header", src, dest)`. For example: header("id") header("learning_rate", "lr") """ return Copy(kind("header"), src, dest or src)
def kind(value: str) ‑> Callable[[Dict[str, Any]], bool]
-
A predicate to match events with
{"kind": value}
.Expand source code
def kind(value: str) -> Predicate: """A predicate to match events with `{"kind": value}`.""" def predicate(event: Event) -> bool: return event.get("kind") == value predicate.__name__ = value return predicate
def map(mapping: Callable[[Dict[str, Any]], Any], name: Union[str, NoneType] = None) ‑> Operation
-
An operation to apply a "pointwise" mapping function to events.
The default
name
ismapping.__name__
, unless None or "". For example:
map(lambda event: 1 - event["error_rate"], "accuracy")
Expand source code
def map(mapping: Mapping, name: Optional[str] = None) -> Operation: """An operation to apply a "pointwise" mapping function to events. The default `name` is `mapping.__name__`, unless None or "<lambda>". For example: map(lambda event: 1 - event["error_rate"], "accuracy") """ return Map(mapping, _auto_name(name, mapping, prefix="", context="map"))
def reduce_mean(key: str) ‑> Callable[[Iterable[Dict[str, Any]]], Union[float, NoneType]]
-
A reduction for use with
window()
, that computes the mean of a scalar key.If passed an empty list, returns
None
.Expand source code
def reduce_mean(key: str) -> Callable[[Iterable[Event]], Optional[float]]: """A reduction for use with `window`, that computes the mean of a scalar key. If passed an empty list, returns `None`. """ def reduce(events: Iterable[Event]) -> Optional[float]: total = 0 n = 0 for event in events: total += event[key] n += 1 return total / n if n else None reduce.__name__ = f"mean_{key}" return reduce
def sum(scalarfn: Union[str, Callable[[Dict[str, Any]], Union[bool, float]]], name: Union[str, NoneType] = None) ‑> Operation
-
An operation to sum values from previous events.
The default
name
issum_{scalarfn.__name__}
, unless None or "". For example:
sum(ops.get("examples")) sum(lambda event: event["examples"], "sum_examples")
Expand source code
def sum(scalarfn: AutoScalarFn, name: Optional[str] = None) -> Operation: """An operation to sum values from previous events. The default `name` is `sum_{scalarfn.__name__}`, unless None or "<lambda>". For example: sum(ops.get("examples")) sum(lambda event: event["examples"], "sum_examples") """ scalarfn = to_predicate(scalarfn) return Sum(scalarfn, _auto_name(name, scalarfn, prefix="sum_", context="sum"))
def to_predicate(kind_or_fn: Union[str, Callable[[Dict[str, Any]], ~T]]) ‑> Callable[[Dict[str, Any]], Union[bool, ~T]]
-
Automatic conversion from string to predicate
kind()(kind_or_fn)
.Expand source code
def to_predicate( kind_or_fn: Union[str, Callable[[Event], T]] ) -> Callable[[Event], Union[bool, T]]: """Automatic conversion from string to predicate `kind(kind_or_fn)`.""" if isinstance(kind_or_fn, str): return kind(kind_or_fn) return kind_or_fn
def when(predicate: Union[str, Callable[[Dict[str, Any]], bool]], body: Operation) ‑> Operation
-
An operation wrapper that only updates events when
predicate(event)
.For example:
when("valid", ops.map(lambda event: 1 - event["error_rate"], "accuracy"))
Expand source code
def when(predicate: AutoPredicate, body: Operation) -> Operation: """An operation wrapper that only updates events when `predicate(event)`. For example: when("valid", ops.map(lambda event: 1 - event["error_rate"], "accuracy")) """ return When(to_predicate(predicate), body)
def window(predicate: Union[str, Callable[[Dict[str, Any]], bool]], size: Union[int, NoneType], reduction: Callable[[Iterable[Dict[str, Any]]], Any], name: Union[str, NoneType] = None) ‑> Operation
-
An operation to compute a statistic over a window of previous events.
The default
name
isreduction.__name__
, unless None or "". For example:
window("step", 10, ops.reduce_mean("loss"), "train_loss") window("step", 10, lambda events: np.mean([e["loss"] for e in events]), "train_loss")
Equivalent to:
previous = [] for event in events: event[name] = reduction(previous) if predicate(event): previous = (previous + [event])[-size:]
Expand source code
def window( predicate: AutoPredicate, size: Optional[int], reduction: Reduction, name: Optional[str] = None, ) -> Operation: """An operation to compute a statistic over a window of previous events. The default `name` is `reduction.__name__`, unless None or "<lambda>". For example: window("step", 10, ops.reduce_mean("loss"), "train_loss") window("step", 10, lambda events: np.mean([e["loss"] for e in events]), "train_loss") Equivalent to: previous = [] for event in events: event[name] = reduction(previous) if predicate(event): previous = (previous + [event])[-size:] """ return Window( to_predicate(predicate), size, reduction, _auto_name(name, reduction, prefix="", context="window"), )
Classes
class Copy (predicate: Callable[[Dict[str, Any]], bool], src: str, dest: str)
-
Copy a value from a previous event.
Expand source code
class Copy(Operation): """Copy a value from a previous event.""" def __init__(self, predicate: Predicate, src: str, dest: str): self.predicate = predicate self.src = src self.dest = dest def accumulate(self, event: Event, value: Any) -> Any: return event[self.src] if self.predicate(event) else value def update(self, event: Event, value: Any) -> None: if not self.predicate(event): event[self.dest] = value
Ancestors
- Operation
- abc.ABC
Inherited members
class Duck (body: Operation)
-
Swallow key errors to make an operation "duck typed" on the event.
Note: if the wrapped event raises a
KeyError
, it must leaveevent
unchanged.Expand source code
class Duck(Operation): """Swallow key errors to make an operation "duck typed" on the event. Note: if the wrapped event raises a `KeyError`, it must leave `event` unchanged. """ def __init__(self, body: Operation): self.body = body def initial(self) -> Any: return self.body.initial() def accumulate(self, event: Event, value: Any) -> Any: try: return self.body.accumulate(event, value) except KeyError: return value def update(self, event: Event, value: Any) -> None: try: self.body.update(event, value) except KeyError: pass
Ancestors
- Operation
- abc.ABC
Inherited members
class Group (operations: Iterable[Operation])
-
Create a single operation that applies multiple operations in series.
Expand source code
class Group(Operation): """Create a single operation that applies multiple operations in series.""" def __init__(self, operations: Iterable[Operation]): self.operations = operations def initial(self) -> List[Any]: return [op.initial() for op in self.operations] def accumulate(self, event: Event, value: List[Any]) -> List[Any]: return [ op.accumulate(event, opval) for op, opval in zip(self.operations, value) ] def update(self, event: Event, value: List[Any]) -> None: for op, opval in zip(self.operations, value): op.update(event, opval)
Ancestors
- Operation
- abc.ABC
Inherited members
class Map (mapping: Callable[[Dict[str, Any]], Any], name: str)
-
Apply an elementwise mapping function.
Expand source code
class Map(Operation): """Apply an elementwise mapping function.""" def __init__(self, mapping: Mapping, name: str): self.mapping = mapping self.name = name def update(self, event: Event, value: None) -> None: event[self.name] = self.mapping(event)
Ancestors
- Operation
- abc.ABC
Inherited members
class Operation
-
Base class for log denormalization operations.
An operation is a small program that is run over a sequence of events from a log:
value = operation.initial() for event in events: operation.update(event, value) value = operation.accumulate(event, value)
Expand source code
class Operation(abc.ABC): """Base class for log denormalization operations. An operation is a small program that is run over a sequence of events from a log: value = operation.initial() for event in events: operation.update(event, value) value = operation.accumulate(event, value) """ # pylint:disable=no-self-use def initial(self) -> Any: """[Optional] An initial value for the reduction.""" return None def accumulate( self, event: Event, value: Any # pylint:disable=unused-argument ) -> Any: """[Optional] A reducing function to generate a new value.""" return None @abc.abstractmethod def update(self, event: Event, value: Any) -> None: """Applies the operation to update the given event. This operation is run before the event is `accumulate`d into `value`. The default implementation of `__call__` makes a shallow copy of `event` externally, so that it's safe to add or remove toplevel keys, however the `update()` implementation is responsible for deep copies, if required. Note that this may not be called for every event that accumulate() is called on (see `When`). """ raise NotImplementedError def __call__(self, events: Iterator[Event]) -> Iterator[Event]: """Run the operation over an event stream, yielding a new stream.""" # pylint:disable=assignment-from-none value = self.initial() for event in events: event = event.copy() self.update(event, value) yield event value = self.accumulate(event, value)
Ancestors
- abc.ABC
Subclasses
Methods
def accumulate(self, event: Dict[str, Any], value: Any) ‑> Any
-
[Optional] A reducing function to generate a new value.
Expand source code
def accumulate( self, event: Event, value: Any # pylint:disable=unused-argument ) -> Any: """[Optional] A reducing function to generate a new value.""" return None
def initial(self) ‑> Any
-
[Optional] An initial value for the reduction.
Expand source code
def initial(self) -> Any: """[Optional] An initial value for the reduction.""" return None
def update(self, event: Dict[str, Any], value: Any) ‑> NoneType
-
Applies the operation to update the given event.
This operation is run before the event is
accumulate
d intovalue
.The default implementation of
__call__
makes a shallow copy ofevent
externally, so that it's safe to add or remove toplevel keys, however theupdate()
implementation is responsible for deep copies, if required.Note that this may not be called for every event that accumulate() is called on (see
When
).Expand source code
@abc.abstractmethod def update(self, event: Event, value: Any) -> None: """Applies the operation to update the given event. This operation is run before the event is `accumulate`d into `value`. The default implementation of `__call__` makes a shallow copy of `event` externally, so that it's safe to add or remove toplevel keys, however the `update()` implementation is responsible for deep copies, if required. Note that this may not be called for every event that accumulate() is called on (see `When`). """ raise NotImplementedError
class Sum (scalarfn: Callable[[Dict[str, Any]], Union[bool, float]], name: str)
-
Compute a scalar sum of previous events.
E.g. counting events of a specific kind.
Expand source code
class Sum(Operation): """Compute a scalar sum of previous events. E.g. counting events of a specific kind. """ def __init__(self, scalarfn: ScalarFn, name: str): self.scalarfn = scalarfn self.name = name def initial(self) -> float: return 0 def accumulate(self, event: Event, value: float) -> float: return value + self.scalarfn(event) def update(self, event: Event, value: float) -> None: event[self.name] = value
Ancestors
- Operation
- abc.ABC
Inherited members
class When (predicate: Callable[[Dict[str, Any]], bool], body: Operation)
-
[Wrapper] Conditionally run update() for an operation.
E.g. with
kind("step")
to only update step eventsExpand source code
class When(Operation): """[Wrapper] Conditionally run update() for an operation. E.g. with `kind("step")` to only update step events """ def __init__(self, predicate: Predicate, body: Operation): self.predicate = predicate self.body = body def initial(self) -> Any: return self.body.initial() def accumulate(self, event: Event, value: Any) -> Any: return self.body.accumulate(event, value) def update(self, event: Event, value: Any) -> None: if self.predicate(event): self.body.update(event, value)
Ancestors
- Operation
- abc.ABC
Inherited members
class Window (predicate: Callable[[Dict[str, Any]], bool], size: Union[int, NoneType], reduction: Callable[[Iterable[Dict[str, Any]]], Any], name: str)
-
Aggregate over a sliding window of preceding events matching a predicate.
Expand source code
class Window(Operation): """Aggregate over a sliding window of preceding events matching a predicate.""" def __init__( self, predicate: Predicate, size: Optional[int], reduction: Reduction, name: str ): self.predicate = predicate self.size = size self.reduction = reduction self.name = name def initial(self) -> List[Event]: return [] def accumulate(self, event: Event, value: List[Event]) -> List[Event]: if self.predicate(event): value.append(event) if self.size is not None and len(value) > self.size: value = value[-self.size :] return value def update(self, event: Event, value: List[Event]) -> None: event[self.name] = self.reduction(value)
Ancestors
- Operation
- abc.ABC
Inherited members