Source code for forml.io._output

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
IO sink utils.
"""
import typing

from forml import flow, provider, setup

from . import _consumer, commit

if typing.TYPE_CHECKING:
    from forml import io
    from forml.io import dsl


[docs]class Sink(provider.Service, default=setup.Sink.default, path=setup.Sink.path): """Abstract base class for pipeline output sink providers. It integrates the concept of a ``Writer`` provided using the :meth:`consumer` method or by overriding the inner ``.Writer`` class. """ Writer = _consumer.Writer def __init__(self, **writerkw): self._writerkw: dict[str, typing.Any] = writerkw def save(self, schema: typing.Optional['dsl.Source.Schema']) -> flow.Composable: """Provide a pipeline composable implementing the publish action. Returns: Pipeline composable. """ return commit.Operator(commit.Driver.builder(self.consumer(schema, **self._writerkw)))
[docs] @classmethod def consumer(cls, schema: typing.Optional['dsl.Source.Schema'], **kwargs: typing.Any) -> 'io.Consumer': """Consumer factory method. A ``Consumer`` is a generic callable interface most typically represented using the :class:`forml.io.Sink.Writer` implementation whose task is to commit the pipeline output using an external media layer. Unless overloaded, the method returns an instance of ``cls.Writer`` (which might be easier to extend without needing to overload this method). Note: For compatibility with the :doc:`serving mode <serving>`, the callable ``Consumer`` is (contra-intuitively) expected to provide a return value. Args: schema: Result schema. kwargs: Optional writer keyword arguments. Returns: Consumer instance. """ return cls.Writer(schema, **kwargs) # pylint: disable=abstract-class-instantiated
class Exporter: """Sink exporter is a lazy wrapper around alternative sink specifiers providing a particular Sink instance upon request. """ def __init__(self, sink: typing.Union[setup.Sink.Mode, str, Sink]): if isinstance(sink, str): sink = setup.Sink.Mode.resolve(sink) self._sink: typing.Union[setup.Sink.Mode, Sink] = sink def __call__(self, getter: property) -> 'Sink': if isinstance(self._sink, Sink): # already a Sink instance return self._sink assert isinstance(self._sink, setup.Sink.Mode) descriptor: setup.Sink = getter.fget(self._sink) return Sink[descriptor.reference](**descriptor.params) # pylint: disable=no-member train = property(lambda self: self(setup.Sink.Mode.train)) apply = property(lambda self: self(setup.Sink.Mode.apply)) eval = property(lambda self: self(setup.Sink.Mode.eval))