IO Concept¶
ForML comes with a unique approach to data access. For projects to be truly portable, they must not be coupled directly with any specific data storage or format. Accessing an explicit instance of a dataset, as well as interpreting its particular structure, cannot be part of the ML solution implemented as a ForML project. Instead, the solution has to work with an abstract representation of the relevant logical dataset, which gets resolved to its actual provider (if available) only at runtime.
Important
Within the ForML architecture, all runtime system dependencies - including the pipeline I/O - are handled by the platform (using the pluggable provider concept) while projects themselves are independent of any runtime configuration to truly offer a conceptual solution to the given problem - not just to its particular instance within some specific environment.
The concepts ForML uses to handle data access are:
Schema Catalogs are the logical dataset representations distributed in catalogs available to both projects and platforms as portable dataset references
Query DSL is the internal expression language used by projects to define high-level ETL operations on top of the schema catalogs
Source Feeds are the platform data providers capable of resolving the project-defined DSL query using a particular data storage technology
Output Sinks are the feeds counterparts responsible for handling the produced pipeline results
Model Registries stand aside from the previous concepts as they deal with metadata rather than the data itself, providing the pipeline state persistence
Schema Catalogs¶
To achieve data access abstraction, ForML integrates the concept of schema catalogs. Instead of implementing direct operations on specific data source instances, projects use DSL expressions to define the input data ETL, referring only to abstract data schemas. It is then the responsibility of the platform feeds to resolve the requested schemas (and the whole ETL queries specified on top of them), mapping them to the actual data sources hosted in the particular runtime environment.
A schema catalog is a logical collection of schemas that both projects and platforms can use as a mutual data reference. It is neither a service nor a system, but rather a passive set of namespaced descriptors implemented simply as a Python package that must be available both to the project expecting the particular data and to the platform supposed to serve that project.
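For illustration, a schema within such a catalog could be defined along the following lines (a minimal sketch using the ForML DSL; the dataset, its fields, and the package placement are hypothetical):

from forml.io import dsl


class Student(dsl.Schema):
    """Schema of a hypothetical student dataset as it might be published in a catalog package."""

    surname = dsl.Field(dsl.String())
    level = dsl.Field(dsl.Integer())
    score = dsl.Field(dsl.Float())
    updated = dsl.Field(dsl.Timestamp())

Both the project (via its DSL query) and the platform feeds (via their content mappings) then refer to this Student class rather than to any physical table or file.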
Content Resolution¶
When a project workflow is submitted to a platform, the platform attempts to resolve the schemas referenced in the source query using its configured feed providers; only when all of these schema dependencies can be satisfied by the available data sources is the platform able to launch that workflow.
The following diagram demonstrates the trilateral relationship between projects, schema catalogs and platform feeds - establishing the mechanism of the decoupled data access:
flowchart TB
subgraph Schema Catalog 1
s1{{Schema 1}}
end
subgraph Schema Catalog 2
direction LR
s3{{Schema 3}}
s2{{Schema 2}}
s4{{Schema 4}}
end
subgraph project [Project]
src[[source.py]] --> s1 & s2
pip[[pipeline.py]]
eval[[evaluation.py]]
end
subgraph platform [Runtime Platform]
subgraph Storage Layer
direction LR
db[(Database)]
xy[(Other)]
fs[(Filesystem)]
end
subgraph Feed Providers
direction LR
xyf[/Other Feed/]
dbf[/DB Feed/]
fsf[/FS Feed/]
xyf ---> s1 & s2
xyf --> xy
fsf --> fs
fsf ---> s4
dbf ---> s3 & s4
dbf --> db
end
end
project ===> platform
It tells the following story:
A project defines its data requirements using a source query specified in its source.py component referring to particular data source schema(s) from within certain catalogs - here Schema 1 from Catalog 1 and Schema 2 from Catalog 2.
The platform happens to be configured with three different feed providers capable of supplying (using its physical storage layer) four data sources represented by schemas from the given catalogs, so that:
the DB Feed can serve data sources represented by Schema 3 and Schema 4 physically stored in the Database
the FS Feed can also provide the data source matching Schema 4 duplicating its physical copy stored on the Filesystem
finally, the Other Feed knows how to supply data for Schema 1 and Schema 2
When the project gets launched on the platform, its source descriptor first goes through the feed selection process to find the most suitable feed provider for the given query, followed by the actual execution of the particular query by that selected feed, which results in the data payload entering the project workflow.
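In code, the project side of this story boils down to a source.py component roughly like the following sketch (the catalog packages, schema names, and fields are hypothetical stand-ins for Schema 1 and Schema 2):

from forml import project

from catalog1 import Schema1  # hypothetical catalog package providing Schema 1
from catalog2 import Schema2  # hypothetical catalog package providing Schema 2

# Pure DSL query joining the two abstract data sources - no reference to any physical storage.
FEATURES = Schema1.inner_join(Schema2, Schema1.key == Schema2.key).select(
    Schema1.foo,
    Schema2.bar,
)

SOURCE = project.Source.query(FEATURES, labels=Schema2.label)

# Register the descriptor as the project source component.
project.setup(SOURCE)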
Publishing¶
An obvious aspect of the schema catalogs is their decentralization. Since they are implemented as Python packages, they can be easily distributed using the standard means of Python package publishing. Currently, there is no naming convention for the schema definition namespaces. Ideally, schemas should be published and held in namespaces of the original dataset producers. For private first-party datasets (i.e. internal company data), this is easy - the owner would simply maintain a package with schemas of their data sources. For public datasets, this relies on community-maintained schema catalogs like the Openschema catalog.
Continue to the schema DSL for more details regarding the actual implementation and use cases. Also, refer to the mentioned Openschema catalog for a real instance of a ForML schema catalog.
Source Descriptor¶
ForML projects specify their input data requirements - the ETL query optionally composed with other transforming operators - in the form of a source descriptor (declared within the source.py project component).
This descriptor is created using the project.Source.query() class method:
- class forml.project.Source(extract: project.Source.Extract, transform: flow.Composable | None = None)[source]¶
ForML data source descriptor representing the ETL operation to be carried out at runtime to deliver the required input payload to the project pipeline.
The descriptor is a combination of an extraction DSL query and an optional transformation workflow.
Attention
Instances are supposed to be created using the query() method rather than calling the constructor directly.
- extract : project.Source.Extract¶
A DSL query to be performed by the eventual platform Feed representing the extraction part of the ETL process. The value is assembled directly from the parameters of the .query() method.
- transform : flow.Composable | None¶
A workflow to be expanded into a regular task graph representing the optional transformation part of the ETL process. The value is accrued from (potentially repeated) chaining of the Source instance with workflow operators using the >> composition-like syntax.
Examples
>>> ETL = project.Source.query(
...     schema.FooBar.select(schema.FooBar.foo)
... ) >> payload.ToPandas()
- Labels¶
Label type - either a single column, multiple columns, or a generic label extracting actor (with two output ports) builder.
alias of Union[Feature, Sequence[Feature], Builder[Actor[Tabular, None, tuple[Sequence[Any], Sequence[Any]]]]]
- classmethod query(features: dsl.Source, labels: project.Source.Labels | None = None, apply: dsl.Source | None = None, ordinal: dsl.Operable | None = None, once: str | None = None) project.Source [source]¶
Factory method for creating a new Source descriptor instance with the given extraction parameters.
- Parameters:
- features: dsl.Source¶
A DSL query defining the train-mode (and implicitly also the apply-mode) dataset. The features must not contain any columns specified in the labels parameter.
- labels: project.Source.Labels | None = None¶
Training label (or a sequence of) column(s) or a label extraction actor builder (single input and two output ports of [features, labels]).
- apply: dsl.Source | None = None¶
Optional query defining the explicit apply-mode features (if different from the train ones). If provided, it must result in the same layout as the main one provided via features.
- ordinal: dsl.Operable | None = None¶
Optional specification of an ordinal column defining the relative ordering of the data records. If provided, the workflow can be launched with optional lower and/or upper parameters specifying the requested data range.
- once: str | None = None¶
The ordinal delivery semantic for incremental querying. Possible values are:
- atleast: Include both the lower and the upper ordinal bounds (leads to duplicate processing).
- atmost: Leave out the lower bound and include the upper one (leads to data loss in case of continuous ordinals - safe for discrete values).
- exactly: Include the lower bound but leave the upper bound out for the next batch (excludes processing of the tail records).
- Returns:
Source component instance.
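For illustration, the following sketch combines these parameters (reusing the hypothetical schema.FooBar from the examples above, with assumed foo, baz, and timestamp fields):

from forml import project

# Assuming a catalog providing the hypothetical schema.FooBar.
from mycatalog import schema

ETL = project.Source.query(
    schema.FooBar.select(schema.FooBar.foo),  # train/apply-mode features
    labels=schema.FooBar.baz,                 # label column kept out of the features
    ordinal=schema.FooBar.timestamp,          # ordinal column driving incremental querying
    once='atmost',                            # leave out the lower bound to avoid duplicate processing
)

Launching such a workflow with the lower and/or upper parameters then restricts the extraction to the requested ordinal range using the selected delivery semantic.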
- bind(pipeline: str | flow.Composable, **modules: Any) project.Artifact [source]¶
Create a virtual project handle from this Source and the given pipeline component.
The typical use case is the interactive execution.
- Parameters:
- pipeline: str | flow.Composable¶
Pipeline component to create the virtual project handle from.
- **modules: Any¶
Optional modules representing the other project components.
- Returns:
Virtual project handle.
Examples
>>> PIPELINE = payload.ToPandas()
>>> SOURCE = project.Source.query(
...     schema.FooBar.select(schema.FooBar.foo)
... )
>>> SOURCE.bind(PIPELINE).launcher.apply()
Data Payloads¶
In line with the overall architecture, ForML is designed to be as data format agnostic as possible. Conceptually, there are several scopes involving payload exchange that require compatibility with the data being passed.
Internal Payload Exchange¶
Payload-wise, the core ForML runtime is pretty generic, dealing only with a few tiny interfaces to handle the necessary exchange with an absolutely minimal footprint. Following is the list of the involved core payload types:
Producer Side | Consumer Side | Exchange Protocol
---|---|---
Origin data at rest | Feed | Each feed acts as an adapter designed specifically for the given origin format.
Feed | Feed Slicer | Defined using the layout.Tabular interface.
Feed Slicer | Project Pipeline | Defined using the layout.RowMajor structure.
Actor Payload Output Port | Actor Payload Input Port | No specific format required; the choice of mutually compatible actors is the responsibility of the implementer - ForML only facilitates the exchange (possibly subject to serializability).
Project Pipeline | Platform Sink Writer | Defined using the layout.RowMajor structure.
Actor State Output Port | Model Registry | Handled in form of a bytes-encoded state object.
Model Registry | Actor State Input Port | Handled in form of a bytes-encoded state object.
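As a sketch of the actor-to-actor exchange mentioned above - assuming the neighbouring actors agree on a pandas DataFrame payload - a simple transforming actor could be set up using the convenience decorators from the pipeline wrap library (the imputation logic, column, and parameters are hypothetical):

import pandas

from forml.pipeline import wrap


@wrap.Operator.mapper
@wrap.Actor.apply
def FillMissing(features: pandas.DataFrame, *, column: str, value: float) -> pandas.DataFrame:
    """Hypothetical stateless actor exchanging a pandas DataFrame through its payload ports."""
    features[column] = features[column].fillna(value)
    return features

The resulting operator can then be chained into the pipeline (or into the Source transform) using the >> composition, e.g. payload.ToPandas() >> FillMissing(column='foo', value=0).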
- forml.io.layout.Native = ~Native¶
Generic type variable representing arbitrary native type.
- forml.io.layout.ColumnMajor¶
Sequence of columns of any type (columnar, column-wise semantic).
- class forml.io.layout.Tabular[source]¶
Dataset interface providing both row and column-oriented representation of the underlying data.
This is a lightweight interface to be used internally for data payload as returned by the Feed Reader, only to be immediately turned into the RowMajor representation once leaving the Feed Slicer.
- abstract to_columns() layout.ColumnMajor [source]¶
Get the dataset in a column-oriented structure.
- Returns:
Column-wise dataset representation.
- abstract to_rows() layout.RowMajor [source]¶
Get the dataset in a row-oriented structure.
- Returns:
Row-wise dataset representation.
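To make the two layouts concrete, the same small dataset is shown below in both representations using plain Python structures (purely illustrative values):

# Row-major (row-wise) semantic: a sequence of records.
ROWS = [
    [1, 'a'],
    [2, 'b'],
]

# Column-major (column-wise) semantic: a sequence of columns.
COLUMNS = [
    [1, 2],      # first column
    ['a', 'b'],  # second column
]

# A Tabular payload exposes both views of the same data via .to_rows() and .to_columns();
# the two representations are simply transpositions of each other:
assert [list(row) for row in zip(*COLUMNS)] == ROWS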
External Payload Exchange¶
In addition to the core payloads, the serving layer involves a few more data exchanges using the following structures:
Producer Side | Consumer Side | Exchange Protocol
---|---|---
Application Client | Serving Gateway | Each gateway acts as an adapter designed specifically for the given application protocol, handling the payload as a raw bytes string.
Serving Gateway | Serving Engine | Using the layout.Request structure.
Serving Engine | Feed | Passing the decoded layout.Entry.
Sink Writer | Serving Engine | Using the layout.Outcome structure.
Serving Engine | Serving Gateway | Using the encoded layout.Response structure.
Serving Gateway | Application Client | Handling the payload as a raw bytes string in the given application protocol.
- class forml.io.layout.Entry(schema: dsl.Source.Schema, data: layout.Tabular)[source]¶
Internal representation of the decoded Request payload.
- class forml.io.layout.Outcome(schema: dsl.Source.Schema, data: layout.RowMajor)[source]¶
Internal result payload representation to be encoded as a Response.
- class forml.io.layout.Request(payload: bytes, encoding: layout.Encoding, params: Mapping[str, Any] | None = None, accept: Sequence[layout.Encoding] | None = None)[source]¶
Serving gateway request object.
- Parameters:
- payload: bytes¶
Raw encoded payload.
- encoding: layout.Encoding¶
Content type encoding instance.
- params: Mapping[str, Any] | None = None¶
Optional application-level parameters.
- accept: Sequence[layout.Encoding] | None = None¶
Content encodings acceptable for the eventual response.
- class Decoded(entry: layout.Entry, context: Any = None)[source]¶
Decoded request case class.
- entry : layout.Entry¶
Input data points to be applied for prediction.
- class forml.io.layout.Response(payload: bytes, encoding: layout.Encoding)[source]¶
Serving gateway response object.
- Parameters:
- payload: bytes¶
Raw encoded payload.
- encoding: layout.Encoding¶
Content type encoding instance.
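For illustration, a JSON prediction request and a matching response could be represented as follows (a sketch only - these objects are normally created by the serving gateway and engine, and the payload content is hypothetical):

from forml.io import layout

# A gateway wraps the raw protocol payload into a Request, recording its encoding
# together with the encodings acceptable for the eventual response.
REQUEST = layout.Request(
    payload=b'[{"foo": 1, "bar": "a"}]',
    encoding=layout.Encoding('application/json'),
    accept=(layout.Encoding('application/json'),),
)

# The engine eventually hands an encoded Response back to the gateway.
RESPONSE = layout.Response(
    payload=b'[{"prediction": 0.42}]',
    encoding=layout.Encoding('application/json'),
)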
Payload Encoding¶
ForML also depends on the following encoding features for the external payload exchange:
- class forml.io.layout.Encoding(kind: str, /, **options: str)[source]¶
Content type/encoding representation to be used by the Serving gateways.
- property header : str¶
Get the header-formatted representation of this encoding.
- Returns:
Header-formatted representation.
Examples
>>> layout.Encoding('application/json', charset='UTF-8').header
'application/json; charset=UTF-8'
- classmethod parse(value: str) Sequence[layout.Encoding] [source]¶
Caching parser of the content type header values.
- Parameters:
- value: str¶
Content type header value to be parsed.
- Returns:
Sequence of the Encoding instances ordered according to the provided priority.
Examples
>>> layout.Encoding.parse('image/GIF; q=0.6; a=x, text/html; q=1.0')
(
    Encoding(kind='text/html', options={}),
    Encoding(kind='image/gif', options={'a': 'x'})
)
- match(other: layout.Encoding) bool [source]¶
Return true if the other encoding matches ours including glob wildcards.
Encoding matches if its kind fits our kind as a pattern (including potential glob wildcards) while all of our options are a subset of the other options.
- Parameters:
- other: layout.Encoding¶
Encoding to match against this. Must not contain wildcards!
- Returns:
True if matches.
Examples
>>> layout.Encoding('application/*').match(
...     layout.Encoding('application/json')
... )
True
- class forml.io.layout.Encoder[source]¶
Encoder base class.
- abstract property encoding : layout.Encoding¶
Get the encoding produced by this encoder.
- Returns:
Encoding instance.
- abstract dumps(outcome: layout.Outcome) bytes [source]¶
Encoder logic.
- Parameters:
- outcome: layout.Outcome¶
Outcome to encode.
- Returns:
Encoded entry.
The two encoder/decoder matching functions below currently support, among others, the following encodings/flavors:
Content-type | Example | Implementation
---|---|---
application/json; format=pandas-records | [{"A": 1, "B": "a"}, {"A": 2, "B": "b"}] | Using pandas.DataFrame.to_json / pandas.read_json with the records orientation.
application/json; format=pandas-columns | {"A": {"0": 1, "1": 2}, "B": {"0": "a", "1": "b"}} | Using pandas.DataFrame.to_json / pandas.read_json with the columns orientation.
application/json | Any of the pandas flavors above. | The decoder attempts the individual pandas flavors in turn until one succeeds. The encoder produces the pandas-records flavor.
text/csv | A,B\n1,a\n2,b | Using pandas.DataFrame.to_csv / pandas.read_csv.
- forml.io.layout.get_encoder(*targets: layout.Encoding) layout.Encoder [source]¶
Get an encoder capable of producing one of the given target encodings.
- Parameters:
- *targets: layout.Encoding¶
Encoding patterns (wildcards possible) to be produced by the matched encoder.
- Returns:
Encoder for one of the given target encodings.
- Raises:
layout.Encoding.Unsupported – If no suitable encoder available.
Examples
>>> layout.get_encoder(
...     layout.Encoding('foo/bar'),
...     layout.Encoding('application/*')
... ).encoding.header
'application/json; format=pandas-records'
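In combination with Encoding.parse(), this allows a simple content negotiation over a client-supplied accept header (a sketch with a hypothetical header value):

from forml.io import layout

# Hypothetical Accept header provided by an application client.
ACCEPT = 'text/html; q=0.9, application/*; q=0.8'

# Parse the header into prioritized Encoding instances and pick a capable encoder.
ENCODER = layout.get_encoder(*layout.Encoding.parse(ACCEPT))
print(ENCODER.encoding.header)  # e.g. 'application/json; format=pandas-records'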
- forml.io.layout.get_decoder(source: layout.Encoding) layout.Decoder [source]¶
Get a decoder suitable for the given source encoding.
- Parameters:
- source: layout.Encoding¶
Explicit encoding (no wildcards expected!) to find a decoder for.
- Returns:
Decoder for the given source encoding.
- Raises:
layout.Encoding.Unsupported – If no suitable decoder available.
Examples
>>> layout.get_decoder(
...     layout.Encoding('application/json', format='pandas-columns')
... ).loads(b'{"A":{"0":1,"1":2},"B":{"0":"a","1":"b"}}').data.to_rows()
array([[1, 'a'],
       [2, 'b']], dtype=object)
- class forml.io.layout.Encoding.Unsupported¶
Indication of an unsupported content type/encoding.
Payload Transformation Operators¶
ForML also provides a number of payload transformation operators as part of the pipeline library.