# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Utilities for creating actors using decorator wrappings.
"""
import abc
import copyreg
import functools
import inspect
import itertools
import logging
import types
import typing
import cloudpickle
from forml import flow
LOGGER = logging.getLogger(__name__)
class Class(abc.ABCMeta):
"""Wrapped class-based actor metaclass."""
Target = typing.Union[str, typing.Callable[..., typing.Any]]
class Actor(flow.Actor[flow.Features, flow.Labels, flow.Result], metaclass=abc.ABCMeta):
"""Wrapper around user class implementing the Actor interface."""
class Decorated:
"""Decorated representation of the mapping target."""
def __init__(self, instance: object, decorator: typing.Callable[..., typing.Any]):
self._instance: object = instance
self._decorator: typing.Callable[..., typing.Any] = decorator
def __call__(self, *args, **kwargs):
return self._decorator(self._instance, *args, **kwargs)
class Origin(abc.ABC):
"""Wrapped origin class (to be injected by metaclass)."""
@property
@abc.abstractmethod
def Mapping(self) -> typing.Mapping[str, 'Class.Target']: # pylint: disable=invalid-name
"""Mapping from the Actor API to the Origin API (to be injected by metaclass)."""
def __init__(self, *args, **kwargs):
self._origin = self.Origin(*args, **kwargs)
def apply(self, *features: flow.Features) -> flow.Result:
return self.Mapping['apply'](*features)
@classmethod
def is_stateful(cls) -> bool:
attr = cls.Mapping[flow.Actor.train.__name__]
return callable(attr) or hasattr(cls.Origin, attr)
def __getattribute__(self, item):
if item not in {'Origin', 'Mapping', '_origin'}:
if item in self.Mapping:
attr = self.Mapping[item]
return self.Decorated(self._origin, attr) if callable(attr) else getattr(self._origin, attr)
if hasattr(self._origin, item):
return getattr(self._origin, item)
return super().__getattribute__(item)
def __new__(
mcs,
name: str,
bases: tuple[type],
namespace: dict[str, typing.Any],
*,
origin: typing.Optional[type] = None,
mapping: typing.Optional[typing.Mapping[str, 'Class.Target']] = None,
):
if origin:
assert not issubclass(origin, flow.Actor), 'Already an actor'
if mapping:
mapping = dict(mapping)
if not all(
isinstance(a, (str, typing.Callable)) # pylint: disable=isinstance-second-argument-not-valid-type
for a in mapping.values()
):
raise TypeError('Invalid mapping')
for method in (flow.Actor.apply, flow.Actor.train, flow.Actor.get_params, flow.Actor.set_params):
mapping.setdefault(method.__name__, method.__name__)
for target in {t for s, t in mapping.items() if s != flow.Actor.train.__name__ and not callable(t)}:
if not callable(getattr(origin, target, None)):
raise TypeError(f'Wrapped actor missing required {target} implementation')
mapping = types.MappingProxyType(mapping)
actor = super().__new__(
mcs,
name,
(mcs.Actor,),
{mcs.Actor.Origin.__name__: origin, mcs.Actor.Mapping.fget.__name__: mapping},
)
actor = functools.update_wrapper(actor, origin, updated=())
copyreg.pickle(
actor,
lambda a: (
actor,
(),
(a.get_state(), a.get_params()),
None,
None,
lambda o, s: (o.set_state(s[0]), o.set_params(**s[1])),
),
)
return actor
class Parametric(flow.Actor[flow.Features, flow.Labels, flow.Result], metaclass=abc.ABCMeta):
"""Base class for function based actors."""
def __init__(self, **kwargs: typing.Any):
self._kwargs: dict[str, typing.Any] = kwargs
def get_params(self) -> typing.Mapping[str, typing.Any]:
return dict(self._kwargs)
def set_params(self, **kwargs) -> None:
self._kwargs.update(kwargs)
class Stateless(abc.ABCMeta):
"""Stateless function-based actor metaclass."""
class Actor(Parametric[flow.Features, None, flow.Result], metaclass=abc.ABCMeta):
"""Stateless actor based on the given function."""
@staticmethod
@abc.abstractmethod
def Apply(*features, **kwargs) -> flow.Result: # pylint: disable=invalid-name
"""Wrapped origin apply function (to be injected by metaclass)."""
def __init__(self, **kwargs: typing.Any):
signature = inspect.signature(self.Apply)
params = {
p
for p in signature.parameters.values()
if p.kind in {inspect.Parameter.KEYWORD_ONLY, inspect.Parameter.VAR_KEYWORD}
}
# validating the kwonly params - the rest is expected to be *features
signature.replace(parameters=params).bind(**kwargs)
super().__init__(**kwargs)
def apply(self, *features: flow.Features) -> flow.Result:
return self.Apply(*features, **self._kwargs)
def __new__(
mcs,
name: str,
bases: tuple[type],
namespace: dict[str, typing.Any],
*,
apply: typing.Optional[typing.Callable[..., flow.Result]] = None,
):
actor = super().__new__(
mcs,
name,
(mcs.Actor,),
{mcs.Actor.Apply.__name__: staticmethod(apply)},
)
return functools.update_wrapper(actor, apply, updated=())
State = typing.TypeVar('State')
class Stateful(abc.ABCMeta):
"""Stateful function-based actor metaclass."""
class Actor(
typing.Generic[State, flow.Features, flow.Labels, flow.Result],
Parametric[flow.Features, flow.Labels, flow.Result],
metaclass=abc.ABCMeta,
):
"""Stateful actor based on the given functions."""
@staticmethod
@abc.abstractmethod
def Apply(state: State, features: flow.Features, **kwargs) -> flow.Result: # pylint: disable=invalid-name
"""Wrapped origin apply function (to be injected by metaclass)."""
@staticmethod
@abc.abstractmethod
def Train( # pylint: disable=invalid-name
state: typing.Optional[State], features: flow.Features, labels: flow.Labels, **kwargs
) -> State:
"""Wrapped origin train function (to be injected by metaclass)."""
def __init__(self, **kwargs):
# validating the kwargs against the signatures
# skipping initial args - state+features+labels for train mode and state+features for apply mode
for skip, origin in (3, self.Train), (2, self.Apply):
signature = inspect.signature(origin)
params = itertools.islice(signature.parameters.values(), skip, None)
signature.replace(parameters=params).bind(**kwargs)
self._state: typing.Optional[State] = None
super().__init__(**kwargs)
def train(self, features: flow.Features, labels: flow.Labels, /) -> None:
state = self.Train(self._state, features, labels, **self._kwargs)
if state is None:
LOGGER.warning('Stateful function-based actor returned None state - not considered trained')
self._state = state
def apply(self, features: flow.Features) -> flow.Result:
if self._state is None:
raise RuntimeError('Actor not trained')
return self.Apply(self._state, features, **self._kwargs)
def get_state(self) -> bytes:
if self._state is None:
return b''
return cloudpickle.dumps(self._state)
def set_state(self, state: bytes) -> None:
if state:
self._state = cloudpickle.loads(state)
def __new__(
mcs,
name: str,
bases: tuple[type],
namespace: dict[str, typing.Any],
*,
train: typing.Optional[typing.Callable[..., State]] = None,
apply: typing.Optional[typing.Callable[..., flow.Result]] = None,
):
actor = super().__new__(
mcs,
name,
(mcs.Actor,),
{mcs.Actor.Apply.__name__: staticmethod(apply), mcs.Actor.Train.__name__: staticmethod(train)},
)
return functools.update_wrapper(actor, apply, updated=())
[docs]class Actor:
"""Central class providing decorators/wrappers for creating ForML *Actors* using a number of
convenient ways not requiring to fully implement the :class:`flow.Actor <forml.flow.Actor>` base
class from scratch.
.. rubric:: Decorator Methods
Methods:
apply(origin):
Decorator for turning a given plain function into a *stateless* Actor.
Args:
origin: Decorated function.
The function must have one of the following signatures::
def foo(*features: flow.Features) -> flow.Result:
def foo(features: flow.Features) -> flow.Result:
def foo(*features: flow.Features, opt1, optN=None) -> flow.Result:
def foo(features: flow.Features, *, opt1, optN=None) -> flow.Result:
def foo(*features: flow.Features, opt1, **kwargs) -> flow.Result:
def foo(features: flow.Features, /, *, opt1, **kwargs) -> flow.Result:
Attention:
The optional arguments ``opt1``, ``opt2``, and ``**kwargs`` must all be
*keyword-only* arguments.
Returns:
A *stateless* Actor class with the given *apply* logic.
Examples:
Simple stateless imputation actor using the provided value to fill the NaNs::
@wrap.Actor.apply
def StaticImpute(
df: pandas.DataFrame,
*,
column: str,
value: float,
) -> pandas.DataFrame:
df[column] = df[column].fillna(value)
return df
train(origin):
Decorator for turning a given plain function into a follow-up apply function decorator.
Stateful actors need to have distinct implementations for their :ref:`train vs apply
modes <workflow-mode>`. This wrapping facility achieves that by decorating two
companion functions each implementing the relevant mode.
Args:
origin: Decorated train function.
The decorated *train* function must have one of the following signatures::
def foo(state: typing.Optional[State], features: flow.Features, labels: flow.Labels) -> State:
def foo(state: typing.Optional[State], features: flow.Features, labels: flow.Labels, opt1, optN=None) -> State:
def foo(state: typing.Optional[State], features: flow.Features, labels: flow.Labels, /, opt1,**kwargs) -> State:
The function will receive the *previous state* as the first parameter and is
expected to provide the *new state* instance as its return value.
Returns:
Follow-up *decorator* to be used for wrapping the companion *apply* function which
eventually returns a *stateful* Actor class with the given *train-apply* logic.
The decorated *apply* function must have one of the following signatures::
def foo(state: State, features: flow.Features) -> flow.Result:
def foo(state: State, features: flow.Features, opt1, optN=None) -> flow.Result:
def foo(state: State, features: flow.Features, /, opt1, **kwargs) -> flow.Result:
The function will receive the *current state* as the first parameter and is
expected to provide the *apply-mode* transformation result.
Examples:
Simple stateful imputation actor using the trained mean value to fill the NaNs::
@wrap.Actor.train # starting with wrapping the train-mode function
def MeanImpute(
state: typing.Optional[float], # receving the previous state (not used)
features: pandas.DataFrame,
labels: pandas.Series,
*,
column: str,
) -> float:
return features[column].mean() # returning the new state
@MeanImpute.apply # continue with the follow-up apply-mode function decorator
def MeanImpute(
state: float, # receiving current state
features: pandas.DataFrame,
*,
column: str
) -> pandas.DataFrame:
features[column] = features[column].fillna(state)
return features # apply-mode result
type(origin=None, /, *, apply=None, train=None, get_params=None, set_params=None):
Wrapper for turning an external user class into a valid Actor.
This can be used either as a parameterless decorator or optionally with mapping of Actor
methods to decorated user class implementation.
Args:
origin: Decorated class.
apply: Target method name or decorator function implementing the actor
:meth:`apply <forml.flow.Actor.apply>` logic.
train: Target method name or decorator function implementing the actor
:meth:`train <forml.flow.Actor.train>` logic.
get_params: Target method name or decorator function implementing the actor
:meth:`get_params <forml.flow.Actor.get_params>` logic.
set_params: Target method name or decorator function implementing the actor
:meth:`set_params <forml.flow.Actor.set_params>` logic.
Returns:
Actor class.
Examples:
>>> RfcActor = wrap.Actor.type(
... sklearn.ensemble.RandomForestClassifier,
... train='fit',
... apply=lambda c, *a, **kw: c.predict_proba(*a, **kw).transpose()[-1],
... )
""" # pylint: disable=line-too-long # noqa: E501
class Train:
"""Follow-up train function decorator."""
def __init__(self, origin: typing.Callable[..., State]):
self._origin: typing.Callable[..., State] = origin
def apply(self, origin: typing.Callable[..., flow.Result], /) -> type[flow.Actor]:
"""Apply function decorator (following-up from a train function decorator) turning it
into a stateful Actor.
See Also: Full description in the class docstring.
"""
if not inspect.isfunction(origin):
raise TypeError(f'Invalid actor function {origin}')
return Stateful(origin.__name__, (), {}, train=self._origin, apply=origin)
[docs] @classmethod
def apply(cls, origin: typing.Callable[..., flow.Result], /) -> type[flow.Actor]:
"""Function decorator turning it into a stateless Actor.
See Also: Full description in the class docstring.
"""
if not inspect.isfunction(origin):
raise TypeError(f'Invalid actor function {origin}')
return Stateless(origin.__name__, (), {}, apply=origin)
[docs] @classmethod
def train(cls, origin: typing.Callable[..., State], /) -> 'Actor.Train':
"""Train function decorator turning it into a follow-up apply function decorator.
See Also: Full description in the class docstring.
"""
if not inspect.isfunction(origin):
raise TypeError(f'Invalid actor function {origin}')
return cls.Train(origin)
[docs] @classmethod
def type(cls, origin: typing.Optional[type] = None, /, **mapping: Class.Target) -> type[flow.Actor]:
"""Decorator for turning an external user class into a valid actor.
See Also: Full description in the class docstring.
"""
def decorator(origin: type) -> type[flow.Actor]:
"""Decorating function."""
if not inspect.isclass(origin):
raise TypeError(f'Invalid actor class {origin}')
return Class(origin.__name__, (), {}, origin=origin, mapping=mapping)
if origin:
decorator = decorator(origin)
return decorator
def __new__(cls, *args, **kwargs):
raise TypeError(f'Illegal attempt instantiating {cls.__name__}.')