Source Feed

A Feed is a runtime platform component responsible for interpreting the ETL query defined as the project source and resolving the requested dataset using the linked storage system.

Architecture

The Feed concept is based on two main principles:

  1. A DSL-interpreting Reader acting as an adapter between the storage layer and the pipeline.

  2. A content resolver using an explicit mapping of the published schema catalogs to the hosted data sources, effectively matching the logical schemas with the actual data.

Content resolution takes place during the DSL parsing as part of the Reader routine: the parser visits the query and replaces each matched DSL Source/Feature with its mapped term, which is already declared using the semantics of the parser target.
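The visit-and-replace idea can be illustrated with a minimal sketch. It does not use the ForML DSL or parser classes: Table, Join, resolve() and the string-based "hosted" terms are hypothetical stand-ins chosen only to keep the example self-contained.

```python
from dataclasses import dataclass
from typing import Mapping, Union


# Hypothetical stand-ins for DSL terms; these are NOT the ForML DSL classes.
@dataclass(frozen=True)
class Table:
    name: str


@dataclass(frozen=True)
class Join:
    left: "Query"
    right: "Query"


Query = Union[Table, Join]


def resolve(query: Query, sources: Mapping[Query, str]) -> str:
    """Visit the query tree and replace every mapped term with its hosted counterpart."""
    if query in sources:  # the whole (sub)query is hosted directly, e.g. as a denormalized table
        return sources[query]
    if isinstance(query, Join):  # otherwise descend and resolve both sides separately
        return f"({resolve(query.left, sources)} JOIN {resolve(query.right, sources)})"
    raise LookupError(f"No hosted source for {query!r}")


titanic, bar, baz = Table("Titanic"), Table("Bar"), Table("Baz")
mapping = {titanic: "titanic", Join(bar, baz): "foobar", bar: "bar_raw"}

resolved_join = resolve(Join(bar, baz), mapping)       # matched as a whole
resolved_mixed = resolve(Join(titanic, bar), mapping)  # resolved term by term
```

Because the stand-in terms are hashable, a complete join expression can serve as a mapping key, which is what lets a complex query resolve to a single hosted dataset.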

When launching the pipeline, the ForML runner expands the Feed into one or more initial tasks within the assembled workflow, making it a native part of the final DAG to be executed.

The core Feed API looks as follows:

forml.io.Producer

alias of Callable[[dsl.Statement, Optional[layout.Entry]], layout.Tabular]

class forml.io.Feed(**readerkw)

Abstract base class for data source feed providers.

It integrates two concepts: a DSL-based Reader (provided either using the producer() method or by overriding the inner .Reader class) and a content resolver with its abstract parts represented by the sources and features properties.

Because the content resolver mapping must be implemented specifically for each particular platform, Feed providers are harder to set up using parametric configuration alone; it may be necessary to implement the Feed (or at least its final resolver part) explicitly as code.

Important

Feeds need to be serializable!

classmethod producer(sources: Mapping[dsl.Source, parser.Source], features: Mapping[dsl.Feature, parser.Feature], **kwargs: Any) -> io.Producer

Producer factory method.

A Producer is a generic callable interface most typically represented using the forml.io.Feed.Reader implementation whose task is to parse the provided DSL query and resolve it using its linked storage.

Unless overloaded, the method returns an instance of cls.Reader (which might be easier to extend without needing to overload this method).

Parameters:
sources: Mapping[dsl.Source, parser.Source]

Source mappings to be used by the reader (see sources).

features: Mapping[dsl.Feature, parser.Feature]

Column mappings to be used by the reader (see features).

**kwargs: Any

Optional reader keyword arguments.

Returns:

Producer instance.
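The relationship between producer() and the inner Reader class can be sketched as follows. BaseFeed and EchoFeed are hypothetical stand-ins that only mimic the contract described above (the default producer() returning an instance of cls.Reader); they are not the forml.io.Feed implementation, and the timeout keyword is an arbitrary illustration.

```python
from typing import Any, Mapping


class BaseFeed:
    """Hypothetical stand-in mimicking the documented contract, not forml.io.Feed itself."""

    class Reader:
        def __init__(self, sources: Mapping, features: Mapping, **kwargs: Any):
            self.sources, self.features, self.kwargs = sources, features, kwargs

        def __call__(self, statement: Any, entry: Any = None) -> Any:
            raise NotImplementedError

    @classmethod
    def producer(cls, sources: Mapping, features: Mapping, **kwargs: Any) -> "BaseFeed.Reader":
        # default behaviour: instantiate whatever Reader the (sub)class declares
        return cls.Reader(sources, features, **kwargs)


class EchoFeed(BaseFeed):
    """Customizes only the inner Reader; producer() is inherited unchanged."""

    class Reader(BaseFeed.Reader):
        def __call__(self, statement: Any, entry: Any = None) -> Any:
            # look the statement up in the mapping instead of querying real storage
            return self.sources[statement]


produce = EchoFeed.producer({"Titanic": "titanic"}, {}, timeout=5)
hosted = produce("Titanic")
```

In this sketch the subclass never touches producer(), which illustrates why extending the inner Reader might be the easier of the two options.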

abstract property sources : Mapping[dsl.Source, parser.Source]

The main part of the content resolver providing the Source mappings.

This way, the Feed advertises the available datasets: their published schemas are logically mapped to the hosted data sources, which are specified using the parser-specific semantics.

A Source is a DSL concept representing anything that can be queried as a data source.

Returns:

Sources mapping.

Examples

Using a parser with SQLAlchemy semantics, an example of the mapping might look like this:

return {
    schema.Titanic: sqlalchemy.table('titanic'),
    foo.Bar.join(foo.Baz, foo.Bar.id == foo.Baz.id): sqlalchemy.table('foobar'),
}

Note the capability of mapping a complex query to a denormalized dataset.

property features : Mapping[dsl.Feature, parser.Feature]

The minor part of the content resolver providing the optional Feature mappings.

It maps individual Features to their hosted representation using the parser-specific semantics.

A Feature is a DSL concept representing anything that can be projected to a data column.

Returns:

Features mapping.

class forml.io.Feed.Reader(sources: Mapping[dsl.Source, parser.Source], features: Mapping[dsl.Feature, parser.Feature], **kwargs: Any)

Generic reader base class matching the Feed producer interface.

It is a low-level input component responsible for parsing a generic data request in the form of a DSL query (the parser() method) and, based on it, retrieving the actual data from its supported storage technology in its specific data format (the read() method).

The Reader can operate in two modes depending on the input parameters:

  • extraction - when called just using a query without the input entry parameter, it simply executes the (parsed) query against the backend storage.

  • augmentation - if the entry value is provided, it is interpreted as the actual source to be returned, although potentially incomplete in terms of the expected schema; in that case, the reader is only supposed to complete the partial data to match the query schema.
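The two modes can be sketched with plain Python stand-ins. Everything here is an assumption made for illustration: a "query" is reduced to a list of expected column names, the layout is a list of row dicts, and the augmentation branch naively assumes the supplied rows align positionally with the stored ones. It is not how ForML implements (or will implement) augmentation.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence

Rows = List[Dict[str, Any]]  # hypothetical layout: a list of row dicts


def make_reader(
    parse: Callable[[Sequence[str]], Any],
    read: Callable[[Any], Rows],
) -> Callable[..., Rows]:
    """Build a producer-like callable from the two hypothetical callbacks."""

    def reader(query: Sequence[str], entry: Optional[Rows] = None) -> Rows:
        if entry is None:
            # extraction: execute the parsed query against the backend storage
            return read(parse(query))
        # augmentation: keep the supplied rows and fetch only the missing columns
        missing = [col for col in query if entry and col not in entry[0]]
        if not missing:
            return [{col: row[col] for col in query} for row in entry]
        extra = read(parse(missing))  # simplification: rows assumed positionally aligned
        return [{**row, **more} for row, more in zip(entry, extra)]

    return reader


STORAGE: Rows = [
    {"age": 22, "fare": 7.25, "survived": 0},
    {"age": 38, "fare": 71.28, "survived": 1},
]


def parse(columns: Sequence[str]) -> tuple:
    return tuple(columns)  # the "storage-native statement" is just a column tuple


def read(statement: tuple) -> Rows:
    return [{col: row[col] for col in statement} for row in STORAGE]


reader = make_reader(parse, read)
extracted = reader(["age", "survived"])                              # extraction mode
augmented = reader(["age", "survived"], [{"age": 30}, {"age": 40}])  # augmentation mode
```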

Todo

Implement the augmentation mode.

abstract classmethod parser(sources: Mapping[dsl.Source, parser.Source], features: Mapping[dsl.Feature, parser.Feature]) -> parser.Visitor

Parser factory method.

The parser instance must be able to convert a DSL query into storage-native instructions compatible with the read() method.

During parsing, the provided sources/features mappings are supposed to be used for the content resolution.

Parameters:
sources: Mapping[dsl.Source, parser.Source]

Source mappings to be used by the parser.

features: Mapping[dsl.Feature, parser.Feature]

Feature mappings to be used by the parser.

Returns:

Parser instance.

classmethod format(schema: dsl.Source.Schema, data: layout.Native) -> layout.Tabular

Convert the storage-native data into the required layout.Tabular format.

Parameters:
schema: dsl.Source.Schema

Data schema.

data: layout.Native

Input data.

Returns:

Data formatted into the layout.Tabular format.

abstract classmethod read(statement: parser.Source, **kwargs: Any) -> layout.Native

Perform the read operation using the given storage-native statement.

Parameters:
statement: parser.Source

Read instructions in the storage-native syntax.

**kwargs: Any

Optional reader keyword arguments (as given to the constructor).

Returns:

Raw data provided by the reader.

For reference, several existing Reader implementations can be found under the forml.provider.feed.reader package:

Alchemy

SQLAlchemy based reader.

Contextual Feed Selection

Unlike the other provider types, each of which has exactly one instance explicitly nominated before launching, feeds go through a more dynamic process of selecting the most suitable candidate in the context of the actual data query.

For this purpose, ForML uses the io.Importer class:

class forml.io.Importer(*feeds: setup.Feed | str | io.Feed)

Pool of (possibly) lazily instantiated feeds.

The pool is used to select the most suitable feed instance capable of resolving the particular DSL query (in terms of providing data sources for all the involved schemas). Feed instances can have a static priority assigned, in which case the first feed with the highest priority capable of providing the data is returned.

If configured without any explicit instances, all the feeds registered in the provider cache are pooled.

Todo

This logic should be extended to also probe the available data range so that a feed without the expected data range is not prioritized over another feed that has the range but has a lower absolute priority.

match(source: dsl.Source) -> io.Feed

Select a feed instance that can supply data for the given source.

Parameters:
source: dsl.Source

Any DSL Source representing the data request.

Returns:

Feed that’s able to provide data for the given request.

Raises:

forml.MissingError – If no feed can provide the requested data.
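The priority-based selection described for the pool can be sketched as below. StubFeed, MissingError and the free-standing match() function are hypothetical stand-ins (schemas are reduced to plain strings); the sketch only mirrors the documented rule that the first feed with the highest priority capable of providing the data wins, and it omits the lazy instantiation.

```python
from typing import FrozenSet, Iterable, Sequence


class MissingError(LookupError):
    """Hypothetical stand-in for forml.MissingError."""


class StubFeed:
    """Hypothetical feed stub: a name, a static priority and the schemas it can serve."""

    def __init__(self, name: str, priority: float, schemas: Iterable[str]):
        self.name = name
        self.priority = priority
        self.schemas: FrozenSet[str] = frozenset(schemas)


def match(feeds: Sequence[StubFeed], required: Iterable[str]) -> StubFeed:
    """Return the first highest-priority feed covering all the required schemas."""
    needed = frozenset(required)
    candidates = [feed for feed in feeds if needed <= feed.schemas]
    if not candidates:
        raise MissingError(f"None of the {len(feeds)} feeds provides {sorted(needed)}")
    # max() keeps the first of several equally ranked items, preserving the pool order
    return max(candidates, key=lambda feed: feed.priority)


pool = [
    StubFeed("archive", priority=1, schemas={"Titanic", "Iris"}),
    StubFeed("fast", priority=5, schemas={"Titanic"}),
    StubFeed("mirror", priority=5, schemas={"Titanic"}),
]
chosen = match(pool, {"Titanic"})  # top priority, listed before the equally ranked feed
fallback = match(pool, {"Iris"})   # the only feed hosting this schema
```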

Custom Feed Setup

Existing generic Feed implementations can be configured like any other provider type. However, the strongly deployment-specific character of the content resolver setup (the explicit mapping between the published schemas and the hosted data sources) might require declaring bespoke Feed providers directly as non-generic code rather than through parametric configuration alone.

For more details, see the custom provider setup instructions.

Feed Providers

Feed providers can be configured within the runtime platform setup using the [FEED.*] sections.

The available implementations are:

Alchemy

Generic SQL feed based on SQLAlchemy.

Monolite

Lightweight feed for pulling data from multiple simple origins.

External Providers

openlake.Lite

ForML feed providing access to a number of public datasets.