Source code for forml.flow._suite.member
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Flow members represent partial pipeline blocks during pipeline assembly.
"""
import abc
import typing
import weakref
from .. import _exception
from . import assembly
if typing.TYPE_CHECKING:
from forml import flow
[docs]class Composable(metaclass=abc.ABCMeta):
"""Common interface for operators and expressions."""
def __rshift__(self, right: 'flow.Composable') -> 'Compound':
"""Semantical composition construct."""
return Compound(right, self)
def __repr__(self):
return self.__class__.__name__
[docs] @abc.abstractmethod
def compose(self, scope: 'flow.Composable') -> 'flow.Trunk':
"""Implementation of the internal task graph and its composition with the preceding part of
the expression.
Args:
scope: Preceding part of the expression that this operator is supposed to compose with.
Returns:
Trunk instance representing the composed task graph.
"""
[docs] @abc.abstractmethod
def expand(self) -> 'flow.Trunk':
"""Compose this instance and the entire preceding part of the expression and return the
resulting trunk.
This is typically called by a downstream operator (*right* side of the expression) within
its :meth:`compose` method where this is passed as part of the *left* side of the
expression.
Returns:
Trunk instance representing the composed task graph.
"""
class Origin(Composable):
"""Initial builder without a predecessor."""
def compose(self, scope: 'flow.Composable') -> 'flow.Trunk':
"""Origin composition is just the left side trunk.
Args:
scope: Left side composable.
Returns:
Trunk.
"""
return scope.expand()
def expand(self) -> 'flow.Trunk':
return assembly.Trunk()
[docs]class Operator(Composable, metaclass=abc.ABCMeta): # pylint: disable=abstract-method
"""Base class for operator implementations."""
def expand(self) -> 'flow.Trunk':
return self.compose(Origin())
class Compound(Composable):
"""Operator chaining descriptor."""
_TERMS = weakref.WeakValueDictionary()
def __init__(self, right: 'flow.Composable', left: 'flow.Composable'):
for term in (left, right):
if not isinstance(term, Composable):
raise ValueError(f'{type(term)} not composable')
if term in self._TERMS:
raise _exception.TopologyError(f'Non-linear {term} composition')
self._TERMS[term] = self
self._right: 'flow.Composable' = right
self._left: 'flow.Composable' = left
def __repr__(self):
return f'{self._left} >> {self._right}'
def compose(self, scope: 'flow.Composable') -> 'flow.Trunk':
"""Expression composition is just extension of its segments.
Args:
scope: Left side composable.
Returns:
Trunk.
"""
return scope.expand().extend(*self.expand())
def expand(self) -> 'flow.Trunk':
return self._right.compose(self._left)