Source Feed

A Feed is a runtime platform component responsible for interpreting the ETL query defined as the project source and resolving the requested dataset using the linked storage system.


The Feed concept is based on two main principles:

  1. A DSL-interpreting Reader acting as an adapter between the storage layer and the pipeline.

  2. A content resolver using an explicit mapping of the published schema catalogs to the hosted data sources, effectively matching the logical schemas with the actual data.

Content resolution takes place during DSL parsing as part of the Reader routine: the parser visits the matched DSL Sources/Features and replaces them with the mapped terms already declared using the parser-target semantics.
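The visit-and-replace resolution described above can be sketched without ForML as a plain tree rewrite (all names here are illustrative stand-ins, not the actual ForML API):

```python
# Minimal sketch of content resolution: a visitor walks the parsed DSL tree
# and swaps every matched logical source/feature for its mapped physical term.
SOURCES = {'schema.Titanic': 'titanic_table'}   # logical -> physical sources
FEATURES = {'Titanic.Age': 'age_column'}        # logical -> physical features

def resolve(node):
    """Replace matched DSL terms with their mapped parser-target equivalents."""
    if isinstance(node, list):                  # composite node: visit children
        return [resolve(child) for child in node]
    return SOURCES.get(node) or FEATURES.get(node) or node

query = ['select', 'Titanic.Age', 'from', 'schema.Titanic']
print(resolve(query))  # logical terms replaced by the mapped physical ones
```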

When launching the pipeline, the ForML runner expands the Feed into one or more initial tasks within the assembled workflow, making it a native part of the final DAG to be executed.

The core Feed API looks as follows:

io.Producer

alias of Callable[[dsl.Statement, Optional[layout.Entry]], layout.Tabular]


class io.Feed

Abstract base class for data source feed providers.

It integrates the concept of a DSL-based Reader (provided via the producer() method or by overriding the inner .Reader class) with a content resolver, whose abstract parts are represented by the sources and features properties.

The need to implement the content resolver mapping specifically for each particular platform makes it more difficult to set up Feed providers using just parametric configuration and might require implementing the Feed (or at least the final resolver part) explicitly in code.
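As a rough illustration of why the resolver typically ends up in code, a bespoke feed might declare its deployment-specific mappings directly as properties (a hypothetical stand-in class, not the real forml.io API):

```python
# Hypothetical sketch of a bespoke Feed provider - the resolver mappings are
# deployment-specific, so they are declared in code rather than supplied
# through parametric configuration.
class MyFeed:
    @property
    def sources(self):
        # logical schema -> hosted data source (parser-specific semantics)
        return {'schema.Titanic': 'warehouse.titanic'}

    @property
    def features(self):
        # optional mapping of individual logical features
        return {'Titanic.Age': 'warehouse.titanic.age'}

feed = MyFeed()
print(feed.sources['schema.Titanic'])
```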


Attention: Feeds need to be serializable!

classmethod producer(sources: Mapping[dsl.Source, parser.Source], features: Mapping[dsl.Feature, parser.Feature], **kwargs: Any) → io.Producer

Producer factory method.

A Producer is a generic callable interface, most typically implemented by a Reader, whose task is to parse the provided DSL query and resolve it using its linked storage.

Unless overloaded, the method returns an instance of cls.Reader (so it might be easier to extend the inner Reader class than to overload this method).
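The default factory behavior can be mimicked with a tiny sketch (hypothetical classes, not the real implementation):

```python
# Sketch of the producer() factory contract: unless overloaded, it returns an
# instance of cls.Reader, so subclasses can customize the behavior simply by
# swapping the inner Reader class. Names are illustrative.
class Feed:
    class Reader:
        def __init__(self, sources, features, **kwargs):
            self.sources, self.features, self.kwargs = sources, features, kwargs

    @classmethod
    def producer(cls, sources, features, **kwargs):
        return cls.Reader(sources, features, **kwargs)  # instance of cls.Reader

class MyFeed(Feed):
    class Reader(Feed.Reader):  # extending the Reader instead of producer()
        pass

reader = MyFeed.producer({'src': 't'}, {'col': 'c'}, batch=100)
print(type(reader).__name__, reader.kwargs)
```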

sources: Mapping[dsl.Source, parser.Source]

Source mappings to be used by the reader (see sources).

features: Mapping[dsl.Feature, parser.Feature]

Feature mappings to be used by the reader (see features).

**kwargs: Any

Optional reader keyword arguments.


Returns: Producer instance.

abstract property sources: Mapping[dsl.Source, parser.Source]

The main part of the content resolver providing the Source mappings.

In this way, the Feed advertises the available datasets, represented by their published schemas, logically mapped to the hosted data sources specified using the parser-specific semantics.

A Source is a DSL concept representing anything that can be queried as a data source.


Returns: Sources mapping.


Using a parser with SQLAlchemy semantics, an example of the mapping might look like this:

return {
    schema.Titanic: sqlalchemy.table('titanic'),
    # join condition illustrative - the original was garbled in extraction
    foo.Bar.join(foo.Baz, foo.Bar.baz == foo.Baz.key): sqlalchemy.table('foobar'),
}

Note the capability of mapping a complex query to a denormalized dataset.

property features: Mapping[dsl.Feature, parser.Feature]

The minor part of the content resolver providing the optional Feature mappings.

Optional mapping of individual Features to their hosted representation using the parser-specific semantic.

A Feature is a DSL concept representing anything that can be projected to a data column.


Returns: Features mapping.

class Reader(sources: Mapping[dsl.Source, parser.Source], features: Mapping[dsl.Feature, parser.Feature], **kwargs: Any)

Generic reader base class matching the Feed producer interface.

It is a low-level input component responsible for parsing a generic data request in the form of a DSL query (the parser() method) and, based on it, retrieving the actual data from its supported storage technology in its specific data format (the read() method).

Reader can operate in two possible modes depending on the input parameters:

  • extraction - when called just using a query without the input entry parameter, it simply executes the (parsed) query against the backend storage.

  • augmentation - if the entry value is provided, it is interpreted as the actual source to be returned, though potentially incomplete in terms of the expected schema; in that case, the reader is supposed to complete the partial data so it matches the query schema.
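The two modes can be caricatured with a toy storage backend (purely illustrative, not the real Reader interface):

```python
# Toy illustration of the two Reader modes described above.
STORAGE = {'age': [22, 38], 'fare': [7.25, 71.28]}   # pretend backend storage

def read(query_columns, entry=None):
    if entry is None:
        # extraction: execute the (parsed) query against the backend storage
        return {col: STORAGE[col] for col in query_columns}
    # augmentation: complete the partial input to match the query schema
    return {col: entry.get(col, STORAGE[col]) for col in query_columns}

print(read(['age', 'fare']))                      # extraction mode
print(read(['age', 'fare'], {'age': [30, 40]}))   # augmentation mode
```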


Todo: Implement the augmentation mode.

abstract classmethod parser(sources: Mapping[dsl.Source, parser.Source], features: Mapping[dsl.Feature, parser.Feature]) → parser.Visitor

Parser factory method.

The parser instance must be able to convert a DSL query into storage-native instructions compatible with the read() method.

During parsing, the provided sources/features mappings are supposed to be used for the content resolution.

sources: Mapping[dsl.Source, parser.Source]

Source mappings to be used by the parser.

features: Mapping[dsl.Feature, parser.Feature]

Feature mappings to be used by the parser.


Returns: Parser instance.

classmethod format(schema: dsl.Source.Schema, data: layout.Native) → layout.Tabular

Convert the storage-native data into the required layout.Tabular format.

schema: dsl.Source.Schema

Data schema.

data: layout.Native

Input data.


Returns: Data formatted into the layout.Tabular format.

abstract classmethod read(statement: parser.Source, **kwargs: Any) → layout.Native

Perform the read operation using the given storage-native statement.

statement: parser.Source

Read instructions in the storage-native syntax.

**kwargs: Any

Optional reader keyword arguments (as given to the constructor).


Returns: Raw data provided by the reader.

For reference, several existing Reader implementations can be found under the forml.provider.feed.reader package:


SQLAlchemy-based reader.

Contextual Feed Selection

Unlike the other provider types, each of which gets explicitly nominated as exactly one instance before launching, feeds go through a more dynamic process of selecting the most suitable candidate in the context of the actual data query.

For this purpose, ForML uses the io.Importer class:

class io.Importer(*feeds: setup.Feed | str | io.Feed)

Pool of (possibly) lazily instantiated feeds.

The pool is used to select the most suitable feed instance capable of resolving the particular DSL query (in terms of providing data sources for all the involved schemas). Feed instances can have a static priority assigned, in which case the first feed with the highest priority capable of providing the data is returned.

If configured without any explicit instances, all the feeds registered in the provider cache are pooled.
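The selection logic can be sketched as a priority-ordered capability check (simplified; real feeds advertise their schemas via the sources mapping, and all names here are hypothetical):

```python
# Simplified sketch of the pool's matching behavior: pick the highest-priority
# feed whose advertised sources cover all schemas involved in the query.
class MissingError(Exception):
    """Raised when no feed can provide the requested data."""

def match(feeds, required_schemas):
    # feeds: iterable of (priority, set-of-provided-schemas) pairs
    for priority, provided in sorted(feeds, key=lambda f: -f[0]):
        if required_schemas <= provided:   # feed covers every required schema
            return provided
    raise MissingError('no feed can provide the requested data')

FEEDS = [
    (10, {'schema.Titanic'}),
    (5, {'schema.Titanic', 'foo.Bar'}),
]
print(match(FEEDS, {'schema.Titanic', 'foo.Bar'}))  # lower priority but capable
```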


Todo: This logic should be extended to also probe the available data range so that a feed without the expected data range is not prioritized over another feed that has the range but a lower absolute priority.

match(source: dsl.Source) → io.Feed

Select a feed instance that can supply data for the given source.

source: dsl.Source

Any DSL Source representing the data request.


Returns: Feed that’s able to provide data for the given request.


Raises: forml.MissingError – If no feed can provide the requested data.

Custom Feed Setup

Existing generic Feed implementations can be configured like any other provider type. The strong deployment-specific character of the content resolver setup (the explicit mapping between the published schemas and the hosted data sources) might, however, require declaring bespoke Feed providers not just through parametric configuration but directly as non-generic code.

For more details, see the custom provider setup instructions.

Feed Providers

Feed providers can be configured within the runtime platform setup using the [FEED.*] sections.
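A platform configuration might declare a feed section along these lines (a purely illustrative TOML snippet; the section name, provider label, and options are assumptions to be checked against the actual provider documentation):

```toml
# hypothetical feed section in the platform config
[FEED.sql]
provider = "alchemy"                    # illustrative provider reference
connection = "sqlite:///forml.db"       # illustrative provider option
```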

The available implementations are:


Generic SQL feed based on SQLAlchemy.


Lightweight feed for pulling data from multiple simple origins.

External Providers


ForML feed providing access to a number of public datasets.