Source code for forml.pipeline.payload._debug

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Debugging operators.
"""
import abc
import inspect
import logging
import pathlib
import secrets
import socket
import string
import typing
from multiprocessing import managers

from pandas.core import generic as pdtype

import forml
from forml import flow

from . import _convert

if typing.TYPE_CHECKING:
    from forml.pipeline import payload  # nopycln: import


LOGGER = logging.getLogger(__name__)


[docs]class Dumpable( flow.Actor[flow.Features, flow.Labels, flow.Result], metaclass=abc.ABCMeta ): # pylint: disable=abstract-method """Transparent actor interface that dumps the input dataset (typically to a file). Args: path: Target path to be used for dumping the content. kwargs: Optional keyword arguments to be passed to the :meth:`apply_dump` and/or :meth:`train_dump` methods. Following are the methods that can be overloaded with the actual dump action (no-op otherwise). Methods: apply_dump(features, path, **kwargs): Dump the features when in the *apply-mode*. Args: features: Input data. path: Target dump location. kwargs: Additional keyword arguments supplied via constructor. train_dump(features, labels, path, **kwargs): Dump the features and labels when in the *train-mode*. Args: features: Input features. labels: Input labels. path: Target dump location. kwargs: Additional keyword arguments supplied via constructor. """ def __init__(self, path: typing.Union[str, pathlib.Path], **kwargs): self._path: pathlib.Path = pathlib.Path(path) self._kwargs: dict[str, typing.Any] = kwargs def apply(self, features: flow.Features) -> flow.Result: # pylint: disable=arguments-differ """Standard actor apply method calling the dump. Args: features: Input frames. Returns: Original unchanged frames. """ self.apply_dump(features, self._path, **self._kwargs) return features
[docs] @classmethod def apply_dump( cls, features: flow.Features, path: pathlib.Path, **kwargs # pylint: disable=unused-argument ) -> None: """Dump the features. Args: features: Input data. path: Target dump location. kwargs: Additional keyword arguments supplied via constructor. """ return None
def train(self, features: flow.Features, labels: flow.Labels, /) -> None: """Standard actor train method calling the dump. Args: features: Input features. labels: Input labels. """ self.train_dump(features, labels, self._path, **self._kwargs)
[docs] @classmethod def train_dump( cls, # pylint: disable=unused-argument features: flow.Features, labels: flow.Labels, path: pathlib.Path, **kwargs, ) -> None: """Dump the features along with labels. Args: features: Input features. labels: Input labels. path: Target dump location. kwargs: Additional keyword arguments supplied via constructor. """ return None
@classmethod def is_stateful(cls) -> bool: return cls.train_dump.__code__ is not Dumpable.train_dump.__code__ @typing.final def get_state(self) -> None: """We aren't really stateful even though `.is_stateful()` can be true so that `.train()` get engaged. Return: Empty state. """ return None def get_params(self) -> dict[str, typing.Any]: """Standard param getter. Returns: Actor params. """ return {'path': self._path, **self._kwargs} def set_params(self, path: str, **kwargs) -> None: # pylint: disable=arguments-differ """Standard params setter. Args: path: New path. kwargs: Dumper kwargs. """ self._path = path self._kwargs.update(kwargs)
[docs]class PandasCSVDumper(Dumpable[typing.Any, typing.Any, typing.Any]): """PandasCSVDumper(path: typing.Union[str, pathlib.Path], label_header: str = 'Label', converter: typing.Callable[[typing.Any, typing.Optional[typing.Sequence[str]]], pandas.core.generic.NDFrame] = pandas_read, **kwargs) A pass-through transformer that dumps the input datasets to CSV files. The write operation including the CSV encoding is implemented using the :meth:`pandas:pandas.DataFrame.to_csv` method. The input payload is automatically converted to Pandas using the provided converter implementation (defaults to internal logic). Args: path: Target path to be used for dumping the content. label_header: Column name to be used for the train labels. converter: Optional callback to be used for converting the payload to Pandas. kwargs: Optional keyword arguments to be passed to the :meth:`pandas:pandas.DataFrame.to_csv` method. """ # pylint: disable=line-too-long # noqa: E501 CSV_DEFAULTS = {'index': False} def __init__( self, path: typing.Union[str, pathlib.Path], label_header: str = 'Label', converter: typing.Callable[ [typing.Any, typing.Optional[typing.Sequence[str]]], pdtype.NDFrame ] = _convert.pandas_read, **kwargs, ): super().__init__(path, label_header=label_header, converter=converter, **kwargs) @classmethod def apply_dump( cls, features: typing.Any, path: pathlib.Path, label_header: str, # pylint: disable=unused-argument converter: typing.Callable[[typing.Any, typing.Optional[typing.Sequence[str]]], pdtype.NDFrame], **kwargs, ) -> None: """Dump the features. Args: features: Input frames. path: Target dump location. label_header: Column name to be used for the train labels. converter: Pandas converter implementation. kwargs: Additional keyword arguments supplied via constructor. Returns: Original unchanged frames. """ path.parent.mkdir(parents=True, exist_ok=True) converter(features).to_csv(path, **(cls.CSV_DEFAULTS | kwargs)) @classmethod def train_dump( cls, features: typing.Any, labels: typing.Any, path: pathlib.Path, label_header: str, converter: typing.Callable[[typing.Any, typing.Optional[typing.Sequence[str]]], pdtype.NDFrame], **kwargs, ) -> None: """Dump the features along with labels. Args: features: X table. labels: Y series. path: Target dump location. label_header: Column name to be used for the train labels. converter: Pandas converter implementation. kwargs: Additional keyword arguments supplied via constructor. """ path.parent.mkdir(parents=True, exist_ok=True) converter(features).set_index(converter(labels).rename(label_header)).reset_index().to_csv( path, **(cls.CSV_DEFAULTS | kwargs) )
[docs]class Dump(flow.Operator): """Dump(apply: flow.Builder[payload.Dumpable] = PandasCSVDumper.builder(), train: typing.Optional[flow.Builder[payload.Dumpable]] = None, *, path: typing.Optional[typing.Union[str, pathlib.Path]] = None) A transparent operator that dumps the input dataset externally (typically to a file) before passing it downstream. If supplied as a template, the operator supports interpolation of potential placeholders in the dump path (e.g. file name). The supported placeholders are: * ``$seq`` - a sequence ID that gets incremented for each particular Actor instance * ``$mode`` - a label of the mode in which the dumping occurs (``train`` or ``apply``) The path (or path template) must be provided either within the raw builder parameters or as the standalone ``path`` parameter which is used as a fallback option. Args: apply: ``Dumpable`` builder for instantiating a Dumper operator to be used in *apply-mode*. train: Optional ``Dumpable`` builder for instantiating a Dumper operator to be used in *train-mode* (otherwise using the same one as for the *apply-mode*). path: Optional path template. Raises: TypeError: If *path* is provided neither in the ``apply``/``train`` builders nor as the explicit parameter. Examples: >>> PIPELINE = ( ... preprocessing.Action1() ... >> payload.Dump(path='/tmp/foobar/post_action1-$mode-$seq.csv') ... >> preprocessing.Action2() ... >> payload.Dump(path='/tmp/foobar/post_action2-$mode-$seq.csv') ... >> model.SomeModel() ... ) """ # pylint: disable=line-too-long # noqa: E501 CSV_SUFFIX = '.csv' def __init__( self, apply: flow.Builder['payload.Dumpable'] = PandasCSVDumper.builder(), # noqa: B008 train: typing.Optional[flow.Builder['payload.Dumpable']] = None, *, path: typing.Optional[typing.Union[str, pathlib.Path]] = None, ): self._apply: typing.Callable[[int], flow.Builder['payload.Dumpable']] = self._meta_builder(apply, path, 'apply') self._train: typing.Callable[[int], flow.Builder['payload.Dumpable']] = self._meta_builder( train or apply, path, 'train' ) self._instances: int = 0 @classmethod def _meta_builder( cls, builder: flow.Builder['payload.Dumpable'], path: typing.Optional[typing.Union[str, pathlib.Path]], mode: str, ) -> typing.Callable[[int], flow.Builder['payload.Dumpable']]: """Get a function for creating a Builder instance parameterized using a sequence id and a mode string which can be used to interpolate potential placeholders in the path template. Args: builder: Raw builder to be enhanced. Optional path parameter can have template placeholders. path: Optional path with potential template placeholders. Returns: Factory function for creating a builder instance with the potential path template placeholders interpolated. """ def wrapper(seq: int) -> flow.Builder['payload.Dumpable']: """The factory function for creating a builder instance with path template interpolation. """ interpolated = string.Template(str(pathlib.Path(path))).safe_substitute(seq=seq, mode=mode) return builder.update(path=interpolated, **binding.kwargs) binding = inspect.signature(builder.actor).bind_partial(*builder.args, **builder.kwargs) binding.apply_defaults() path = binding.arguments.pop('path', path) if not path: raise TypeError('Path is required') return wrapper def compose(self, scope: flow.Composable) -> flow.Trunk: """Composition implementation. Args: scope: Left side. Returns: Composed track. """ left: flow.Trunk = scope.expand() apply: flow.Worker = flow.Worker(self._apply(self._instances), 1, 1) train: flow.Worker = flow.Worker(self._train(self._instances), 1, 1) train.train(left.train.publisher, left.label.publisher) self._instances += 1 return left.extend(apply=apply)
[docs]class Sniff(flow.Operator): """Debugging operator for capturing the passing payload and exposing it using the ``Sniff.Value.Future`` instance provided when used as a context manager. Without the context, the operator acts as a transparent identity pass-through operator. The typical use case is in combination with the :class:`runtime.virtual <forml.runtime.Virtual>` launcher and the :ref:`interactive mode <interactive>`. Examples: >>> SNIFFER = payload.Sniff() >>> with SNIFFER as future: ... SOURCE.bind(PIPELINE >> SNIFFER >> ANOTHER).launcher.train() >>> future.result()[0] """ class Captor(flow.Actor[flow.Features, flow.Labels, flow.Result]): """Actor for sniffing all inputs and passing it to the remote value.""" class Trainset(typing.NamedTuple): """Tuple of the captured features/labels.""" features: flow.Features labels: flow.Labels def __init__(self, value: typing.Optional['Sniff.Value.Client']): self._value: typing.Optional['Sniff.Value.Client'] = value def train(self, features: flow.Features, labels: flow.Labels, /) -> None: if self._value is not None: self._value.set(self.Trainset(features, labels)) def apply(self, features: flow.Features) -> flow.Result: if self._value is not None: self._value.set(features) return features def get_state(self) -> None: """Not really having any state.""" return None class Lost(forml.MissingError): """Custom error indicating absence of the result.""" class Value: """Remote value delivering facility.""" class Future: """Future object to eventually contain the sniffed value.""" def __init__(self): self._value: typing.Any = None self._empty: bool = False self._done: bool = False def result(self) -> typing.Any: """Get the future result. Returns: Future result. Raises: payload.Sniff.Lost: If the sniffer didn't capture anything. """ if not self._done: raise RuntimeError('Future still pending') if self._empty: raise Sniff.Lost('Sniffer value empty') return self._value def set_result(self, value: typing.Any) -> None: """Set the future result. Args: value: Future result value. """ assert not self._done self._value = value self._done = True def set_empty(self) -> None: """Set the result to indicate absence of any value data.""" assert not self._done self._empty = True self._done = True class Client: """Remote value client.""" def __init__(self, manager: type[managers.BaseManager], address: tuple[str, int], authkey: bytes): self._manager: type[managers.BaseManager] = manager self._address = address self._authkey = authkey def __repr__(self): return 'Value' @property def _value(self) -> managers.ValueProxy: """Create and connect the manager and return its value proxy.""" manager = self._manager(address=self._address, authkey=self._authkey) manager.connect() return manager.value() def set(self, value: typing.Any) -> None: """Set the remote value.""" self._value.set(value) def __init__(self, manager: managers.BaseManager, result: 'Sniff.Value.Future', empty: typing.Any): self._manager: managers.BaseManager = manager self._result: Sniff.Value.Future = result self._empty: typing.Any = empty @classmethod def open(cls) -> tuple['Sniff.Value', 'Sniff.Value.Client', 'Sniff.Value.Future']: """Create new remote value facility. Returns: Tuple of the remote value instance, its client and the actual result future. """ class Manager(managers.BaseManager): """Custom manager type.""" empty = secrets.token_bytes() value = managers.Value(bytes, empty) Manager.register('value', callable=lambda: value) authkey = secrets.token_bytes() manager = Manager((socket.gethostname(), 0), authkey=authkey) manager.start() # pylint: disable=consider-using-with client = cls.Client(Manager, manager.address, authkey=authkey) result = cls.Future() return cls(manager, result, empty), client, result def close(self) -> None: """Collect the result and close the remote value manager.""" value = self._manager.value().get() if isinstance(value, self._empty.__class__) and value == self._empty: self._result.set_empty() else: self._result.set_result(value) self._manager.shutdown() def __init__(self): self._value: typing.Optional['Sniff.Value'] = None self._client: typing.Optional['Sniff.Value.Client'] = None def __enter__(self) -> 'payload.Sniff.Value.Future': assert self._value is None self._value, self._client, result = self.Value.open() return result def __exit__(self, exc_type, exc_val, exc_tb): self._value.close() # pylint: disable=no-member self._value = self._client = None def compose(self, scope: flow.Composable) -> flow.Trunk: left = scope.expand() apply = flow.Worker(self.Captor.builder(self._client), 1, 1) train = flow.Worker(self.Captor.builder(self._client), 1, 1) train.train(left.train.publisher, left.label.publisher) return left.extend(apply=apply)