Source code for forml.pipeline.payload._generic
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Column manipulation actors.
"""
import logging
import typing
import pandas
from forml import flow
from .. import wrap
from . import _convert
LOGGER = logging.getLogger(__name__)
[docs]@wrap.Actor.apply
def Apply( # pylint: disable=invalid-name
*features: flow.Features, function: typing.Callable[..., flow.Features]
) -> flow.Features:
"""Apply(*, function: typing.Callable[..., flow.Features])
Generic stateless function-based transformer actor.
It can work with an arbitrary number of M:N input/output ports depending on the topology
implemented by the owning operator.
Args:
function: Callable transformer to be applied to the data.
Returns:
Result of the *function* applied to the input data.
Examples:
>>> APPLY = payload.Apply(function=lambda df: df.dropna())
"""
return function(*features)
[docs]@wrap.Actor.apply
def PandasConcat(*features: flow.Features, **kwargs) -> flow.Features: # pylint: disable=invalid-name
"""PandasConcat(**kwargs)
Stateless concatenation actor based on :func:`pandas:pandas.concat`.
It plugs into a topology of *N:1* input/output ports.
Args:
kwargs: Any keywords accepted by :func:`pandas:pandas.concat`.
Examples:
>>> CONCAT = payload.PandasConcat(axis='columns', ignore_index=False)
"""
return pandas.concat((_convert.pandas_read(f) for f in features), **({'copy': False} | kwargs))
[docs]@wrap.Actor.apply
def PandasSelect( # pylint: disable=invalid-name
features: pandas.DataFrame, *, columns: typing.Sequence[str]
) -> pandas.DataFrame:
"""PandasSelect(*, columns: typing.Sequence[str])
Stateless mapper actor for :class:`pandas:pandas.DataFrame` column selection.
Args:
columns: Sequence of columns to be selected.
Returns:
New DataFrame with only the selected columns.
Examples:
>>> SELECT = payload.PandasSelect(columns=['foo', 'bar'])
"""
return features[list(columns)]
[docs]@wrap.Actor.apply
def PandasDrop( # pylint: disable=invalid-name
features: pandas.DataFrame, *, columns: typing.Sequence[str]
) -> pandas.DataFrame:
"""PandasDrop(*, columns: typing.Sequence[str])
Stateless mapper actor for :class:`pandas:pandas.DataFrame` column dropping.
Args:
columns: Sequence of columns to be dropped.
Returns:
New DataFrame without the dropped columns.
Examples:
>>> DROP = payload.PandasDrop(columns=['foo', 'bar'])
"""
return features.drop(columns=list(columns))
[docs]class MapReduce(flow.Operator):
"""MapReduce(*mappers: flow.Builder, reducer: flow.Builder = PandasConcat.builder())
Operator for applying parallel (possibly stateful) mapper actors and combining their outputs
using a final (stateless) reducer.
Args:
mappers: Builders for individual mapper actors to be applied in parallel.
reducer: Builder for a final reducer actor. The actor needs to accept as many inputs
as many mappers are provided. Defaults to :class:`payload.PandasConcat
<forml.pipeline.payload.PandasConcat>`.
Examples:
>>> MAPRED = payload.MapReduce(
... payload.PandasSelect.builder(columns=['foo']),
... payload.PandasDrop.builder(columns=['foo']),
... )
"""
def __init__(
self, *mappers: flow.Builder, reducer: flow.Builder = PandasConcat.builder(axis='columns') # noqa: B008
):
if reducer.actor.is_stateful():
raise TypeError('Stateful reducer')
if not mappers:
raise ValueError('Mappers required')
self._mappers: tuple[flow.Builder] = mappers
self._reducer: flow.Builder = reducer
def compose(self, scope: flow.Composable) -> flow.Trunk:
left: flow.Trunk = scope.expand()
apply_reducer = flow.Worker(self._reducer, len(self._mappers), 1)
train_reducer = apply_reducer.fork()
for idx, mapper in enumerate(self._mappers):
apply_applier = flow.Worker(mapper, 1, 1)
apply_applier[0].subscribe(left.apply.publisher)
train_applier = apply_applier.fork()
train_applier[0].subscribe(left.train.publisher)
if mapper.actor.is_stateful():
train_trainer = apply_applier.fork()
train_trainer.train(left.train.publisher, left.label.publisher)
apply_reducer[idx].subscribe(apply_applier[0])
train_reducer[idx].subscribe(train_applier[0])
return left.use(apply=left.apply.extend(tail=apply_reducer), train=left.train.extend(tail=train_reducer))