Module trainlog.logger

Friendly API for writing log files.

Expand source code
"""Friendly API for writing log files."""

from __future__ import annotations

import builtins
import contextlib
import datetime
import random
import time
from types import TracebackType
from typing import (
    Any,
    Callable,
    ContextManager,
    Dict,
    Iterable,
    Iterator,
    Optional,
    Protocol,
    Tuple,
    Type,
)

from . import io

Event = Dict[str, Any]
EventAnnotation = Callable[[Event], Optional[Event]]
Annotation = Callable[[Optional[Event]], Optional[EventAnnotation]]
LogLineScope = Callable[[Event], ContextManager[None]]


class Writer(Protocol):
    """Stream capable of writing general objects (e.g. io.JsonLinesIO)."""

    def write(self, obj: Event) -> None:
        """Write out a single event."""
        raise NotImplementedError

    def flush(self) -> None:
        """Flush the underlying stream - write out the buffered data."""
        raise NotImplementedError

    def close(self) -> None:
        """Close the underlying stream."""
        raise NotImplementedError


@contextlib.contextmanager
def add_duration(event: Event, ndigits: int = 3) -> Iterator[None]:
    """[LogLineScope] Add the time elapsed during a `log.adding` scope."""
    start = time.time()
    yield
    event["duration"] = round(time.time() - start, ndigits)


def set_time(header: Optional[Event], ndigits: int = 3) -> EventAnnotation:
    """[Annotation] Set the header date & elapsed fields."""
    start = time.time()
    if header is not None:
        header["time"] = datetime.datetime.now().isoformat()
    return lambda event: dict(elapsed=round(time.time() - start, ndigits))


def set_id(header: Optional[Event]) -> None:
    """[Annotation] Set the ID field of the header."""
    if header is not None:
        header["id"] = random.Random().randint(0, 1 << 64)


KIND_HEADER = "header"
DEFAULT_ANNOTATE: Tuple[Annotation, ...] = (set_time, set_id)
DEFAULT_SCOPES: Tuple[LogLineScope, ...] = (add_duration,)


class LogLine:
    """A mutable builder for a single line within a log.

    For example:

        with log.adding(kind="eval") as line:
            line.set(loss=compute_loss())
    """

    def __init__(self, log: Log, scopes: Iterable[LogLineScope], event: Event):
        self.log = log
        self.scopes = scopes
        self.event = event
        self.stack = contextlib.ExitStack()
        for scope in self.scopes:
            self.stack.enter_context(scope(self.event))
        self.finished = False

    def __enter__(self) -> LogLine:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        self.add_to_log()

    def set(self, **kwargs: Any) -> None:
        """Set/reset event parameters in the log"""
        if self.finished:
            raise ValueError("Log line set() after being added to the log")
        self.event.update(kwargs)

    def add_to_log(self) -> None:
        """Add the scope to the underlying log."""
        if self.finished:
            raise ValueError("Trying to add a log line to the log multiple times")
        self.stack.close()
        self.log.add(**self.event)
        self.finished = True


class Log:
    """A general-purpose event log.

    For example:

        with logger.open("log.jsonl") as log:
            log.add(kind="step", loss=step_loss())
            log.add(kind="step", loss=step_loss())
            with log.adding(kind="eval") as line:
                line.set(loss=eval_loss())
    """

    def __init__(  # pylint:disable=dangerous-default-value
        self,
        writer: Writer,
        header: Event = {},
        annotate: Iterable[Annotation] = (),
        default_annotate: bool = True,
    ):
        self.writer = writer
        header_event = None if header is None else dict(kind=KIND_HEADER, **header)
        all_annotate = (DEFAULT_ANNOTATE if default_annotate else ()) + tuple(annotate)
        self.annotators = [
            annotator
            for annotator in (fn(header_event) for fn in all_annotate)
            if annotator is not None
        ]
        if header_event is not None:
            self.add(**header_event)

    def __enter__(self) -> Log:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying stream (the log cannot be used afterwards).

        Note that this is called automatically if the log is used in a context manager
        (recommended) and it is an error to close() twice.
        """
        self.writer.close()

    def add(self, kind: Optional[str] = None, **args: Any) -> None:
        """Immediately add a single event to the log.

        The key "kind" is optional, but recommended if the log contains different kinds
        of events, to aid info extraction.

        Values must be serializable by the current writer (e.g. JSON-serializable).

        Note that extra keys may be added by "Annotations", which are set up in the
        Log constructor.
        """
        line = args if kind is None else dict(kind=kind, **args)
        for annotator in self.annotators:
            out = annotator(line)
            if out is not None:
                line.update(out)
        self.writer.write(line)
        self.writer.flush()

    def adding(
        self,
        kind: Optional[str] = None,
        _scopes: Iterable[LogLineScope] = (),
        _default_scopes: bool = True,
        **args: Any,
    ) -> LogLine:
        """Start building an event to add to the log.

        The key "kind" is optional, but recommended if the log contains different kinds
        of events, to aid info extraction.

        We recommend using this as a context manager:

            with log.adding(kind="eval") as line:
                line.set(loss=eval_loss())

        Scopes allow automated logging of before-and-after state, for example the
        (built-in, automatically added) scope `add_duration`. Disable built-in scopes
        with `default_scopes=False`.
        """
        all_scopes = tuple(_scopes) + (DEFAULT_SCOPES if _default_scopes else ())
        return LogLine(
            self,
            scopes=all_scopes,
            event=args if kind is None else dict(kind=kind, **args),
        )


class JsonLinesFileLog(Log):
    """A standard log that uses a `trainlog.io.JsonLinesIO` writer to a local file."""

    def __init__(
        self,
        path: str,
        gzip_on_close: bool = True,
        dump_args: Optional[Dict[str, Any]] = None,
        **args: Any,
    ):
        """Create a JsonLines writer the the given local path.

        If `gzip_on_close` is True, after the log is closed, the file is compressed
        to `<path>.gz` and the original deleted.
        """
        super().__init__(
            io.JsonLinesIO[Event](builtins.open(path, "w"), dump_args=dump_args),
            **args,
        )
        self.path = path
        self.gzip_on_close = gzip_on_close

    def close(self) -> None:
        """Close the underlying stream, optionally converting to gzip."""
        super().close()
        if self.gzip_on_close:
            io.gzip(self.path)


def open(  # pylint:disable=redefined-builtin
    path: str,
    _gzip_on_close: bool = True,
    _annotate: Iterable[Annotation] = (),
    _default_annotate: bool = True,
    _add_header: bool = True,
    **header: Any,
) -> JsonLinesFileLog:
    """Open a logger writing to a local JsonLines file.

    This is just syntactic sugar; if customization is needed, directly using
    `JsonLinesFileLog` or `Log` may be preferable.
    """
    if header and not _add_header:
        raise ValueError(
            "Trying to add keys to a header with _add_header=False.\n"
            f"Either remove the header {header.keys()} or use open(_add_header=True)."
        )
    return JsonLinesFileLog(
        path,
        gzip_on_close=_gzip_on_close,
        annotate=_annotate,
        default_annotate=_default_annotate,
        header=header if _add_header else None,
    )

Functions

def add_duration(event: Event, ndigits: int = 3) ‑> Iterator[NoneType]

[LogLineScope] Add the time elapsed during a log.adding scope.

Expand source code
@contextlib.contextmanager
def add_duration(event: Event, ndigits: int = 3) -> Iterator[None]:
    """[LogLineScope] Add the time elapsed during a `log.adding` scope."""
    start = time.time()
    yield
    event["duration"] = round(time.time() - start, ndigits)
def open(path: str, **header: Any) ‑> JsonLinesFileLog

Open a logger writing to a local JsonLines file.

This is just syntactic sugar; if customization is needed, directly using JsonLinesFileLog or Log may be preferable.

Expand source code
def open(  # pylint:disable=redefined-builtin
    path: str,
    _gzip_on_close: bool = True,
    _annotate: Iterable[Annotation] = (),
    _default_annotate: bool = True,
    _add_header: bool = True,
    **header: Any,
) -> JsonLinesFileLog:
    """Open a logger writing to a local JsonLines file.

    This is just syntactic sugar; if customization is needed, directly using
    `JsonLinesFileLog` or `Log` may be preferable.
    """
    if header and not _add_header:
        raise ValueError(
            "Trying to add keys to a header with _add_header=False.\n"
            f"Either remove the header {header.keys()} or use open(_add_header=True)."
        )
    return JsonLinesFileLog(
        path,
        gzip_on_close=_gzip_on_close,
        annotate=_annotate,
        default_annotate=_default_annotate,
        header=header if _add_header else None,
    )
def set_id(header: Optional[Event]) ‑> NoneType

[Annotation] Set the ID field of the header.

Expand source code
def set_id(header: Optional[Event]) -> None:
    """[Annotation] Set the ID field of the header."""
    if header is not None:
        header["id"] = random.Random().randint(0, 1 << 64)
def set_time(header: Optional[Event], ndigits: int = 3) ‑> Callable[[Dict[str, Any]], Union[Dict[str, Any], NoneType]]

[Annotation] Set the header date & elapsed fields.

Expand source code
def set_time(header: Optional[Event], ndigits: int = 3) -> EventAnnotation:
    """[Annotation] Set the header date & elapsed fields."""
    start = time.time()
    if header is not None:
        header["time"] = datetime.datetime.now().isoformat()
    return lambda event: dict(elapsed=round(time.time() - start, ndigits))

Classes

class JsonLinesFileLog (path: str, gzip_on_close: bool = True, dump_args: Optional[Dict[str, Any]] = None, **args: Any)

A standard log that uses a JsonLinesIO writer to a local file.

Create a JsonLines writer the the given local path.

If gzip_on_close is True, after the log is closed, the file is compressed to <path>.gz and the original deleted.

Expand source code
class JsonLinesFileLog(Log):
    """A standard log that uses a `trainlog.io.JsonLinesIO` writer to a local file."""

    def __init__(
        self,
        path: str,
        gzip_on_close: bool = True,
        dump_args: Optional[Dict[str, Any]] = None,
        **args: Any,
    ):
        """Create a JsonLines writer the the given local path.

        If `gzip_on_close` is True, after the log is closed, the file is compressed
        to `<path>.gz` and the original deleted.
        """
        super().__init__(
            io.JsonLinesIO[Event](builtins.open(path, "w"), dump_args=dump_args),
            **args,
        )
        self.path = path
        self.gzip_on_close = gzip_on_close

    def close(self) -> None:
        """Close the underlying stream, optionally converting to gzip."""
        super().close()
        if self.gzip_on_close:
            io.gzip(self.path)

Ancestors

Methods

def close(self) ‑> NoneType

Close the underlying stream, optionally converting to gzip.

Expand source code
def close(self) -> None:
    """Close the underlying stream, optionally converting to gzip."""
    super().close()
    if self.gzip_on_close:
        io.gzip(self.path)

Inherited members

class Log (writer: Writer, header: Event = {}, annotate: Iterable[Annotation] = (), default_annotate: bool = True)

A general-purpose event log.

For example:

with logger.open("log.jsonl") as log:
    log.add(kind="step", loss=step_loss())
    log.add(kind="step", loss=step_loss())
    with log.adding(kind="eval") as line:
        line.set(loss=eval_loss())
Expand source code
class Log:
    """A general-purpose event log.

    For example:

        with logger.open("log.jsonl") as log:
            log.add(kind="step", loss=step_loss())
            log.add(kind="step", loss=step_loss())
            with log.adding(kind="eval") as line:
                line.set(loss=eval_loss())
    """

    def __init__(  # pylint:disable=dangerous-default-value
        self,
        writer: Writer,
        header: Event = {},
        annotate: Iterable[Annotation] = (),
        default_annotate: bool = True,
    ):
        self.writer = writer
        header_event = None if header is None else dict(kind=KIND_HEADER, **header)
        all_annotate = (DEFAULT_ANNOTATE if default_annotate else ()) + tuple(annotate)
        self.annotators = [
            annotator
            for annotator in (fn(header_event) for fn in all_annotate)
            if annotator is not None
        ]
        if header_event is not None:
            self.add(**header_event)

    def __enter__(self) -> Log:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying stream (the log cannot be used afterwards).

        Note that this is called automatically if the log is used in a context manager
        (recommended) and it is an error to close() twice.
        """
        self.writer.close()

    def add(self, kind: Optional[str] = None, **args: Any) -> None:
        """Immediately add a single event to the log.

        The key "kind" is optional, but recommended if the log contains different kinds
        of events, to aid info extraction.

        Values must be serializable by the current writer (e.g. JSON-serializable).

        Note that extra keys may be added by "Annotations", which are set up in the
        Log constructor.
        """
        line = args if kind is None else dict(kind=kind, **args)
        for annotator in self.annotators:
            out = annotator(line)
            if out is not None:
                line.update(out)
        self.writer.write(line)
        self.writer.flush()

    def adding(
        self,
        kind: Optional[str] = None,
        _scopes: Iterable[LogLineScope] = (),
        _default_scopes: bool = True,
        **args: Any,
    ) -> LogLine:
        """Start building an event to add to the log.

        The key "kind" is optional, but recommended if the log contains different kinds
        of events, to aid info extraction.

        We recommend using this as a context manager:

            with log.adding(kind="eval") as line:
                line.set(loss=eval_loss())

        Scopes allow automated logging of before-and-after state, for example the
        (built-in, automatically added) scope `add_duration`. Disable built-in scopes
        with `default_scopes=False`.
        """
        all_scopes = tuple(_scopes) + (DEFAULT_SCOPES if _default_scopes else ())
        return LogLine(
            self,
            scopes=all_scopes,
            event=args if kind is None else dict(kind=kind, **args),
        )

Subclasses

Methods

def add(self, kind: Optional[str] = None, **args: Any) ‑> NoneType

Immediately add a single event to the log.

The key "kind" is optional, but recommended if the log contains different kinds of events, to aid info extraction.

Values must be serializable by the current writer (e.g. JSON-serializable).

Note that extra keys may be added by "Annotations", which are set up in the Log constructor.

Expand source code
def add(self, kind: Optional[str] = None, **args: Any) -> None:
    """Immediately add a single event to the log.

    The key "kind" is optional, but recommended if the log contains different kinds
    of events, to aid info extraction.

    Values must be serializable by the current writer (e.g. JSON-serializable).

    Note that extra keys may be added by "Annotations", which are set up in the
    Log constructor.
    """
    line = args if kind is None else dict(kind=kind, **args)
    for annotator in self.annotators:
        out = annotator(line)
        if out is not None:
            line.update(out)
    self.writer.write(line)
    self.writer.flush()
def adding(self, kind: Optional[str] = None, **args: Any) ‑> LogLine

Start building an event to add to the log.

The key "kind" is optional, but recommended if the log contains different kinds of events, to aid info extraction.

We recommend using this as a context manager:

with log.adding(kind="eval") as line:
    line.set(loss=eval_loss())

Scopes allow automated logging of before-and-after state, for example the (built-in, automatically added) scope add_duration(). Disable built-in scopes with default_scopes=False.

Expand source code
def adding(
    self,
    kind: Optional[str] = None,
    _scopes: Iterable[LogLineScope] = (),
    _default_scopes: bool = True,
    **args: Any,
) -> LogLine:
    """Start building an event to add to the log.

    The key "kind" is optional, but recommended if the log contains different kinds
    of events, to aid info extraction.

    We recommend using this as a context manager:

        with log.adding(kind="eval") as line:
            line.set(loss=eval_loss())

    Scopes allow automated logging of before-and-after state, for example the
    (built-in, automatically added) scope `add_duration`. Disable built-in scopes
    with `default_scopes=False`.
    """
    all_scopes = tuple(_scopes) + (DEFAULT_SCOPES if _default_scopes else ())
    return LogLine(
        self,
        scopes=all_scopes,
        event=args if kind is None else dict(kind=kind, **args),
    )
def close(self) ‑> NoneType

Close the underlying stream (the log cannot be used afterwards).

Note that this is called automatically if the log is used in a context manager (recommended) and it is an error to close() twice.

Expand source code
def close(self) -> None:
    """Close the underlying stream (the log cannot be used afterwards).

    Note that this is called automatically if the log is used in a context manager
    (recommended) and it is an error to close() twice.
    """
    self.writer.close()
class LogLine (log: Log, scopes: Iterable[LogLineScope], event: Event)

A mutable builder for a single line within a log.

For example:

with log.adding(kind="eval") as line:
    line.set(loss=compute_loss())
Expand source code
class LogLine:
    """A mutable builder for a single line within a log.

    For example:

        with log.adding(kind="eval") as line:
            line.set(loss=compute_loss())
    """

    def __init__(self, log: Log, scopes: Iterable[LogLineScope], event: Event):
        self.log = log
        self.scopes = scopes
        self.event = event
        self.stack = contextlib.ExitStack()
        for scope in self.scopes:
            self.stack.enter_context(scope(self.event))
        self.finished = False

    def __enter__(self) -> LogLine:
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        self.add_to_log()

    def set(self, **kwargs: Any) -> None:
        """Set/reset event parameters in the log"""
        if self.finished:
            raise ValueError("Log line set() after being added to the log")
        self.event.update(kwargs)

    def add_to_log(self) -> None:
        """Add the scope to the underlying log."""
        if self.finished:
            raise ValueError("Trying to add a log line to the log multiple times")
        self.stack.close()
        self.log.add(**self.event)
        self.finished = True

Methods

def add_to_log(self) ‑> NoneType

Add the scope to the underlying log.

Expand source code
def add_to_log(self) -> None:
    """Add the scope to the underlying log."""
    if self.finished:
        raise ValueError("Trying to add a log line to the log multiple times")
    self.stack.close()
    self.log.add(**self.event)
    self.finished = True
def set(self, **kwargs: Any) ‑> NoneType

Set/reset event parameters in the log

Expand source code
def set(self, **kwargs: Any) -> None:
    """Set/reset event parameters in the log"""
    if self.finished:
        raise ValueError("Log line set() after being added to the log")
    self.event.update(kwargs)
class Writer (*args, **kwargs)

Stream capable of writing general objects (e.g. io.JsonLinesIO).

Expand source code
class Writer(Protocol):
    """Stream capable of writing general objects (e.g. io.JsonLinesIO)."""

    def write(self, obj: Event) -> None:
        """Write out a single event."""
        raise NotImplementedError

    def flush(self) -> None:
        """Flush the underlying stream - write out the buffered data."""
        raise NotImplementedError

    def close(self) -> None:
        """Close the underlying stream."""
        raise NotImplementedError

Ancestors

  • typing.Protocol
  • typing.Generic

Methods

def close(self) ‑> NoneType

Close the underlying stream.

Expand source code
def close(self) -> None:
    """Close the underlying stream."""
    raise NotImplementedError
def flush(self) ‑> NoneType

Flush the underlying stream - write out the buffered data.

Expand source code
def flush(self) -> None:
    """Flush the underlying stream - write out the buffered data."""
    raise NotImplementedError
def write(self, obj: Event) ‑> NoneType

Write out a single event.

Expand source code
def write(self, obj: Event) -> None:
    """Write out a single event."""
    raise NotImplementedError