Workflow Case Study
The following sections demonstrate the individual workflow features working together to implement a practical pipeline. Even though the data actors could be implemented to work with arbitrary data types, for simplicity we choose the pandas.DataFrame as our case-study payload and assume it contains only numeric values.
Simple Workflow
Starting with a basic use case, we want to implement a simple workflow with the following logic:

- when in the train-mode:
    - applying a custom binarize transformation to the input labels
    - training and applying a custom mean-removal scaler
    - training a Sklearn LogisticRegression classifier
- when in the apply-mode:
    - applying the trained scaler transformer
    - making a prediction with the Sklearn LogisticRegression classifier
We are going to implement the actors and operators with the help of the decorator wrappers:
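The logic these actors encapsulate can be sketched in plain pandas terms. The names `binarize` and `MeanRemovalScaler` below are hypothetical stand-ins illustrating the described behavior, not the actual ForML decorator-based listing:

```python
import pandas as pd

# Hypothetical stand-ins for the actor logic described above;
# the real implementation uses the decorator wrappers instead.

def binarize(labels: pd.Series, threshold: float = 0.5) -> pd.Series:
    """Binarizer: turn continuous labels into 0/1 classes."""
    return (labels > threshold).astype(int)

class MeanRemovalScaler:
    """Mean-removal scaler: train-mode learns the column means
    (the actor state), apply-mode subtracts them from the features."""

    def __init__(self) -> None:
        self.means = None  # the state produced by train()

    def train(self, features: pd.DataFrame) -> None:
        self.means = features.mean()

    def apply(self, features: pd.DataFrame) -> pd.DataFrame:
        return features - self.means
```

In the actual pipeline, the state learned by the scaler's train task flows to its apply tasks along the dashed state edges shown in the visualization.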
This straightforward implementation produces a PIPELINE represented using a flow.Trunk with the following visualization:
flowchart TD
subgraph Train Mode
btl(["Binarizer.apply()"]) -- L --> stt["Scaler.train()"] & ltt["LogisticRegression.train()"]
sta(["Scaler.apply()"]) --> ltt
stt -. state .-> sta
end
subgraph Apply Mode
saa(["Scaler.apply()"]) --> laa(["LogisticRegression.apply()"])
stt -. state .-> saa
ltt -. state .-> laa
end
subgraph Trunk Heads
ti((T)) --> stt & sta
li((L)) -- L --> btl
ai((A)) --> saa
end
subgraph Trunk Tails
sta --> to((T))
btl -- L --> lo((L))
laa --> ao((A))
end
Complex Operator
To demonstrate the true power of the composition concept, let’s implement a more complex operator - we can call it KFoldWrapper - with the following logic:
- prepends a 1:N stateless range-based splitter Actor to the train part of the composition scope
- clones the task graph in the composition scope N times and, for each of its train segments:
    - attaches the head to the matching splitter output port
    - attaches the tail to the matching stacker input port
- finally sends the apply-mode outputs from all of these N branches to the N:1 reducer Actor
The idea behind this operator is to train and apply the preceding scope in multiple parallel instances, each working on its own range-split portion of the data. In the train-mode, the partial results are stacked back together using the stacker; in the apply-mode, they are reduced into a single value using the reducer.
Such an operator can be implemented by extending the flow.Operator as follows:
Note how it uses the Future nodes to create virtual heads for some of its segments so it can prepend the entire composition scope. In each iteration, the for loop expands the left side of the composition scope, producing the branch task graph to be wrapped. Its train and label input segments get attached to the relevant splitter ports, while the apply segment connects directly to the main apply-mode head node.
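To make the resulting data flow concrete, the runtime behavior the wrapped scope ends up with can be sketched in plain Python. The helper names below are hypothetical, the wrapped scope is assumed to be the mean-removal scaler from the earlier example, and the real operator wires task-graph nodes rather than calling functions directly:

```python
import pandas as pd

def range_split(data: pd.DataFrame, nfolds: int) -> list[pd.DataFrame]:
    """The 1:N stateless range-based splitter: contiguous row slices."""
    bounds = [round(i * len(data) / nfolds) for i in range(nfolds + 1)]
    return [data.iloc[bounds[i]:bounds[i + 1]] for i in range(nfolds)]

def train_mode(features: pd.DataFrame, nfolds: int):
    """Each branch trains its own scaler on its split and applies it;
    the partial outputs are stacked back together (the stacker)."""
    states, partials = [], []
    for fold in range_split(features, nfolds):
        mean = fold.mean()            # Scaler[i].train() -> state
        states.append(mean)
        partials.append(fold - mean)  # Scaler[i].apply() on its own split
    return states, pd.concat(partials)

def apply_mode(features: pd.DataFrame, states) -> pd.DataFrame:
    """Every trained scaler sees the SAME data; the branch outputs
    are reduced into a single value (the reducer)."""
    branches = [features - mean for mean in states]
    return sum(branches) / len(branches)
```

Note the asymmetry this sketch highlights: the train-mode branches each see different rows, while the apply-mode branches all receive identical input.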
Final Pipeline
Let’s now upgrade our pipeline expression with this operator to demonstrate the full composition functionality. For complete illustration, we also provide possible implementations of the Splitter and Mean actors.
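As a rough illustration of what such implementations might look like, here is a class-based sketch (hypothetical names and shapes, not the actual decorator-based listing):

```python
import pandas as pd

class Splitter:
    """Sketch of a 1:N stateless range-based splitter actor."""

    def __init__(self, nfolds: int):
        self.nfolds = nfolds

    def apply(self, features: pd.DataFrame) -> tuple:
        # emit one contiguous row slice per output port
        bounds = [round(i * len(features) / self.nfolds)
                  for i in range(self.nfolds + 1)]
        return tuple(features.iloc[bounds[i]:bounds[i + 1]]
                     for i in range(self.nfolds))

class Mean:
    """Sketch of an N:1 stateless reducer actor averaging its inputs."""

    def apply(self, *branches: pd.DataFrame) -> pd.DataFrame:
        return sum(branches) / len(branches)
```

Both actors are stateless: they need no train-mode and simply transform whatever flows through their apply ports.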
We deliberately chose (by applying the parentheses) a composition scope that includes just the preceding Scaler operator without the Binarizer. For the readability of the following visualization, we set the nfolds parameter (which determines the number of branches) to just 2. That leads to the following diagram:
flowchart TD
subgraph Train Mode
btl(["Binarizer.apply()"]) -- L --> ftl(["Splitter[L].apply()"]) & ltt["LogisticRegression.train()"]
fta(["Splitter[F].apply()"]) -- F1 --> s1tt["Scaler[1].train()"] & s1ta(["Scaler[1].apply()"])
fta -- F2 --> s2tt["Scaler[2].train()"] & s2ta(["Scaler[2].apply()"])
ftl -- L1 --> s1tt
ftl -- L2 --> s2tt
s1ta & s2ta --> cta
cta(["Concat.apply()"]) --> ltt
s1tt -. state .-> s1ta
s2tt -. state .-> s2ta
end
subgraph Apply Mode
s1aa(["Scaler[1].apply()"]) & s2aa(["Scaler[2].apply()"]) --> raa(["Mean.apply()"])
raa --> laa(["LogisticRegression.apply()"])
ltt -. state .-> laa
s1tt -. state .-> s1aa
s2tt -. state .-> s2aa
end
subgraph Trunk Heads
ti((T)) --> fta
li((L)) -- L --> btl
ai((A)) --> s1aa & s2aa
end
subgraph Trunk Tails
btl -- L --> lo((L))
cta --> to((T))
laa --> ao((A))
end
As you can see, there remains a single instance of the Binarizer as well as of the LogisticRegression classifier, while the inner part of the task graph now forks into two branches that merge back together: via Concat in the train-mode (where each branch receives distinct train data) and via Mean in the apply-mode (where the branches receive the same data).