IO Concept

ForML comes with a unique approach to data access. For projects to be truly portable, they must not be coupled directly with any specific data storage systems or formats. Accessing an explicit instance of a dataset, as well as interpreting its particular structure, cannot be part of the ML solution implemented as a ForML project. Instead, the solution has to work with an abstract representation of the relevant logical dataset, which gets resolved using its actual provider (if available) only at runtime.

Important

Within the ForML architecture, all runtime system dependencies - including the pipeline I/O - are handled by the platform (using the pluggable provider concept), while projects themselves remain independent of any runtime configuration so that they truly offer a conceptual solution to the given problem - not just to its particular instance within some specific environment.

The concepts ForML uses to handle data access are:

  • Schema Catalogs are the logical dataset representations distributed in catalogs available to both projects and platforms as portable dataset references

  • Query DSL is the internal expression language used by projects to define high-level ETL operations on top of the schema catalogs

  • Source Feeds are the platform data providers capable of resolving the project-defined DSL query using a particular data storage technology

  • Output Sinks are the feeds' counterparts responsible for handling the produced pipeline results

  • Model Registries stand aside from the previous concepts as they deal with metadata rather than the data itself, providing the pipeline state persistence

Schema Catalogs

To achieve data access abstraction, ForML integrates the concept of schema catalogs. Instead of implementing direct operations on specific data source instances, projects use DSL expressions to define the input data ETL, referring only to abstract data schemas. It is then the responsibility of the platform feeds to resolve the requested schemas (and the entire ETL queries specified on top of them), mapping them to the actual data sources hosted in the particular runtime environment.

A schema catalog is a logical collection of schemas that both projects and platforms can use as a mutual data reference. It is neither a service nor a system, but rather a passive set of namespaced descriptors implemented simply as a Python package that must be available both to the project expecting the particular data and to the platform supposed to serve that project.
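
For illustration, a schema catalog is essentially just an importable module (or package) with schema declarations. The following minimal sketch assumes the dsl.Schema and dsl.Field primitives described in the schema DSL chapter and uses the hypothetical FooBar schema appearing in the examples further below:

from forml.io import dsl


class FooBar(dsl.Schema):
    """Hypothetical dataset schema published as part of a catalog package."""

    foo = dsl.Field(dsl.Integer())
    bar = dsl.Field(dsl.String())

A project then refers to the dataset simply by importing this schema, while any platform feed serving that project maps the very same schema to one of its physical data sources.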

Content Resolution

When a project workflow is submitted to a platform, the platform attempts to resolve the schemas referenced in the source query using the configured feed providers; only when all of these schema dependencies can be satisfied by the available data sources is the platform able to launch that workflow.

The following diagram demonstrates the trilateral relationship between projects, schema catalogs, and platform feeds, establishing the mechanism of decoupled data access:

flowchart TB
    subgraph Schema Catalog 1
        s1{{Schema 1}}
    end
    subgraph Schema Catalog 2
        direction LR
        s3{{Schema 3}}
        s2{{Schema 2}}
        s4{{Schema 4}}
    end
    subgraph project [Project]
        src[[source.py]] --> s1 & s2
        pip[[pipeline.py]]
        eval[[evaluation.py]]
    end
    subgraph platform [Runtime Platform]
        subgraph Storage Layer
            direction LR
            db[(Database)]
            xy[(Other)]
            fs[(Filesystem)]
        end
        subgraph Feed Providers
            direction LR
            xyf[/Other Feed/]
            dbf[/DB Feed/]
            fsf[/FS Feed/]
            xyf ---> s1 & s2
            xyf --> xy
            fsf --> fs
            fsf ---> s4
            dbf ---> s3 & s4
            dbf --> db
        end
    end
    project ===> platform

It tells the following story:

  1. A project defines its data requirements using a source query specified in its source.py component referring to particular data source schema(s) from within certain catalogs - here Schema 1 from Catalog 1 and Schema 2 from Catalog 2.

  2. The platform happens to be configured with three different feed providers capable of supplying (using its physical storage layer) four data sources represented by schemas from the given catalogs, so that:

    • the DB Feed can serve data sources represented by Schema 3 and Schema 4 physically stored in the Database

    • the FS Feed can also provide the data source matching Schema 4 duplicating its physical copy stored on the Filesystem

    • finally, the Other Feed knows how to supply data for Schema 1 and Schema 2

  3. When the project gets launched on the platform, its source descriptor first goes through the feed selection process to find the most suitable feed provider for the given query; the selected feed then executes the query, which results in the data payload entering the project workflow.

Publishing

A notable aspect of the schema catalogs is their decentralization. Since they are implemented as Python packages, they can be easily distributed using the standard means for Python package publishing. Currently, there is no naming convention for the schema definition namespaces. Ideally, schemas should be published and held in namespaces of the original dataset producers. For private first-party datasets (i.e. internal company data), this is straightforward - the owner simply maintains a package with schemas of their data sources. For public datasets, this relies on community-maintained schema catalogs like the Openschema catalog.

Continue to the schema DSL for more details regarding the actual implementation and use cases. Also, refer to the mentioned Openschema catalog for a real instance of a ForML schema catalog.

Source Descriptor

ForML projects specify their input data requirements - the ETL query optionally composed with other transforming operators - in the form of a source descriptor (declared within the source.py project component).

This descriptor is created using the project.Source.query() class method:

class forml.project.Source(extract: project.Source.Extract, transform: flow.Composable | None = None)[source]

ForML data source descriptor representing the ETL operation to be carried out at runtime to deliver the required input payload to the project pipeline.

The descriptor is a combination of an extraction DSL query and an optional transformation workflow.

Attention

Instances are supposed to be created using the query() method rather than calling the constructor directly.

extract : project.Source.Extract

A DSL query to be performed by the eventual platform Feed, representing the extraction part of the ETL process. The value is assembled directly from the parameters of the .query() method.

transform : flow.Composable | None

A workflow to be expanded into a regular task graph representing the optional transformation part of the ETL process. The value is accrued from (potentially repeated) chaining of the Source instance with workflow operators using the >> composition-like syntax.

Examples

>>> ETL = project.Source.query(
...     schema.FooBar.select(schema.FooBar.foo)
... ) >> payload.ToPandas()

Labels

Label type - either a single column, a sequence of columns, or a builder of a generic label-extraction actor (with two output ports).

alias of Union[Feature, Sequence[Feature], Builder[Actor[Tabular, None, tuple[Sequence[Any], Sequence[Any]]]]]

classmethod query(features: dsl.Source, labels: project.Source.Labels | None = None, apply: dsl.Source | None = None, ordinal: dsl.Operable | None = None, once: str | None = None) → project.Source[source]

Factory method for creating a new Source descriptor instance with the given extraction parameters.

Parameters:
features: dsl.Source

A DSL query defining the train-mode (and implicitly also the apply-mode) dataset. The features must not contain any columns specified in the labels parameter.

labels: project.Source.Labels | None = None

Training label column (or a sequence of them), or a label-extraction actor builder (with a single input port and two output ports of [features, labels]).

apply: dsl.Source | None = None

Optional query defining the explicit apply-mode features (if different from the train ones). If provided, it must result in the same layout as the main one provided via features.

ordinal: dsl.Operable | None = None

Optional specification of an ordinal column defining the relative ordering of the data records. If provided, the workflow can be launched with optional lower and/or upper parameters specifying the requested data range.

once: str | None = None

The ordinal delivery semantic for incremental querying. Possible values are:

  • atleast: Include both the lower and the upper ordinal bounds (leads to duplicate processing).

  • atmost: Leave out the lower bound and include the upper one (leads to data loss in case of continuous ordinals - safe for discrete values).

  • exactly: Include the lower bound but leave the upper bound out for the next batch (excludes processing of the tail records).

Returns:

Source component instance.
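
For illustration, a single query can combine several of these parameters; the following sketch assumes the hypothetical schema.FooBar from the other examples, extended with additional bar, baz (label), and timestamp (ordinal) fields:

>>> ETL = project.Source.query(
...     schema.FooBar.select(schema.FooBar.foo, schema.FooBar.bar),
...     labels=schema.FooBar.baz,
...     ordinal=schema.FooBar.timestamp,
...     once='atmost',
... )

With the ordinal in place, the launched workflow can be restricted to a particular data range using the lower and/or upper parameters, while once controls the bound semantics between incremental runs.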

bind(pipeline: str | flow.Composable, **modules: Any) → project.Artifact[source]

Create a virtual project handle from this Source and the given pipeline component.

The typical use case is the interactive execution.

Parameters:
pipeline: str | flow.Composable

Pipeline component to create the virtual project handle from.

**modules: Any

Optional modules representing the other project components.

Returns:

Virtual project handle.

Examples

>>> PIPELINE = payload.ToPandas()
>>> SOURCE = project.Source.query(
...     schema.FooBar.select(schema.FooBar.foo)
... )
>>> SOURCE.bind(PIPELINE).launcher.apply()

Data Payloads

In line with the overall architecture, ForML is designed to be as data format agnostic as possible. Conceptually, there are several scopes involving payload exchange, each requiring compatibility with the data being passed.

Internal Payload Exchange

Payload-wise, the core ForML runtime is fairly generic, relying only on a few small interfaces to handle the necessary exchange with a minimal footprint. The following list describes the involved core payload exchanges (producer → consumer, plus the exchange protocol):

  • Origin data at rest → Feed Reader: each feed acts as an adapter designed specifically for the given origin format.

  • Feed Reader → Feed Slicer: defined using the io.layout.Tabular interface.

  • Feed Slicer → Project Pipeline: defined using the io.layout.RowMajor interface.

  • Actor Payload Output Port → Actor Payload Input Port: no specific format is required; the choice of mutually compatible actors is the responsibility of the implementer - ForML only facilitates the exchange (possibly subject to serializability).

  • Project Pipeline → Platform Sink Writer: defined using the io.layout.RowMajor interface.

  • Actor State Output Port → Model Registry: handled in the form of a bytestring as produced by the .get_state() method.

  • Model Registry → Actor State Input Port: handled in the form of a bytestring as consumed by the .set_state() method.
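
For illustration of the state exchange contract, a stateful actor typically serializes its internal model into a bytestring. The following is a conceptual sketch only - the class and attribute names are hypothetical and the rest of the actor interface (apply/train etc.) is omitted; see the actor documentation for the full API:

import pickle


class HypotheticalModelActor:
    """Illustration of the actor state exchange contract only."""

    def __init__(self):
        self._model = None  # internal state produced by training

    def get_state(self) -> bytes:
        # Producer side: the returned bytestring gets persisted by the model registry.
        return pickle.dumps(self._model)

    def set_state(self, state: bytes) -> None:
        # Consumer side: the actor gets restored from the registry-provided bytestring.
        self._model = pickle.loads(state)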

class forml.io.layout.Native

Generic type variable representing arbitrary native type.

alias of TypeVar(‘Native’)

forml.io.layout.ColumnMajor

alias of Sequence[Any]

forml.io.layout.RowMajor

alias of Sequence[Any]
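
Both aliases are plain sequence types differing only in the orientation of the nesting; for illustration (plain Python lists used purely for demonstration):

>>> ROW_MAJOR = [[1, 'a'], [2, 'b'], [3, 'c']]   # sequence of records
>>> COLUMN_MAJOR = [[1, 2, 3], ['a', 'b', 'c']]  # sequence of columns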

class forml.io.layout.Tabular[source]

Dataset interface providing both row and column-oriented representation of the underlying data.

This is a lightweight interface used internally for the data payload as returned by the Feed Reader, only to be immediately turned into the RowMajor representation when leaving the Feed Slicer.

abstract to_columns() → layout.ColumnMajor[source]

Get the dataset in a column-oriented structure.

Returns:

Column-wise dataset representation.

abstract to_rows() → layout.RowMajor[source]

Get the dataset in a row-oriented structure.

Returns:

Row-wise dataset representation.

abstract take_rows(indices: Sequence[int]) → layout.Tabular[source]

Slice the table returning a new instance with just the selected rows.

Parameters:
indices: Sequence[int]

Row indices to take.

Returns:

New instance with just the given rows taken.

abstract take_columns(indices: Sequence[int]) → layout.Tabular[source]

Slice the table returning a new instance with just the selected columns.

Parameters:
indices: Sequence[int]

Column indices to take.

Returns:

New instance with just the given columns taken.
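
For illustration, a naive in-memory implementation of this interface backed by a plain list of rows could look roughly as follows. This is a conceptual sketch only - the ListTabular name is hypothetical and the actual interface may impose additional requirements beyond the four methods listed above:

from typing import Any, Sequence

from forml.io import layout


class ListTabular(layout.Tabular):
    """Naive Tabular implementation backed by a list of rows."""

    def __init__(self, rows: Sequence[Sequence[Any]]):
        self._rows = [list(r) for r in rows]

    def to_rows(self) -> layout.RowMajor:
        # Row-oriented view is simply the underlying storage.
        return self._rows

    def to_columns(self) -> layout.ColumnMajor:
        # Transposing the rows produces the column-oriented view.
        return [list(c) for c in zip(*self._rows)]

    def take_rows(self, indices: Sequence[int]) -> 'ListTabular':
        # New instance with just the selected rows.
        return ListTabular([self._rows[i] for i in indices])

    def take_columns(self, indices: Sequence[int]) -> 'ListTabular':
        # New instance with just the selected columns.
        return ListTabular([[row[i] for i in indices] for row in self._rows])

An actual feed would more likely back this interface with an efficient columnar structure (e.g. a DataFrame) rather than nested Python lists.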

External Payload Exchange

In addition to the core payloads, the serving layer involves a few more data exchanges using the following structures:

  • Application Client → Serving Gateway: each gateway acts as an adapter designed specifically for the given application protocol, handling the payload as a bytestring with an explicit Encoding.

  • Serving Gateway → Serving Engine: using the io.layout.Request structure.

  • Serving Engine → Feed Reader: passing the decoded io.layout.Entry to the given feed for potential augmentation.

  • Sink Writer → Serving Engine: using the io.layout.Outcome structure.

  • Serving Engine → Serving Gateway: using the encoded io.layout.Response structure.

  • Serving Gateway → Application Client: handling the payload as a bytestring with an explicit Encoding wrapped in the given application protocol.

class forml.io.layout.Entry(schema: dsl.Source.Schema, data: layout.Tabular)[source]

Internal representation of the decoded Request payload.

class forml.io.layout.Outcome(schema: dsl.Source.Schema, data: layout.RowMajor)[source]

Internal result payload representation to be encoded as Response.

class forml.io.layout.Request(payload: bytes, encoding: layout.Encoding, params: Mapping[str, Any] | None = None, accept: Sequence[layout.Encoding] | None = None)[source]

Serving gateway request object.

Parameters:
payload: bytes

Raw encoded payload.

encoding: layout.Encoding

Content type encoding instance.

params: Mapping[str, Any] | None = None

Optional application-level parameters.

accept: Sequence[layout.Encoding] | None = None

Content types requested for the eventual Response.

class Decoded(entry: layout.Entry, context: Any = None)[source]

Decoded request case class.

entry : layout.Entry

Input data points to be applied for prediction.

context : Any

Custom (serializable!) metadata produced within the (user-defined) application scope and carried throughout the request processing flow.
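
For illustration, a gateway adapter might wrap an incoming HTTP payload roughly as follows (the payload bytes and the params value are made up purely for demonstration):

>>> REQUEST = layout.Request(
...     b'[{"foo": 1}, {"foo": 2}]',
...     layout.Encoding('application/json'),
...     params={'mode': 'demo'},
...     accept=(layout.Encoding('application/json', format='pandas-records'),),
... )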

class forml.io.layout.Response(payload: layout.Payload, instance: asset.Instance)[source]

Serving gateway response object.

Parameters:
payload: layout.Payload

Raw encoded payload.

instance: asset.Instance

Model instance used to generate this response.

class forml.io.layout.Payload(data: bytes, encoding: layout.Encoding)[source]

Combo for binary data and its encoding.

Payload Encoding

ForML also depends on the following encoding features for the external payload exchange:

class forml.io.layout.Encoding(kind: str, /, **options: str)[source]

Content type/encoding representation to be used by the Serving gateways.

Parameters:
kind: str

Content type label.

**options: str

Encoding options.

property header : str

Get the header-formatted representation of this encoding.

Returns:

Header-formatted representation.

Examples

>>> layout.Encoding('application/json', charset='UTF-8').header
'application/json; charset=UTF-8'

classmethod parse(value: str) → Sequence[layout.Encoding][source]

Caching parser of the content type header values.

Parameters:
value: str

Comma-separated list of content type values and their parameters.

Returns:

Sequence of the Encoding instances ordered according to the provided priority.

Examples

>>> layout.Encoding.parse('image/GIF; q=0.6; a=x, text/html; q=1.0')
(
    Encoding(kind='text/html', options={}),
    Encoding(kind='image/gif', options={'a': 'x'})
)

match(other: layout.Encoding) → bool[source]

Return true if the other encoding matches ours including glob wildcards.

The other encoding matches if its kind fits our kind as a pattern (which may include glob wildcards) and all of our options are a subset of the other's options.

Parameters:
other: layout.Encoding

Encoding to match against this. Must not contain wildcards!

Returns:

True if matches.

Examples

>>> layout.Encoding('application/*').match(
...     layout.Encoding('application/json')
... )
True

class forml.io.layout.Encoder[source]

Encoder base class.

abstract property encoding : layout.Encoding

Get the encoding produced by this encoder.

Returns:

Encoding instance.

abstract dumps(outcome: layout.Outcome) → bytes[source]

Encoder logic.

Parameters:
outcome: layout.Outcome

Outcome to encode.

Returns:

Encoded outcome.

class forml.io.layout.Decoder[source]

Decoder base class.

abstract loads(data: bytes) → layout.Entry[source]

Decoder logic.

Parameters:
data: bytes

Bytes to decode.

Returns:

Decoded entry.

The two encoder/decoder matching functions below currently support the following encodings/flavors:

  • application/json; format=pandas-records - example: [{column -> value}, ... , {column -> value}]

  • application/json; format=pandas-columns - example: {column -> {index -> value}}

  • application/json; format=pandas-index - example: {index -> {column -> value}}

  • application/json; format=pandas-split - example: {"index": [index], "columns": [columns], "data": [values]}

  • application/json; format=pandas-table - example: {"schema": {schema}, "data": {data}}

  • application/json; format=pandas-values - example: [[value, ..., value], [...]]

All of the pandas-* flavors above use pandas.read_json() for decoding and pandas.DataFrame.to_json() for encoding, with the particular format referring to the value of the orient= parameter of these functions.

  • application/json (without an explicit format) - the Decoder attempts to interpret the data as:

    1. a list of row dictionaries

    2. TF serving’s instances format

    3. TF serving’s inputs format

    4. a dictionary of column lists

    The Encoder defaults to the pandas-records format.

  • text/csv - example: A,B\n1,a\n2,b\n3,c\n - decoded using pandas.read_csv() and encoded using pandas.DataFrame.to_csv().
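
For illustration, decoding the text/csv example above follows the same pattern as the get_decoder example shown further below; assuming the pandas-based implementation, the resulting row-major data would be equivalent to [[1, 'a'], [2, 'b'], [3, 'c']] (the concrete container type may differ):

>>> CSV = layout.Encoding('text/csv')
>>> ROWS = layout.get_decoder(CSV).loads(b'A,B\n1,a\n2,b\n3,c\n').data.to_rows()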

forml.io.layout.get_encoder(*targets: layout.Encoding) → layout.Encoder[source]

Get an encoder capable of producing one of the given target encodings.

Parameters:
*targets: layout.Encoding

Encoding patterns (wildcards possible) to be produced by the matched encoder.

Returns:

Encoder for one of the given target encodings.

Raises:

layout.Encoding.Unsupported – If no suitable encoder is available.

Examples

>>> layout.get_encoder(
...     layout.Encoding('foo/bar'),
...     layout.Encoding('application/*')
... ).encoding.header
'application/json; format=pandas-records'

forml.io.layout.get_decoder(source: layout.Encoding) → layout.Decoder[source]

Get a decoder suitable for the given source encoding.

Parameters:
source: layout.Encoding

Explicit encoding (no wildcards expected!) to find a decoder for.

Returns:

Decoder for the given source encoding.

Raises:

layout.Encoding.Unsupported – If no suitable decoder is available.

Examples

>>> layout.get_decoder(
...     layout.Encoding('application/json', format='pandas-columns')
... ).loads(b'{"A":{"0":1,"1":2},"B":{"0":"a","1":"b"}}').data.to_rows()
array([[1, 'a'],
       [2, 'b']], dtype=object)

class forml.io.layout.Encoding.Unsupported

Indication of an unsupported content type/encoding.
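
For illustration, requesting a decoder for a content type outside of the supported list above raises this exception (audio/wav chosen arbitrarily as an unsupported example):

>>> try:
...     layout.get_decoder(layout.Encoding('audio/wav'))
... except layout.Encoding.Unsupported:
...     print('no decoder available')
no decoder available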

Payload Transformation Operators

ForML also provides a number of payload transformation operators as part of the pipeline library.