# Source code for forml.flow._code.target

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Code instructions.
"""
import abc
import collections
import logging
import time
import typing

from ... import _exception

if typing.TYPE_CHECKING:
    from forml import flow

LOGGER = logging.getLogger(__name__)


class Instruction(metaclass=abc.ABCMeta):
    """Executable part of the compiled symbol responsible for performing the processing activity.

    Concrete instructions implement :meth:`execute`. Calling the instance delegates to
    :meth:`execute` and wraps the call with debug logging of the invocation and its duration.
    """

    @abc.abstractmethod
    def execute(self, *args: typing.Any) -> typing.Any:
        """Actual instruction functionality.

        Args:
            args: A sequence of input arguments.

        Returns:
            Instruction result.
        """

    def __repr__(self):
        # Instructions are identified in logs purely by their class name.
        return self.__class__.__name__

    def __call__(self, *args: typing.Any) -> typing.Any:
        """Run :meth:`execute` with the given arguments, logging the invocation and its duration.

        Args:
            args: A sequence of input arguments passed through to :meth:`execute`.

        Returns:
            Whatever :meth:`execute` returns.

        Raises:
            Exception: Any exception raised by :meth:`execute` is logged and re-raised unchanged.
        """
        LOGGER.debug('%s invoked (%d args)', self, len(args))
        # A monotonic high-resolution clock is used so that the reported duration
        # cannot be distorted by system (wall) clock adjustments.
        start = time.perf_counter()
        try:
            result = self.execute(*args)
        except Exception:
            # The rendering of each argument is capped at 1024 characters to keep the log readable.
            LOGGER.exception(
                'Instruction %s failed when processing arguments: %s',
                self,
                ', '.join(f'{str(a):.1024s}' for a in args),
            )
            raise  # bare re-raise keeps the original exception and traceback intact
        LOGGER.debug('%s completed (%.2fms)', self, (time.perf_counter() - start) * 1000)
        return result
class Symbol(collections.namedtuple('Symbol', 'instruction, arguments')):
    """The main unit of the compiled low-level runtime code.

    It represents the executable instruction and its dependency on other instructions in the task
    graph.

    Args:
        instruction: The executable instruction.
        arguments: The sequence of instructions whose output constitutes parameters to this
                   symbol's instruction.

    Raises:
        AssemblyError: If any of the arguments is falsy (e.g. ``None``).
    """

    instruction: 'flow.Instruction'
    arguments: tuple['flow.Instruction', ...]

    def __new__(
        cls, instruction: 'flow.Instruction', arguments: typing.Optional[typing.Sequence['flow.Instruction']] = None
    ):
        # Materialize first so that a one-shot iterable (iterator/generator) is not exhausted
        # by the validation below, which would otherwise leave the symbol without arguments.
        arguments = () if arguments is None else tuple(arguments)
        if not all(arguments):
            raise _exception.AssemblyError('All arguments required')
        return super().__new__(cls, instruction, arguments)

    def __repr__(self):
        # Rendered as the instruction immediately followed by the tuple of its arguments.
        return f'{self.instruction}{self.arguments}'