Pipeline Implementation

After finishing the exploration, we can proceed to implement the actual ML solution in the form of the ForML project pipeline component.

For cleaner structuring, we change the pipeline component from a flat module to a hierarchical package (which is semantically identical) and add a skeleton for our titanic.pipeline.preprocessing module, together with its unit tests under tests/pipeline/test_preprocessing.py, both of which we are going to implement shortly. The project structure now looks as follows:

$ tree forml-tutorial-titanic
forml-tutorial-titanic
├── notebooks
│   └── exploration.ipynb
├── pyproject.toml
├── tests
│   ├── __init__.py
│   └── pipeline
│       ├── __init__.py
│       └── test_preprocessing.py
└── titanic
    ├── __init__.py
    ├── evaluation.py
    ├── pipeline
    │   ├── __init__.py
    │   └── preprocessing.py
    └── source.py

Important

Make sure to remove the original titanic/pipeline.py module that was created as part of the default project structure.
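If preferred, the whole restructuring can be carried out with a few shell commands (just a sketch; any other way of creating the files works equally well):

$ cd forml-tutorial-titanic
$ rm titanic/pipeline.py
$ mkdir -p titanic/pipeline tests/pipeline
$ touch titanic/pipeline/__init__.py titanic/pipeline/preprocessing.py
$ touch tests/pipeline/__init__.py tests/pipeline/test_preprocessing.py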

Custom Preprocessing Operators

In addition to the Impute operator we created in the scope of our exploration, let's improve our preprocessing with a couple more operators. We stick to the simple @wrap technique for implementing both the actors and the operators.

ParseTitle

Let's start with a simple stateless transformer that extracts the title from the passenger's name into a new column (target) and drops the original name column (source):

titanic/pipeline/preprocessing.py
import typing

import pandas
from sklearn import preprocessing

from forml.pipeline import wrap

# (shared imports for the whole preprocessing module - the Impute operator
# carried over from the exploration chapter lives in this file as well)


@wrap.Operator.mapper
@wrap.Actor.apply
def ParseTitle(
    features: pandas.DataFrame,
    *,
    source: str,
    target: str,
) -> pandas.DataFrame:
    """Transformer extracting a person's title from the name string."""

    def get_title(name: str) -> str:
        """Auxiliary method for extracting the title."""
        if '.' in name:
            return name.split(',')[1].split('.')[0].strip().lower()
        return 'n/a'

    features[target] = features[source].map(get_title)
    return features.drop(columns=source)
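For a quick feel of the extraction logic alone, the following plain-pandas sketch (using made-up sample records) shows what get_title yields:

import pandas

names = pandas.Series(['Smith, Mr. John', 'White, Ian'])
# the title sits between the comma and the dot: 'Smith, Mr. John' -> 'mr';
# names without any dot fall back to 'n/a'
titles = names.map(lambda n: n.split(',')[1].split('.')[0].strip().lower() if '.' in n else 'n/a')
print(titles.tolist())  # ['mr', 'n/a']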

Encode

The OneHotEncoder we used in our baseline workflow was applied bluntly to all columns, including the non-categorical ones. Let's improve on that with a custom operator supporting a parameterized selection of the columns to be encoded. Note the actor is defined in two parts: the train function decorated with @wrap.Actor.train, and the apply function attached to it via the @Encode.apply decorator, with the whole actor finally turned into an operator using @wrap.Operator.mapper:

titanic/pipeline/preprocessing.py
@wrap.Actor.train
def Encode(
    state: typing.Optional[preprocessing.OneHotEncoder],
    features: pandas.DataFrame,
    labels: pandas.Series,
    columns: typing.Sequence[str],
) -> preprocessing.OneHotEncoder:
    """Train part of a stateful encoder for the various categorical features."""
    encoder = preprocessing.OneHotEncoder(handle_unknown='infrequent_if_exist', sparse_output=False)
    encoder.fit(features[columns])
    return encoder


@wrap.Operator.mapper
@Encode.apply
def Encode(
    state: preprocessing.OneHotEncoder, features: pandas.DataFrame, columns: typing.Sequence[str]
) -> pandas.DataFrame:
    """Apply part of a stateful encoder for the various categorical features."""
    onehot = pandas.DataFrame(state.transform(features[columns]))
    result = pandas.concat((features.drop(columns=columns), onehot), axis='columns')
    result.columns = [str(c) for c in result.columns]
    return result
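Under the hood, this actor pair mirrors scikit-learn's usual fit/transform split. Purely for illustration, here is a standalone sketch of the equivalent plain operations on toy data (assuming scikit-learn 1.2+ for the sparse_output parameter):

import pandas
from sklearn import preprocessing

frame = pandas.DataFrame({'Sex': ['male', 'female', 'male'], 'Age': [22.0, 38.0, 4.0]})
encoder = preprocessing.OneHotEncoder(handle_unknown='infrequent_if_exist', sparse_output=False)
encoder.fit(frame[['Sex']])  # corresponds to the train part of the actor
onehot = pandas.DataFrame(encoder.transform(frame[['Sex']]))  # corresponds to the apply part
# the encoded 0/1 columns replace 'Sex' while the non-categorical 'Age' passes through untouched
result = pandas.concat((frame.drop(columns=['Sex']), onehot), axis='columns')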


Writing Unit Tests

As a best practice, let’s define unit tests for our operators.

tests/pipeline/test_preprocessing.py
import numpy
import pandas
from titanic.pipeline import preprocessing

from forml import testing


class TestParseTitle(testing.operator(preprocessing.ParseTitle)):
    """Unit testing the stateless TitleParser transformer."""

    # Dataset fixtures
    INPUT = pandas.DataFrame({'Name': ['Smith, Mr. John', 'Black, Ms. Jane', 'Brown, Mrs. Jo', 'White, Ian']})
    EXPECTED = pandas.DataFrame({'Title': ['mr', 'ms', 'mrs', 'n/a']})

    # Test scenarios
    invalid_params = testing.Case(foo='bar').raises(TypeError, "got an unexpected keyword argument 'foo'")
    invalid_source = testing.Case(source='Foo', target='Bar').apply(INPUT).raises(KeyError, 'Foo')
    valid_parsing = testing.Case(source='Name', target='Title').apply(INPUT).returns(EXPECTED, testing.pandas_equals)


class TestImpute(testing.operator(preprocessing.Impute)):
    """NaN Imputer unit tests."""

    def matcher(expected: pandas.DataFrame, actual: pandas.DataFrame) -> bool:  # pylint: disable=no-self-argument
        """Custom matcher to verify the actual imputations."""
        assert actual.notna().all().all()
        # pylint: disable=unsubscriptable-object
        if not testing.pandas_equals(expected['Embarked'], actual['Embarked']) or not testing.pandas_equals(
            expected['Fare'], actual['Fare']
        ):
            return False
        source_age = TestImpute.FEATURES['Age']
        source_lo = source_age.mean() - source_age.std(ddof=0)
        source_hi = source_age.mean() + source_age.std(ddof=0)
        imputed_age = actual['Age'][source_age.isna()]
        return ((imputed_age >= source_lo) & (imputed_age <= source_hi)).all()

    # Dataset fixtures
    FEATURES = pandas.DataFrame(
        {'Age': [1.0, numpy.nan, 3.0], 'Embarked': ['X', numpy.nan, 'Y'], 'Fare': [1.0, numpy.nan, 3.0]}
    )
    # the expected Age values below are mere placeholders - the custom matcher checks the imputed range instead
    EXPECTED = pandas.DataFrame({'Age': [1.0, 4.0, 2.0], 'Embarked': ['X', 'S', 'Y'], 'Fare': [1.0, 2.0, 3.0]})

    # Test scenarios
    invalid_params = testing.Case('foo').raises(TypeError, 'takes 1 positional argument but 2 were given')
    not_trained = testing.Case().apply(FEATURES).raises(RuntimeError, 'not trained')
    valid_imputation = testing.Case().train(FEATURES).apply(FEATURES).returns(EXPECTED, matcher)

Caution

Don’t forget to create the empty __init__.py module in the tests/pipeline/ directory.
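The suite can now be launched using the test action of the development lifecycle (assuming the forml CLI is available in the environment; since the ForML testing framework builds on top of unittest, any compatible runner such as pytest works as well):

$ forml project test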

Pipeline Expression

With all of the preprocessing operators now ready, we can proceed to define the actual workflow expression for the pipeline component within titanic/pipeline/__init__.py.

Unlike in our baseline workflow, we are going to use multiple classifiers stacked together using the pipeline.ensemble.FullStack ensembler. Each of the individual models is a native Scikit-learn classifier auto-wrapped into a ForML operator using the pipeline.wrap.importer() context manager.

The file again ends with a call to project.setup() to register the component within the framework.

titanic/pipeline/__init__.py
from sklearn import model_selection
from titanic.pipeline import preprocessing

from forml import project
from forml.pipeline import ensemble, wrap

# Let's import a number of the sklearn classifiers:
# using the ``wrap.importer``, they'll transparently get converted into ForML operators
with wrap.importer():
    # pylint: disable=ungrouped-imports
    from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.naive_bayes import GaussianNB
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.preprocessing import StandardScaler
    from sklearn.svm import SVC
    from sklearn.tree import DecisionTreeClassifier


# Ensembling a number of base models using the Stacking Generalization:
STACK = ensemble.FullStack(  # here FullStack is the particular ensembler implementation
    GradientBoostingClassifier(random_state=42),
    SVC(kernel='rbf', random_state=42, probability=True),
    SVC(kernel='linear', random_state=42, probability=True),
    KNeighborsClassifier(n_neighbors=5, metric='minkowski', p=2),
    GaussianNB(),
    RandomForestClassifier(n_estimators=10, criterion='entropy', random_state=42),
    DecisionTreeClassifier(criterion='entropy', random_state=42),
    # and selecting ``StratifiedKFold`` as the cross-validator for generating the stack folds
    crossvalidator=model_selection.StratifiedKFold(n_splits=2, shuffle=True, random_state=42),
)


# And finally, let's put together the actual pipeline composition using our preprocessing operators
# and the model ensemble:
PIPELINE = (
    preprocessing.Impute(random_state=42)
    # >> payload.Dump(path='/tmp/titanic/impute-$mode-$seq.csv')
    >> preprocessing.ParseTitle(source='Name', target='Title')  # pylint: disable=no-value-for-parameter
    >> preprocessing.Encode(columns=['Sex', 'Embarked', 'Title', 'Pclass'])
    >> StandardScaler(copy=False)
    # >> payload.Dump(path='/tmp/titanic/pretrain-$mode-$seq.csv')
    >> STACK
    # >> payload.Dump(path='/tmp/titanic/stack-$mode-$seq.csv')
    >> LogisticRegression(random_state=42)
)

# Registering the pipeline
project.setup(PIPELINE)
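Note the commented-out payload.Dump operators: these are optional debugging aids which, when uncommented, capture the intermediate payload flowing through the given point of the pipeline into the specified CSV files (enabling them also requires adding payload to the forml.pipeline import).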

Component-wise, this makes our project complete, allowing us to progress further through its lifecycle.