# Source code for forml.provider.feed.monolite

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Special feed for combining multiple simple data sources."""
import abc
import pathlib
import typing

import pandas

import forml
from forml.io import dsl
from forml.provider.feed import lazy

if typing.TYPE_CHECKING:
    from forml.io import layout

class Origin(lazy.Origin[None], metaclass=abc.ABCMeta):
    """Base class for data origin handlers.

    Each origin wraps a single DSL source (the left side of the content resolving mapping)
    together with whatever subclass-specific options are needed to load its data.
    """

    def __init__(self, source: typing.Union['dsl.Source', str]):
        # String sources are fully qualified schema paths that get resolved to the DSL object.
        if isinstance(source, str):
            source = dsl.Schema.from_path(source)
        self._source: 'dsl.Source' = source

    @classmethod
    def parse_config(cls, config: typing.Any) -> typing.Mapping[str, typing.Any]:  # pylint: disable=unused-argument
        """Parse the configuration options into the init ``**kwargs``.

        The base implementation accepts no extra options and returns an empty mapping.

        Args:
            config: Raw config options.

        Returns:
            Init ``**kwargs``.

        Raises:
            forml.InvalidError: In case of invalid configuration options.
        """
        return {}

    @classmethod
    def create(cls, setup: typing.Mapping[typing.Union['dsl.Source', str], typing.Any]) -> typing.Sequence['Origin']:
        """Factory method for creating origin instances out of the setup mapping.

        Args:
            setup: Mapping of DSL schema instances or fully qualified string paths to the origin
                   configuration parameters.

        Returns:
            Sequence of the parsed origin instances.
        """
        return tuple(cls(source, **cls.parse_config(options)) for source, options in setup.items())

    @property
    def source(self) -> 'dsl.Source':
        """The DSL source description of this origin.

        The left side of the content resolving mapping (e.g. a ``dsl.Table`` schema).

        Returns:
            DSL representation of this origin.
        """
        return self._source

    @property
    def names(self) -> typing.Sequence[str]:
        """List of column names within this origin.

        Returns:
            Column names (in the order of the source schema fields).
        """
        return [f.name for f in self.source.schema]

class Inline(Origin):
    """Inline data origin.

    The data is supplied directly in the configuration as a row-oriented array and is held
    in memory as a Pandas dataframe.
    """

    def __init__(self, schema: typing.Union['dsl.Source', str], content: 'layout.RowMajor'):
        # The base init must run first - ``self.names`` below is derived from ``self._source``.
        super().__init__(schema)
        self._content: pandas.DataFrame = pandas.DataFrame(content, columns=self.names)

    @classmethod
    def parse_config(cls, config: typing.Any) -> typing.Mapping[str, typing.Any]:
        """Use the entire raw config value as the inline row-oriented content.

        Args:
            config: Row-oriented data array.

        Returns:
            Init ``**kwargs`` with the ``content`` key.
        """
        return {'content': config}

    def load(self, partition: typing.Optional[None]) -> pandas.DataFrame:
        """Return the in-memory dataframe (the partition argument is ignored).

        Args:
            partition: Unused (this origin is not partitioned).

        Returns:
            The inline content as a dataframe.
        """
        return self._content

class File(Origin, metaclass=abc.ABCMeta):
    """Abstract file origin.

    Subclasses implement the physical ``read`` and may provide default reader options via
    ``OPTIONS``.
    """

    # Default reader options; overridden per subclass and merged (never mutated) in ``__init__``.
    OPTIONS = {}

    def __init__(self, schema: typing.Union['dsl.Source', str], path: typing.Union[pathlib.Path, str], **kwargs):
        super().__init__(schema)
        self._path: pathlib.Path = pathlib.Path(path)
        # Explicit kwargs take precedence over the class-level defaults.
        self._kwargs = self.OPTIONS | kwargs

    @classmethod
    def parse_config(
        cls, config: typing.Union[pathlib.Path, str, typing.Mapping[str, typing.Any]]
    ) -> typing.Mapping[str, typing.Any]:
        """Parse the file origin configuration into the init ``**kwargs``.

        Args:
            config: Either a direct file path or a mapping with the mandatory ``path`` key and
                    an optional ``kwargs`` mapping of additional reader options.

        Returns:
            Init ``**kwargs`` - the extra reader options merged with the ``path``.

        Raises:
            forml.MissingError: If the config mapping lacks the ``path`` key.
        """
        if isinstance(config, typing.Mapping):
            try:
                path = config['path']
            except KeyError as err:
                raise forml.MissingError('Missing required parameter `path`') from err
            # Unpacking (rather than ``|``) accepts any mapping type for ``kwargs``; ``path`` is
            # placed last so it always wins over a same-named entry in ``kwargs``.
            return {**config.get('kwargs', {}), 'path': path}
        return {'path': config}

    def load(self, partition: typing.Optional[None]) -> pandas.DataFrame:
        """Load the file content (the partition argument is ignored).

        Args:
            partition: Unused (single-file origin).

        Returns:
            File content as Pandas dataframe.
        """
        return self.read(self._path, **self._kwargs)

    @abc.abstractmethod
    def read(self, path: pathlib.Path, **kwargs) -> pandas.DataFrame:
        """Physical reader implementation.

        Args:
            path: File to read.
            kwargs: Reader-specific options.

        Returns:
            File content as Pandas dataframe.
        """

class Csv(File):
    """Origin backed by a CSV file.

    Columns are labelled using the schema field names; with the default ``header=0`` the file's
    own header row gets replaced by them. Any explicit reader kwargs override these defaults.
    """

    OPTIONS = {'parse_dates': True, 'header': 0}

    def read(self, path: pathlib.Path, **kwargs) -> pandas.DataFrame:
        reader_options = dict(names=self.names)
        reader_options.update(kwargs)  # caller-supplied options take precedence
        return pandas.read_csv(path, **reader_options)

class Parquet(File):
    """Origin backed by a Parquet file.

    By default only the schema columns are loaded; an explicit ``columns`` kwarg overrides that.
    """

    def read(self, path: pathlib.Path, **kwargs) -> pandas.DataFrame:
        return pandas.read_parquet(path, **{'columns': self.names, **kwargs})

class Feed(lazy.Feed, alias='monolite'):
    """Lightweight feed for pulling data from multiple simple origins.

    The feed can resolve queries across all of its combined data sources.

    All the origins need to be declared using a proper :ref:`content resolver <io-resolution>`
    mapping with keys representing the fully qualified schema name formatted as
    ``<full.module.path>:<qualified.Class.Name>`` and the values being origin-specific
    configuration options.

    Attention:
        All the referenced :ref:`schema catalogs <io-catalog>` must be installed.

    Supported origins:

    * *Inline* data provided as a row-oriented array.
    * *CSV files* parsed using the :func:`pandas:pandas.read_csv`.
    * *Parquet files* parsed using the :func:`pandas:pandas.read_parquet`.

    Args:
        inline: Schema mapping of datasets provided inline as native row-oriented arrays.
        csv: Schema mapping of datasets accessible using a CSV reader. Values can either be direct
             file system paths or a mapping with two keys:

             * ``path`` pointing to the CSV file
             * ``kwargs`` containing additional options to be passed to the underlying
               :func:`pandas:pandas.read_csv`
        parquet: Schema mapping of datasets accessible using a Parquet reader. Values can either
                 be direct file system paths or a mapping with two keys:

                 * ``path`` pointing to the Parquet file
                 * ``kwargs`` containing additional options to be passed to the underlying
                   :func:`pandas:pandas.read_parquet`

    The provider can be enabled using the following
    :ref:`platform configuration <platform-config>`:

    .. code-block:: toml
       :caption: config.toml

        [FEED.mono]
        provider = "monolite"
        [FEED.mono.inline]
        "foobar.schemas:Foo.Baz" = [
            ["alpha", 27, 0.314, 2021-05-11T17:12:24],
            ["beta", 11, -1.12, 2020-11-03T01:24:56],
        ]
        [FEED.mono.csv]
        "openschema.kaggle:Titanic" = "/tmp/titanic.csv"
        [FEED.mono.csv."openschema.sklearn:Iris"]
        path = "/tmp/iris.csv"
        kwargs = {sep = ";", engine = "pyarrow"}
        [FEED.mono.parquet]
        "openschema.kaggle:Avazu" = "/tmp/avazu.parquet"

    Important:
        Select the ``sql`` :ref:`extras to install <install-extras>` ForML together with the
        SQLAlchemy support.

    Todo:
        * More file types (json)
        * Multi-file data sources (partitions)
    """

    def __init__(
        self,
        inline: typing.Optional[typing.Mapping[typing.Union['dsl.Source', str], 'layout.RowMajor']] = None,
        csv: typing.Optional[
            typing.Mapping[
                typing.Union['dsl.Source', str],
                typing.Union[pathlib.Path, str, typing.Mapping[str, typing.Any]],
            ]
        ] = None,
        parquet: typing.Optional[
            typing.Mapping[
                typing.Union['dsl.Source', str],
                typing.Union[pathlib.Path, str, typing.Mapping[str, typing.Any]],
            ]
        ] = None,
    ):
        # Collect the origins of all kinds; empty/missing sections contribute nothing.
        origins = []
        if inline:
            origins.extend(Inline.create(inline))
        if csv:
            origins.extend(Csv.create(csv))
        if parquet:
            origins.extend(Parquet.create(parquet))
        super().__init__(*origins)