# Source code for forml.pipeline.wrap._operator

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Generic operators.
"""

import abc
import functools
import typing

from forml import flow as flowmod

if typing.TYPE_CHECKING:
    from forml import flow  # pylint: disable=reimported
    from forml.pipeline import wrap


class Decorator:
    """Helper for implementing the decorator functions.

    An instance is a callable that turns an actor (or an already decorated operator) into a new
    operator class. It is meant to be wrapped in ``classmethod`` on the operator class, so that the
    class the decorator is accessed through arrives as the ``parent`` argument.

    Args:
        setup: Callback receiving the parent operator class and the builder obtained from the
               decorated object, returning the ``Setup`` of builders for the new operator class.
    """

    def __init__(self, setup: typing.Callable[[type['wrap.Operator'], 'flow.Builder'], 'Setup']):
        self._setup: typing.Callable[[type['wrap.Operator'], 'flow.Builder'], Setup] = setup

    def __call__(
        self,
        parent: type['wrap.Operator'],
        inner: typing.Optional[typing.Union[type['flow.Actor'], type['wrap.Operator']]] = None,
        /,
        **kwargs: typing.Any,
    ) -> typing.Callable[..., 'wrap.Operator']:
        """Decorate ``inner`` straight away or return a decorator waiting for it.

        Args:
            parent: Operator class the decorator was accessed through.
            inner: Actor or operator class to be decorated. If omitted (parametrized usage like
                   ``@Operator.apply(foo=1)``), the decorating function is returned instead.
            **kwargs: Extra parameters passed to the builder of the decorated object.

        Returns:
            The new operator class if ``inner`` was provided, the decorating function otherwise.
        """

        def decorator(inner: typing.Union[type['flow.Actor'], type['wrap.Operator']]) -> 'wrap.Operator':
            """Decorating function."""
            if issubclass(inner, flowmod.Actor):
                # Plain actor: the new operator inherits the remaining builders from the parent.
                base = parent
                builder = inner.builder(**kwargs)
            else:
                # Already decorated operator (chained decorators): it becomes the parent itself.
                # Kept in a local (rather than rebinding the enclosing ``parent``) so a reused
                # decorator closure never carries the parent over from a previous call.
                base = inner
                # NOTE(review): ``reset`` presumably re-parametrizes the builder - confirm in flow.
                builder = inner.Origin.reset(**kwargs) if kwargs else inner.Origin
            operator = Meta(inner.__name__, (), {}, setup=self._setup(base, builder))
            # Copy the metadata (name, docstring, module, ...) of the decorated object; with
            # ``updated=()`` the operator's ``__dict__`` is not merged with the inner one.
            return functools.update_wrapper(operator, inner, updated=())

        # Explicit None check - ``inner`` is optional, its truthiness is irrelevant.
        return decorator if inner is None else decorator(inner)


class Operator(flowmod.Operator, metaclass=abc.ABCMeta):
    """Special operator created via a decoration of particular actors.

    This represents a convenient way of implementing ForML *Operators* without requiring to fully
    implement the :class:`flow.Operator <forml.flow.Operator>` base class from scratch.

    Attention:
        Instances are expected to be created via the decorator methods.

    This approach is applicable only to a special case of *simple* operators implemented by at most
    one actor per each of the coherent :ref:`apply/train/label segments <topology-coherence>`
    corresponding to the relevant *primitive* decorators (:meth:`apply`, :meth:`train`,
    :meth:`label`) supplying the particular actors. In addition to the primitive decorators, there
    is the combined :meth:`mapper` decorator filling both the train/apply segments at once.

    Hint:
        The decorators can be *chained* together as well as applied in a *split* fashion onto
        separate actors for different builders::

            @wrap.Operator.train
            @wrap.Operator.apply  # can be chained if same actor is also to be used in another mode
            @wrap.Actor.apply
            def MyOperator(df, *, myarg=None):
                ...  # stateless actor implementation used for train/apply segments

            @MyOperator.label  # decorated operator can itself be used as decorator in split fashion
            @wrap.Actor.apply
            def MyOperator(df, *, myarg=None):
                ...  # stateless actor implementation used for the label segment

    .. rubric:: Decorator Methods

    Actor definitions for individual builders can be provided using the following decorator methods.

    Methods:
        train(actor):
            Train segment actor decorator.

            When used as a decorator, this method creates an *operator* engaging the wrapped
            *actor* in the *train-mode*. If *stateful*, the actor also gets normally trained first.
            Note it does not get applied to the *apply-mode* features unless also decorated with
            the :meth:`apply` decorator (this is rarely desired - see the :meth:`mapper` decorator
            for a more typical use case)!

            Parameters:
                actor: Decorated actor.

            Returns:
                An Operator class using the given actor.

            Examples:
                Usage with a wrapped *stateless* actor::

                    @wrap.Operator.train
                    @wrap.Actor.apply
                    def TrainOnlyDropColumn(
                        df: pandas.DataFrame, *, column: str
                    ) -> pandas.DataFrame:
                        return df.drop(columns=column)

                    PIPELINE = AnotherOperator() >> TrainOnlyDropColumn(column='foo')

        apply(actor):
            Apply segment actor decorator.

            When used as a decorator, this method creates an *operator* engaging the wrapped
            *actor* in the *apply-mode*. If *stateful*, the actor also gets normally trained in
            *train-mode* (but does not get applied to the train-mode features unless also decorated
            with the :meth:`train` decorator!).

            Parameters:
                actor: Decorated actor.

            Returns:
                An Operator class using the given actor.

            Examples:
                Usage with a wrapped *stateful* actor::

                    @wrap.Actor.train
                    def ApplyOnlyFillnaMean(
                        state: typing.Optional[float],
                        df: pandas.DataFrame,
                        labels: pandas.Series,
                        *,
                        column: str,
                    ) -> float:
                        return df[column].mean()

                    @wrap.Operator.apply
                    @ApplyOnlyFillnaMean.apply
                    def ApplyOnlyFillnaMean(
                        state: float, df: pandas.DataFrame, *, column: str
                    ) -> pandas.DataFrame:
                        df[column] = df[column].fillna(state)
                        return df

                    PIPELINE = (
                        AnotherOperator()
                        >> TrainOnlyDropColumn(column='foo')
                        >> ApplyOnlyFillnaMean(column='bar')
                    )

        label(actor):
            Label segment actor decorator.

            When used as a decorator, this method creates an *operator* engaging the wrapped
            *actor* in the *train-mode* as the *label transformer*. If *stateful*, the actor also
            gets normally trained first. The actor gets engaged prior to any other stateful actors
            potentially added to the same operator (using the :meth:`train` or :meth:`apply`
            decorators).

            Parameters:
                actor: Decorated actor.

            Returns:
                An Operator class using the given actor.

            Examples:
                Usage with a wrapped *stateless* actor::

                    @wrap.Operator.label
                    @wrap.Actor.apply
                    def LabelOnlyFillZero(labels: pandas.Series) -> pandas.Series:
                        return labels.fillna(0)

                    PIPELINE = (
                        AnotherOperator()
                        >> LabelOnlyFillZero()
                        >> TrainOnlyDropColumn(column='foo')
                        >> ApplyOnlyFillnaMean(column='bar')
                    )

                Alternatively, it could as well be just added to the existing
                ``ApplyOnlyFillnaMean``::

                    @ApplyOnlyFillnaMean.label
                    @wrap.Actor.apply
                    def ApplyFillnaMeanLabelFillZero(labels: pandas.Series) -> pandas.Series:
                        return labels.fillna(0)

        mapper(actor):
            Combined train-apply decorator.

            Decorator representing the wrapping of the same actor using both the :meth:`train` and
            :meth:`apply` decorators effectively engaging the actor in transforming the features in
            both the *train-mode* as well as the *apply-mode*.

            Parameters:
                actor: Decorated actor.

            Returns:
                An Operator class using the given actor.
    """

    # The four properties below are only the defaults of this base class. On the decorated classes
    # produced by ``Meta`` they are shadowed by class attributes named after them, holding the
    # builders from the particular ``Setup``.

    @property
    @abc.abstractmethod
    def Origin(self) -> 'flow.Builder':  # pylint: disable=invalid-name
        """Builder provided in scope of the inner decorator (to be injected by metaclass)."""

    @property
    def Apply(self) -> typing.Optional['flow.Builder']:  # pylint: disable=invalid-name
        """Apply path actor builder (to be injected by metaclass)."""
        return None

    @property
    def Train(self) -> typing.Optional['flow.Builder']:  # pylint: disable=invalid-name
        """Train path actor builder (to be injected by metaclass)."""
        return None

    @property
    def Label(self) -> typing.Optional['flow.Builder']:  # pylint: disable=invalid-name
        """Label path actor builder (to be injected by metaclass)."""
        return None

    # Each decorator fills its own segment(s) with the new builder and takes the remaining ones
    # over from the parent class. Setup fields are (origin, apply, train, label).
    apply = classmethod(Decorator(lambda parent, builder: Setup(builder, builder, parent.Train, parent.Label)))
    train = classmethod(Decorator(lambda parent, builder: Setup(builder, parent.Apply, builder, parent.Label)))
    label = classmethod(Decorator(lambda parent, builder: Setup(builder, parent.Apply, parent.Train, builder)))
    mapper = classmethod(Decorator(lambda parent, builder: Setup(builder, builder, builder, parent.Label)))

    def __init__(self, *args, **kwargs):
        """Capture the parameters to be passed to the actor builders upon composition.

        Args:
            *args: Positional parameters for the builders.
            **kwargs: Keyword parameters for the builders.
        """
        self._args: tuple[typing.Any, ...] = args
        self._kwargs: typing.Mapping[str, typing.Any] = kwargs

    def compose(self, scope: 'flow.Composable') -> 'flow.Trunk':
        """Composition implementation.

        Args:
            scope: Left side composition builder.

        Returns:
            Composed trunk.
        """

        def build(builder: 'flow.Builder') -> 'flowmod.Worker':
            """Helper for creating a worker node."""
            # Workers are grouped by builder identity, so segments sharing the same builder (e.g.
            # via ``mapper``) get forks of one common worker. Note the default passed to
            # ``setdefault`` is constructed on every call, even when the key already exists.
            worker = groups.setdefault(
                id(builder), flowmod.Worker(builder.update(*self._args, **self._kwargs), 1, 1)
            ).fork()
            # NOTE(review): ``derived`` presumably signals the state is already provided by
            # another worker of the same group - confirm against flow.Worker.
            if worker.stateful and not worker.derived:
                # ``label_publisher`` is looked up at call time, hence it reflects the label
                # transformer output once that has been built.
                worker.fork().train(left.train.publisher, label_publisher)
            return worker

        left = scope.expand()
        groups: dict[int, 'flowmod.Worker'] = {}
        apply = train = label = None
        label_publisher = left.label.publisher
        # The label segment needs to go first, so that the apply/train workers built afterwards
        # get trained using the transformed labels.
        if self.Label:
            label = build(self.Label)
            label_publisher = label[0]
        if self.Apply:
            apply = build(self.Apply)
        if self.Train:
            train = build(self.Train)
        return left.extend(apply, train, label)


class Setup(typing.NamedTuple):
    """Combo of the individual actor builders.

    The fields default to the corresponding ``Operator`` properties, so a slot left unset keeps
    the base class behaviour once injected into a decorated class.
    """

    # Builder obtained from the decorated object.
    origin: 'flow.Builder' = Operator.Origin
    # Builder of the apply segment actor (if any).
    apply: typing.Optional['flow.Builder'] = Operator.Apply
    # Builder of the train segment actor (if any).
    train: typing.Optional['flow.Builder'] = Operator.Train
    # Builder of the label segment actor (if any).
    label: typing.Optional['flow.Builder'] = Operator.Label


class Meta(abc.ABCMeta):
    """Metaclass for dynamically creating the decorated operator classes."""

    def __new__(
        mcs,
        name: str,
        bases: tuple[type],
        namespace: dict[str, typing.Any],
        setup: Setup = Setup(),  # noqa: B008
    ):
        """Create a new ``Operator`` subclass exposing the builders of the given setup.

        Args:
            name: Name of the new class.
            bases: Ignored - the new class always derives directly from ``Operator``.
            namespace: Ignored - the class namespace is produced purely from the setup.
            setup: Builders to be injected as the class attributes.

        Returns:
            The new operator class.
        """
        # Property slots listed in the order of the Setup fields (origin, apply, train, label).
        slots = (Operator.Origin, Operator.Apply, Operator.Train, Operator.Label)
        # Each attribute is named after the getter of the property it overrides.
        injected = {prop.fget.__name__: builder for prop, builder in zip(slots, setup)}
        return super().__new__(mcs, name, (Operator,), injected)