Source code for forml.provider.feed.monolite

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Special feed that combines multiple simple data sources.
"""
import abc
import pathlib
import typing

import pandas

import forml
from forml.io import dsl
from forml.provider.feed import lazy

if typing.TYPE_CHECKING:
    from forml.io import layout


class Origin(lazy.Origin[None], metaclass=abc.ABCMeta):
    """Base class for data origin handlers."""

    def __init__(self, source: typing.Union['dsl.Source', str]):
        if isinstance(source, str):
            source = dsl.Schema.from_path(source)
        self._source: 'dsl.Source' = source

    @classmethod
    def parse_config(cls, config: typing.Any) -> typing.Mapping[str, typing.Any]:  # pylint: disable=unused-argument
        """Parse the configuration options into the init ``**kwargs``.

        Args:
            config: Raw config options.

        Returns:
            Init ``**kwargs``.

        Raises:
            forml.InvalidError: In case of invalid configuration options.
        """
        return {}

    @classmethod
    def create(cls, setup: typing.Mapping[typing.Union['dsl.Source', str], typing.Any]) -> typing.Sequence['Origin']:
        """Factory method for creating origin instances out of the setup mapping.

        Args:
            setup: Mapping of DSL schema instances or fully qualified string paths to the origin
                   configuration parameters.

        Returns:
            Sequence of the parsed origin instances.
        """
        return tuple(cls(s, **cls.parse_config(o)) for s, o in setup.items())

    @property
    def source(self) -> 'dsl.Source':
        """The DSL source description of this origin.

        The left side of the content resolving mapping (e.g. a ``dsl.Table`` schema).

        Returns:
            DSL representation of this origin.
        """
        return self._source

    @property
    def names(self) -> typing.Sequence[str]:
        """List of column names within this origin.

        Returns:
            Column names.
        """
        return [f.name for f in self.source.schema]


class Inline(Origin):
    """Inline data origin."""

    def __init__(self, schema: typing.Union['dsl.Source', str], content: 'layout.RowMajor'):
        super().__init__(schema)
        self._content: pandas.DataFrame = pandas.DataFrame(content, columns=self.names)

    @classmethod
    def parse_config(cls, config: typing.Any) -> typing.Mapping[str, typing.Any]:
        return {'content': config}

    def load(self, partition: typing.Optional[None]) -> pandas.DataFrame:
        return self._content
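
# Usage sketch for the inline origin (illustrative; the ``foobar.schemas:Foo.Baz``
# path is hypothetical and would need to point to an installed schema catalog):
def _demo_inline() -> pandas.DataFrame:
    origin = Inline(
        'foobar.schemas:Foo.Baz',
        [['alpha', 27, 0.314, '2021-05-11T17:12:24'], ['beta', 11, -1.12, '2020-11-03T01:24:56']],
    )
    return origin.load(None)  # DataFrame with the schema field names as columns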


class File(Origin, metaclass=abc.ABCMeta):
    """Abstract file origin."""

    #: Default options for the underlying reader; overridable per instance via the
    #: init ``**kwargs``.
    OPTIONS = {}

    def __init__(self, schema: typing.Union['dsl.Source', str], path: typing.Union[pathlib.Path, str], **kwargs):
        super().__init__(schema)
        self._path: pathlib.Path = pathlib.Path(path)
        self._kwargs = self.OPTIONS | kwargs

    @classmethod
    def parse_config(
        cls, config: typing.Union[pathlib.Path, str, typing.Mapping[str, typing.Any]]
    ) -> typing.Mapping[str, typing.Any]:
        if isinstance(config, typing.Mapping):
            try:
                return config.get('kwargs', {}) | {'path': config['path']}
            except KeyError as err:
                raise forml.MissingError('Missing required parameter `path`') from err
        else:
            return {'path': config}

    def load(self, partition: typing.Optional[None]) -> pandas.DataFrame:
        return self.read(self._path, **self._kwargs)

    @abc.abstractmethod
    def read(self, path: pathlib.Path, **kwargs) -> pandas.DataFrame:
        """Physical reader implementation.

        Args:
            path: File to read.
            **kwargs: Additional options for the underlying reader.

        Returns:
            File content as Pandas dataframe.
        """


class Csv(File):
    """CSV file origin."""

    OPTIONS = {'parse_dates': True, 'header': 0}

    def read(self, path: pathlib.Path, **kwargs) -> pandas.DataFrame:
        return pandas.read_csv(path, **({'names': self.names} | kwargs))
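
# The effective reader options resolve as ``OPTIONS | init_kwargs``, keeping the
# defaults overridable (a sketch; the schema path and file are made up):
def _demo_csv() -> pandas.DataFrame:
    csv = Csv('openschema.sklearn:Iris', '/tmp/iris.csv', sep=';')
    # equivalent to: pandas.read_csv('/tmp/iris.csv', names=csv.names,
    #                                parse_dates=True, header=0, sep=';')
    return csv.load(None)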


class Parquet(File):
    """Parquet file origin."""

    def read(self, path: pathlib.Path, **kwargs) -> pandas.DataFrame:
        return pandas.read_parquet(path, **({'columns': self.names} | kwargs))


class Feed(lazy.Feed, alias='monolite'):
    """Lightweight feed for pulling data from multiple simple origins.

    The feed can resolve queries across all of its combined data sources.

    All the origins need to be declared using a proper :ref:`content resolver <io-resolution>`
    mapping with keys representing the fully qualified schema name formatted as
    ``<full.module.path>:<qualified.Class.Name>`` and values carrying the origin-specific
    configuration options.

    Attention:
        All the referenced :ref:`schema catalogs <io-catalog>` must be installed.

    Supported origins:

    * *Inline* data provided as a row-oriented array.
    * *CSV files* parsed using :func:`pandas:pandas.read_csv`.
    * *Parquet files* parsed using :func:`pandas:pandas.read_parquet`.

    Args:
        inline: Schema mapping of datasets provided inline as native row-oriented arrays.
        csv: Schema mapping of datasets accessible using a CSV reader. Each value can either
             be a direct file system path or a mapping with two keys:

             * ``path`` pointing to the CSV file
             * ``kwargs`` containing additional options to be passed to the underlying
               :func:`pandas:pandas.read_csv`
        parquet: Schema mapping of datasets accessible using a Parquet reader. Each value can
                 either be a direct file system path or a mapping with two keys:

                 * ``path`` pointing to the Parquet file
                 * ``kwargs`` containing additional options to be passed to the underlying
                   :func:`pandas:pandas.read_parquet`

    The provider can be enabled using the following :ref:`platform configuration
    <platform-config>`:

    .. code-block:: toml
       :caption: config.toml

       [FEED.mono]
       provider = "monolite"

       [FEED.mono.inline]
       "foobar.schemas:Foo.Baz" = [
           ["alpha", 27, 0.314, 2021-05-11T17:12:24],
           ["beta", 11, -1.12, 2020-11-03T01:24:56],
       ]

       [FEED.mono.csv]
       "openschema.kaggle:Titanic" = "/tmp/titanic.csv"

       [FEED.mono.csv."openschema.sklearn:Iris"]
       path = "/tmp/iris.csv"
       kwargs = {sep = ";", engine = "pyarrow"}

       [FEED.mono.parquet]
       "openschema.kaggle:Avazu" = "/tmp/avazu.parquet"

    Important:
        Select the ``sql`` :ref:`extras <install-extras>` when installing ForML so that the
        SQLAlchemy support is included.

    Todo:
        * More file types (e.g. JSON).
        * Multi-file data sources (partitions).
    """

    def __init__(
        self,
        inline: typing.Optional[typing.Mapping[typing.Union['dsl.Source', str], 'layout.RowMajor']] = None,
        csv: typing.Optional[
            typing.Mapping[
                typing.Union['dsl.Source', str],
                typing.Union[pathlib.Path, str, typing.Mapping[str, typing.Any]],
            ]
        ] = None,
        parquet: typing.Optional[
            typing.Mapping[
                typing.Union['dsl.Source', str],
                typing.Union[pathlib.Path, str, typing.Mapping[str, typing.Any]],
            ]
        ] = None,
    ):
        origins = []
        if inline:
            origins.extend(Inline.create(inline))
        if csv:
            origins.extend(Csv.create(csv))
        if parquet:
            origins.extend(Parquet.create(parquet))
        super().__init__(*origins)
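
# A programmatic equivalent of the TOML configuration shown in the ``Feed``
# docstring (a sketch; the referenced schema catalogs must be installed and
# the file paths are made up):
def _demo_feed() -> Feed:
    return Feed(
        inline={
            'foobar.schemas:Foo.Baz': [
                ['alpha', 27, 0.314, '2021-05-11T17:12:24'],
                ['beta', 11, -1.12, '2020-11-03T01:24:56'],
            ]
        },
        csv={
            'openschema.kaggle:Titanic': '/tmp/titanic.csv',
            'openschema.sklearn:Iris': {'path': '/tmp/iris.csv', 'kwargs': {'sep': ';', 'engine': 'pyarrow'}},
        },
        parquet={'openschema.kaggle:Avazu': '/tmp/avazu.parquet'},
    )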