Pipeline Implementation
After finishing the exploration, we can proceed to implement the actual ML solution in the form of the ForML project pipeline component.
For cleaner structuring, we change the pipeline component from a flat module to a hierarchical package (which is semantically identical) and add a skeleton for our titanic.pipeline.preprocessing module together with its unit tests under tests/pipeline/test_preprocessing.py, which we are going to implement later.
The project structure now looks as follows:
$ tree forml-tutorial-titanic
forml-tutorial-titanic
├── notebooks
│   └── exploration.ipynb
├── pyproject.toml
├── tests
│   └── __init__.py
└── titanic
    ├── __init__.py
    ├── evaluation.py
    ├── pipeline
    │   ├── __init__.py
    │   └── preprocessing.py
    └── source.py
Important
Make sure to remove the original titanic/pipeline.py created initially along with the default project structure.
Custom Preprocessing Operators
In addition to the Impute operator we created during our exploration, let’s improve our preprocessing with a couple more operators. We stick to the simple @wrap decorator technique for implementing the actors and, eventually, the operators.
ParseTitle
Let’s start with a simple stateless transformer that extracts the title from the name, creating a new column (target) and dropping the original one (source):
titanic/pipeline/preprocessing.py
# Module-level imports shared by the operators defined in this file
import typing

import pandas
from sklearn import preprocessing

from forml.pipeline import wrap


@wrap.Operator.mapper
@wrap.Actor.apply
def ParseTitle(
    features: pandas.DataFrame,
    *,
    source: str,
    target: str,
) -> pandas.DataFrame:
    """Transformer extracting a person's title from the name string."""

    def get_title(name: str) -> str:
        """Auxiliary method for extracting the title."""
        if '.' in name:
            return name.split(',')[1].split('.')[0].strip().lower()
        return 'n/a'

    features[target] = features[source].map(get_title)
    return features.drop(columns=source)
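To see what the parsing rule does independently of ForML, it can be tried on a plain pandas Series. The following is only a sketch of the same string logic outside of any operator; the sample names are made up for illustration.

```python
import pandas


def get_title(name: str) -> str:
    """Return the lowercased text between the first comma and the next period."""
    if '.' in name:
        return name.split(',')[1].split('.')[0].strip().lower()
    return 'n/a'


# Hypothetical sample data in the "Surname, Title. Given names" format.
names = pandas.Series(['Smith, Mr. John', 'Black, Ms. Jane', 'White, Ian'])
titles = names.map(get_title)
print(titles.tolist())
```

Note that this rule assumes the comma-separated format: a name containing a period but no comma would raise an IndexError.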
Encode
The OneHotEncoder we used in our baseline workflow was applied bluntly to all columns, including the non-categorical ones. Let’s improve it by creating a custom operator with a parameterized selection of the encoded columns:
titanic/pipeline/preprocessing.py
# Uses the module-level imports: typing, pandas, sklearn.preprocessing and forml.pipeline.wrap
@wrap.Actor.train
def Encode(
    state: typing.Optional[preprocessing.OneHotEncoder],
    features: pandas.DataFrame,
    labels: pandas.Series,
    columns: typing.Sequence[str],
) -> preprocessing.OneHotEncoder:
    """Train part of a stateful encoder for the various categorical features."""
    encoder = preprocessing.OneHotEncoder(handle_unknown='infrequent_if_exist', sparse_output=False)
    encoder.fit(features[columns])
    return encoder


@wrap.Operator.mapper
@Encode.apply
def Encode(
    state: preprocessing.OneHotEncoder, features: pandas.DataFrame, columns: typing.Sequence[str]
) -> pandas.DataFrame:
    """Apply part of a stateful encoder for the various categorical features."""
    # Reuse the input index so that the concatenation below aligns row by row
    onehot = pandas.DataFrame(state.transform(features[columns]), index=features.index)
    result = pandas.concat((features.drop(columns=columns), onehot), axis='columns')
    result.columns = [str(c) for c in result.columns]
    return result
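The train/apply split of such a stateful encoder can be illustrated without ForML using scikit-learn and pandas directly. This is a minimal sketch, not the operator itself: the helper names fit_encoder and apply_encoder and the sample frames are hypothetical, and it swaps in handle_unknown='ignore' with a dense conversion via toarray() so that it does not depend on a recent scikit-learn version.

```python
import pandas
from sklearn.preprocessing import OneHotEncoder

CATEGORICAL = ['Sex']  # hypothetical selection of columns to encode


def fit_encoder(features: pandas.DataFrame, columns: list) -> OneHotEncoder:
    """Train step: learn the categories of the selected columns only."""
    encoder = OneHotEncoder(handle_unknown='ignore')
    encoder.fit(features[columns])
    return encoder


def apply_encoder(encoder: OneHotEncoder, features: pandas.DataFrame, columns: list) -> pandas.DataFrame:
    """Apply step: replace the selected columns with their one-hot representation."""
    onehot = pandas.DataFrame(encoder.transform(features[columns]).toarray(), index=features.index)
    result = pandas.concat((features.drop(columns=columns), onehot), axis='columns')
    result.columns = [str(c) for c in result.columns]
    return result


train = pandas.DataFrame({'Sex': ['male', 'female', 'female'], 'Age': [22.0, 38.0, 26.0]})
# Data seen only at apply time, with a non-default index and an unknown category
unseen = pandas.DataFrame({'Sex': ['male', 'other'], 'Age': [30.0, 40.0]}, index=[10, 11])

encoder = fit_encoder(train, CATEGORICAL)
encoded = apply_encoder(encoder, unseen, CATEGORICAL)
print(encoded)
```

The non-encoded Age column passes through untouched, and the unknown category ends up as an all-zero row rather than an error.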
Writing Unit Tests
As a best practice, let’s define unit tests for our operators.
tests/pipeline/test_preprocessing.py
import numpy
import pandas
from titanic.pipeline import preprocessing

from forml import testing


class TestParseTitle(testing.operator(preprocessing.ParseTitle)):
    """Unit testing the stateless ParseTitle transformer."""

    # Dataset fixtures
    INPUT = pandas.DataFrame({'Name': ['Smith, Mr. John', 'Black, Ms. Jane', 'Brown, Mrs. Jo', 'White, Ian']})
    EXPECTED = pandas.DataFrame({'Title': ['mr', 'ms', 'mrs', 'n/a']})

    # Test scenarios
    invalid_params = testing.Case(foo='bar').raises(TypeError, "got an unexpected keyword argument 'foo'")
    invalid_source = testing.Case(source='Foo', target='Bar').apply(INPUT).raises(KeyError, 'Foo')
    valid_parsing = testing.Case(source='Name', target='Title').apply(INPUT).returns(EXPECTED, testing.pandas_equals)


class TestImpute(testing.operator(preprocessing.Impute)):
    """NaN Imputer unit tests."""

    def matcher(expected: pandas.DataFrame, actual: pandas.DataFrame) -> bool:  # pylint: disable=no-self-argument
        """Custom matcher to verify the actual imputations."""
        assert actual.notna().all().all()
        # pylint: disable=unsubscriptable-object
        if not testing.pandas_equals(expected['Embarked'], actual['Embarked']) or not testing.pandas_equals(
            expected['Fare'], actual['Fare']
        ):
            return False
        # The imputed ages are only checked to fall within one standard deviation of the source mean
        source_age = TestImpute.FEATURES['Age']
        source_lo = source_age.mean() - source_age.std(ddof=0)
        source_hi = source_age.mean() + source_age.std(ddof=0)
        imputed_age = actual['Age'][source_age.isna()]
        return ((imputed_age >= source_lo) & (imputed_age <= source_hi)).all()

    # Dataset fixtures
    FEATURES = pandas.DataFrame(
        {'Age': [1.0, numpy.nan, 3.0], 'Embarked': ['X', numpy.nan, 'Y'], 'Fare': [1.0, numpy.nan, 3.0]}
    )
    # The first column is only a placeholder: the custom matcher never reads the expected ages
    EXPECTED = pandas.DataFrame({'foo': [1.0, 4.0, 2.0], 'Embarked': ['X', 'S', 'Y'], 'Fare': [1.0, 2.0, 3.0]})

    # Test scenarios
    invalid_params = testing.Case('foo').raises(TypeError, 'takes 1 positional argument but 2 were given')
    not_trained = testing.Case().apply(FEATURES).raises(RuntimeError, 'not trained')
    valid_imputation = testing.Case().train(FEATURES).apply(FEATURES).returns(EXPECTED, matcher)
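The range check performed by the custom matcher above can be tried in isolation. The sketch below reproduces only that check on plain pandas Series; the helper name imputed_within_one_std is hypothetical, and the candidate values are made up for illustration.

```python
import numpy
import pandas


def imputed_within_one_std(source: pandas.Series, actual: pandas.Series) -> bool:
    """Check that values filled in for missing entries lie within mean +/- one std of the source."""
    lo = source.mean() - source.std(ddof=0)
    hi = source.mean() + source.std(ddof=0)
    # Only the positions that were missing in the source are inspected
    imputed = actual[source.isna()]
    return bool(((imputed >= lo) & (imputed <= hi)).all())


source_age = pandas.Series([1.0, numpy.nan, 3.0])  # mean 2.0, population std 1.0
accepted = imputed_within_one_std(source_age, pandas.Series([1.0, 2.5, 3.0]))
rejected = imputed_within_one_std(source_age, pandas.Series([1.0, 4.0, 3.0]))
print(accepted, rejected)
```

With the fixture used in the test, any filled-in age between 1.0 and 3.0 is accepted, which is why the expected frame does not need to carry an exact age value.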
Caution
Don’t forget to create the empty __init__.py module in the tests/pipeline/ directory.
Pipeline Expression
With all of the preprocessing operators now ready, we can proceed to define the actual workflow expression for the pipeline component within titanic/pipeline/__init__.py.
Unlike in our baseline workflow, we are going to use multiple classifiers stacked together using the pipeline.ensemble.FullStack ensembler. Each of the individual models is a native Scikit-learn classifier auto-wrapped into a ForML operator using the pipeline.wrap.importer() context manager.
The file again ends with a call to project.setup() to register the component within the framework.
titanic/pipeline/__init__.py
from sklearn import model_selection
from titanic.pipeline import preprocessing

from forml import project
from forml.pipeline import ensemble, wrap

# Let's import a number of the sklearn classifiers:
# using the ``wrap.importer``, they'll transparently get converted into ForML operators
with wrap.importer():
    # pylint: disable=ungrouped-imports
    from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from sklearn.naive_bayes import GaussianNB
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.preprocessing import StandardScaler
    from sklearn.svm import SVC
    from sklearn.tree import DecisionTreeClassifier


# Ensembling a number of base models using the Stacking Generalization:
STACK = ensemble.FullStack(  # here FullStack is the particular ensembler implementation
    GradientBoostingClassifier(random_state=42),
    SVC(kernel='rbf', random_state=42, probability=True),
    SVC(kernel='linear', random_state=42, probability=True),
    KNeighborsClassifier(n_neighbors=5, metric='minkowski', p=2),
    GaussianNB(),
    RandomForestClassifier(n_estimators=10, criterion='entropy', random_state=42),
    DecisionTreeClassifier(criterion='entropy', random_state=42),
    # and selecting ``StratifiedKFold`` as the cross-validator for generating the stack folds
    crossvalidator=model_selection.StratifiedKFold(n_splits=2, shuffle=True, random_state=42),
)


# And finally, let's put together the actual pipeline composition using our preprocessing operators
# and the model ensemble:
PIPELINE = (
    preprocessing.Impute(random_state=42)
    # >> payload.Dump(path='/tmp/titanic/impute-$mode-$seq.csv')
    >> preprocessing.ParseTitle(source='Name', target='Title')  # pylint: disable=no-value-for-parameter
    >> preprocessing.Encode(columns=['Sex', 'Embarked', 'Title', 'Pclass'])
    >> StandardScaler(copy=False)
    # >> payload.Dump(path='/tmp/titanic/pretrain-$mode-$seq.csv')
    >> STACK
    # >> payload.Dump(path='/tmp/titanic/stack-$mode-$seq.csv')
    >> LogisticRegression(random_state=42)
)

# Registering the pipeline
project.setup(PIPELINE)
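For readers unfamiliar with stacked generalization, the general idea (base models whose cross-validated predictions feed a final estimator) can be sketched with plain scikit-learn. This is an analogue only, not ForML's FullStack implementation, and its fold handling may differ; the synthetic dataset and the reduced set of base models are assumptions chosen to keep the example small.

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier, StackingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.naive_bayes import GaussianNB
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in data (not the Titanic dataset)
X, y = make_classification(n_samples=200, n_features=8, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, stratify=y, random_state=42)

# Base models produce out-of-fold predictions which the final estimator is trained on
stack = StackingClassifier(
    estimators=[
        ('forest', RandomForestClassifier(n_estimators=10, criterion='entropy', random_state=42)),
        ('bayes', GaussianNB()),
    ],
    final_estimator=LogisticRegression(random_state=42),
    cv=StratifiedKFold(n_splits=2, shuffle=True, random_state=42),
)

# Scaling first, then the stacked ensemble, mirroring the order of the ForML expression
model = make_pipeline(StandardScaler(), stack)
model.fit(X_train, y_train)
accuracy = model.score(X_test, y_test)
print(f'holdout accuracy: {accuracy:.2f}')
```

In the ForML expression the same roles are played by the operators composed with >>: the StandardScaler step, the STACK ensemble, and the closing LogisticRegression.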
Component-wise, this makes our project complete, allowing us to
further progress its lifecycle.