Source code for forml.io._input

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
IO feed utils.
"""
import abc
import functools
import logging
import typing

import forml
from forml import flow, provider, setup
from forml.io import dsl as dslmod
from forml.io.dsl import parser as parsmod

from . import _producer
from . import extract as extmod

if typing.TYPE_CHECKING:
    from forml import io, project
    from forml.io import dsl  # pylint: disable=reimported
    from forml.io.dsl import parser  # pylint: disable=reimported

LOGGER = logging.getLogger(__name__)


[docs]class Feed( provider.Service, typing.Generic[parsmod.Source, parsmod.Feature], path=setup.Feed.path, # pylint: disable=no-member ): """Abstract base class for data source feed providers. It integrates the concept of a DSL-based ``Reader`` provided using the :meth:`producer` method or by overriding the inner ``.Reader`` class and a *content resolver* with its abstract parts represented by the :attr:`sources` and :attr:`features` properties. The need for implementing the content resolver mapping specifically for each particular platform makes it more difficult to setup Feed providers using just the parametric configuration and might end up requiring to actually implement the Feed (or at least the final resolver part) explicitly as a code. Important: Feeds need to be serializable! """ Reader = _producer.Reader def __init__(self, **readerkw): self._readerkw: dict[str, typing.Any] = readerkw def load( self, extract: 'project.Source.Extract', lower: typing.Optional['dsl.Native'] = None, upper: typing.Optional['dsl.Native'] = None, ) -> flow.Composable: """Provide a pipeline composable implementing the extract action. Args: extract: Datasource extract component. lower: Optional ordinal lower bound. upper: Optional ordinal upper bound. Returns: Pipeline extract composable. """ def actor(driver: type[extmod.Driver], statement: 'dsl.Statement') -> 'flow.Builder[extmod.Driver]': """Helper for creating the reader actor builder for the given query. Args: driver: Driver class. statement: Data loading query. Returns: Reader actor builder. """ return driver.builder(producer, extmod.Statement.prepare(statement, extract.ordinal, lower, upper)) producer = self.producer(self.sources, self.features, **self._readerkw) apply_actor: flow.Builder[extmod.Driver] = actor(extmod.RowDriver, extract.apply) label_actor: typing.Optional[flow.Builder] = None train_statement: 'dsl.Statement' = extract.train train_driver = extmod.RowDriver if extract.labels: train_driver = extmod.TableDriver if isinstance(extract.labels, flow.Builder): label_actor = extract.labels else: columns, label_actor = extmod.Slicer.from_columns(train_statement.features, extract.labels) train_statement = train_statement.select(*columns) train_actor: flow.Builder[extmod.Driver] = actor(train_driver, train_statement) return extmod.Operator(apply_actor, train_actor, label_actor)
[docs] @classmethod def producer( cls, sources: typing.Mapping['dsl.Source', 'parser.Source'], features: typing.Mapping['dsl.Feature', 'parser.Feature'], **kwargs: typing.Any, ) -> 'io.Producer': """Producer factory method. A ``Producer`` is a generic callable interface most typically represented using the :class:`forml.io.Feed.Reader` implementation whose task is to parse the provided DSL query and resolve it using its linked storage. Unless overloaded, the method returns an instance of ``cls.Reader`` (which might be easier to extend without needing to overload this method). Args: sources: Source mappings to be used by the reader (see :attr:`sources`). features: Column mappings to be used by the reader (see :attr:`features`). kwargs: Optional reader keyword arguments. Returns: Producer instance. """ return cls.Reader(sources, features, **kwargs) # pylint: disable=abstract-class-instantiated
@property @abc.abstractmethod def sources(self) -> typing.Mapping['dsl.Source', 'parser.Source']: """The main part of the *content resolver* providing the *Source* mappings. This way the Feed is advertising the available datasets represented using their :ref:`published schemas <io-catalog>` logically mapped to the hosted data sources specified using the *parser-specific* semantics. A :class:`Source <forml.io.dsl.Source>` is a DSL concept representing anything that can be queried as a data source. Returns: Sources mapping. Examples: Using a parser with ``SQLAlchemy`` semantics, an example of the mapping might look like this:: return { schema.Titanic: sqlalchemy.table('titanic'), foo.Bar.join(foo.Baz, foo.Bar.id == foo.Baz.id): sqlalchemy.table('foobar'), } Note the capability of mapping a complex query to a denormalized dataset. """ @property def features(self) -> typing.Mapping['dsl.Feature', 'parser.Feature']: """The minor part of the *content resolver* providing the optional *Feature* mappings. Optional mapping of individual Features to their hosted representation using the *parser-specific* semantic. A :class:`Feature <forml.io.dsl.Feature>` is a DSL concept representing anything that can be projected to a data column. Returns: Features mapping. """ return {}
[docs]class Importer: """Pool of (possibly) lazily instantiated feeds. The pool is used to select the most suitable feed instance capable of resolving the particular DSL query (in terms of providing data sources for all the involved schemas). Feed instances can have a static priority assigned in which case the first feed with the highest priority capable of providing the data is returned. If configured without any explicit instances, all the feeds registered in the provider cache are pooled. Todo: This logic should be extended to also probe the available data range so that a feed without the expected data range is not prioritized over another feed that has the range but has a lower absolute priority. """ class Slot: """Representation of a single feed provided either explicitly s an instance or lazily as a descriptor.""" def __init__(self, feed: typing.Union[setup.Feed, str, 'io.Feed']): if isinstance(feed, str): feed = setup.Feed.resolve(feed) descriptor, instance = (feed, None) if isinstance(feed, setup.Feed) else (None, feed) self._descriptor: typing.Optional[setup.Feed] = descriptor self._instance: typing.Optional[Feed] = instance def __lt__(self, other: 'io.Importer.Slot'): return self.priority < other.priority @property def priority(self) -> float: """Slots defined explicitly have infinite priority, lazy ones have priority according to their setup. Returns: Priority value. """ return self._descriptor.priority if self._descriptor else float('inf') @property def instance(self) -> 'io.Feed': """Return the feed instance possibly creating it on the fly if lazy. Returns: Feed instance. """ if self._instance is None: LOGGER.debug('Instantiating feed %s', self._descriptor.reference) self._instance = Feed[self._descriptor.reference](**self._descriptor.params) return self._instance class Matcher(dslmod.Source.Visitor): """Visitor that can be used to determine whether the accepting Frame can be constructed using the provided sources. The logic is based on traversing the Frame tree and if hitting a Table (tree leaf) that's not among the defined sources it resolves as not matching. """ def __init__(self, sources: typing.Iterable['dsl.Source']): self._sources: frozenset['dsl.Source'] = frozenset(sources) self._matches: bool = True def __bool__(self): return self._matches def visit_reference(self, source: 'dsl.Reference') -> None: if self and source not in self._sources: super().visit_reference(source) def visit_join(self, source: 'dsl.Join') -> None: if self and source not in self._sources: super().visit_join(source) def visit_set(self, source: 'dsl.Set') -> None: if self and source not in self._sources: super().visit_set(source) def visit_query(self, source: 'dsl.Query') -> None: if self and source not in self._sources: super().visit_query(source) def visit_table(self, source: 'dsl.Table') -> None: if source not in self._sources: self._matches = False def __init__(self, *feeds: typing.Union['setup.Feed', str, 'io.Feed']): self._feeds: tuple[Importer.Slot] = tuple(sorted((self.Slot(f) for f in feeds or Feed), reverse=True)) def __iter__(self) -> typing.Iterable['io.Feed']: for feed in self._feeds: yield feed.instance
[docs] @functools.lru_cache def match(self, source: 'dsl.Source') -> 'io.Feed': """Select a feed instance that can supply data for the given ``source``. Args: source: Any *DSL Source* representing the data request. Returns: Feed that's able to provide data for the given request. Raises: forml.MissingError: If no feed can provide the requested data. """ for feed in self: matcher = self.Matcher(feed.sources) source.accept(matcher) if matcher: return feed raise forml.MissingError(f'None of the {len(self._feeds)} available feeds provide all of the required sources')