Pipelines
PipelineML and pipeline_ml_factory
PipelineML is a new class which extends Pipeline and makes it possible to bind two pipelines (one for training, one for inference) together. This class comes with a KedroPipelineModel class for logging it in mlflow. A pipeline logged as an mlflow model can be served using the mlflow models serve and mlflow models predict commands.
The PipelineML class is not intended to be used directly. The pipeline_ml_factory function provides a user-friendly interface for creating it.
Example within a kedro project template:
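# in src/PYTHON_PACKAGE/pipeline.py
from typing import Dict

from kedro.pipeline import Pipeline
from kedro_mlflow.pipeline import pipeline_ml_factory

from PYTHON_PACKAGE.pipelines import data_engineering as de  # assumed module name
from PYTHON_PACKAGE.pipelines import data_science as ds


def create_pipelines(**kwargs) -> Dict[str, Pipeline]:
    data_engineering_pipeline = de.create_pipeline()
    data_science_pipeline = ds.create_pipeline()
    training_pipeline = pipeline_ml_factory(
        training=data_science_pipeline.only_nodes_with_tags(
            "training"
        ),  # or whatever your logic is for filtering
        inference=data_science_pipeline.only_nodes_with_tags("inference"),
        input_name="instances",  # hypothetical: name of the inference input dataset
    )
    return {
        "ds": data_science_pipeline,
        "training": training_pipeline,
        "__default__": data_engineering_pipeline + data_science_pipeline,
    }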
Now, each time you run kedro run --pipeline=training (provided you registered MlflowHook in your run.py), the full inference pipeline will be registered as an mlflow model, with all the outputs produced by training as artifacts: the machine learning model, but also the scaler, vectorizer, imputer, or any other object fitted on data in training and used in inference.
Note that:
- the inference pipeline input_name can be a MemoryDataset, and it must be one of the inference pipeline inputs;
- apart from input_name, all other inference pipeline inputs must be persisted locally on disk (i.e. they must not be MemoryDataset and must have a local filepath);
- the inference pipeline inputs (other than input_name) must belong to the training outputs (vectorizer, binarizer, machine learning model…);
- the inference pipeline must have one and only one output.
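The four rules above can be expressed as simple set checks on dataset names. The sketch below is a hypothetical, pure-Python reading of the notes only (check_pipeline_ml_constraints is not part of kedro-mlflow, and the library's real validation may differ in details); it takes plain sets of dataset names instead of Pipeline objects and returns a list of violations.

```python
from typing import List, Set


def check_pipeline_ml_constraints(
    training_outputs: Set[str],
    inference_inputs: Set[str],
    inference_outputs: Set[str],
    input_name: str,
    persisted_datasets: Set[str],
) -> List[str]:
    """Hypothetical helper: return rule violations (empty list if all rules hold)."""
    errors = []
    # Rule 1: input_name must be one of the inference inputs.
    if input_name not in inference_inputs:
        errors.append(f"input_name '{input_name}' is not an inference input")
    # Every other inference input is an artifact fitted during training.
    artifacts = inference_inputs - {input_name}
    # Rule 2: artifacts must be persisted on disk (not a MemoryDataset).
    for name in sorted(artifacts - persisted_datasets):
        errors.append(f"'{name}' must be persisted locally on disk")
    # Rule 3: artifacts must be produced by the training pipeline.
    for name in sorted(artifacts - training_outputs):
        errors.append(f"'{name}' is not a training output")
    # Rule 4: the inference pipeline must have exactly one output.
    if len(inference_outputs) != 1:
        errors.append(
            f"inference must have exactly one output, got {len(inference_outputs)}"
        )
    return errors


print(check_pipeline_ml_constraints(
    training_outputs={"model", "scaler"},
    inference_inputs={"instances", "model", "scaler"},
    inference_outputs={"predictions"},
    input_name="instances",
    persisted_datasets={"model", "scaler"},
))
```

The dataset names (instances, model, scaler, predictions) are illustrative only.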
Caution
PipelineML objects do not implement all the filtering methods of a regular Pipeline, and you cannot add or subtract two PipelineML objects. The rationale is that a filtered PipelineML is in general not a valid PipelineML, because the filtering is not applied consistently to training and inference. Check the source code to see which methods are supported.
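To see why filtering breaks the training/inference contract, consider a toy model where nodes are just (inputs, outputs, tags) triples. This is a hypothetical illustration, not kedro's Pipeline class: only_nodes_with_tags here is a toy stand-in. Filtering the training side by tag can drop the node that produces an artifact the inference side still needs.

```python
from typing import Dict, Set, Tuple

# Hypothetical toy representation: node name -> (inputs, outputs, tags)
Node = Tuple[Set[str], Set[str], Set[str]]

TRAINING: Dict[str, Node] = {
    "fit_scaler": ({"train_data"}, {"scaler"}, {"preprocessing"}),
    "fit_model": ({"train_data", "scaler"}, {"model"}, {"modelling"}),
}
INFERENCE_INPUTS: Set[str] = {"instances", "scaler", "model"}
INPUT_NAME = "instances"


def only_nodes_with_tags(nodes: Dict[str, Node], tag: str) -> Dict[str, Node]:
    """Keep only nodes carrying `tag` (toy stand-in for the Pipeline method)."""
    return {name: node for name, node in nodes.items() if tag in node[2]}


def missing_artifacts(training: Dict[str, Node]) -> Set[str]:
    """Inference inputs (except INPUT_NAME) that no training node produces."""
    produced = set().union(*(outputs for _, outputs, _ in training.values()))
    return INFERENCE_INPUTS - {INPUT_NAME} - produced


print(missing_artifacts(TRAINING))  # set(): the pair is consistent
print(missing_artifacts(only_nodes_with_tags(TRAINING, "modelling")))  # {'scaler'}
```

After filtering, the scaler is no longer produced by training, so the filtered object could not be logged as a self-contained inference model.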
You can also log a PipelineML object in mlflow directly and programmatically:
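from pathlib import Path
from tempfile import TemporaryDirectory

import mlflow
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro_mlflow.mlflow import KedroPipelineModel
from mlflow.models import infer_signature

# pipeline_training is your PipelineML object, created as previously
bootstrap_project(Path.cwd())
with KedroSession.create(project_path=Path.cwd()) as session:
    catalog = session.load_context().catalog

kedro_pipeline_model = KedroPipelineModel(
    pipeline=pipeline_training.inference,
    catalog=catalog,
    input_name=pipeline_training.input_name,
)

# get the schema of the input dataset
input_data = catalog.load(pipeline_training.input_name)
model_signature = infer_signature(model_input=input_data)

with TemporaryDirectory() as tmp_dir:
    # artifacts are all the inputs of the inference pipeline persisted in the catalog;
    # parameters are pickled into tmp_dir so that mlflow can copy them
    artifacts = kedro_pipeline_model.extract_pipeline_artifacts(Path(tmp_dir))
    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=kedro_pipeline_model,
        artifacts=artifacts,
        conda_env={"dependencies": ["python=3.10", "pip", {"pip": ["kedro==0.18.11", "kedro-mlflow"]}]},
        signature=model_signature,
    )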
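Conceptually, the artifacts dictionary passed to mlflow.pyfunc.log_model maps a name to a local file path for every inference input except input_name. The sketch below illustrates that idea with plain dictionaries; extract_artifacts is a hypothetical helper, not the kedro-mlflow implementation, and the file paths are made up.

```python
from typing import Dict, Optional, Set


def extract_artifacts(
    inference_inputs: Set[str],
    input_name: str,
    filepaths: Dict[str, Optional[str]],
) -> Dict[str, str]:
    """Hypothetical helper: map each inference input except input_name to its file."""
    artifacts = {}
    for name in sorted(inference_inputs - {input_name}):
        path = filepaths.get(name)
        if path is None:  # e.g. a MemoryDataset has no filepath
            raise ValueError(f"'{name}' must be persisted locally to be logged")
        artifacts[name] = path
    return artifacts


print(extract_artifacts(
    inference_inputs={"instances", "model", "scaler"},
    input_name="instances",  # the data to predict on is not an artifact
    filepaths={
        "instances": None,
        "model": "data/06_models/model.pkl",
        "scaler": "data/06_models/scaler.pkl",
    },
))
```

Excluding input_name is what lets the served model receive fresh data at prediction time while every fitted object travels with the model.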
It is also possible to pass arguments to KedroPipelineModel to specify the runner or the copy_mode of the MemoryDataset objects in the inference pipeline. Using copy_mode="assign" avoids copying data between nodes, which may be faster, especially for compiled models (e.g. keras, tensorflow…), and better suited to an API serving pattern. Since kedro-mlflow==0.12.0, copy_mode="assign" is the default.
KedroPipelineModel(
    pipeline=pipeline_training.inference,
    catalog=catalog,
    input_name=pipeline_training.input_name,
    copy_mode="assign",
)
The available copy_mode values are "assign", "copy" and "deepcopy". You can also pass a dictionary mapping dataset names to copy modes to use a different copy mode for each dataset.
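The three copy modes differ in what a load from memory hands back to the next node. The sketch below mirrors those semantics in plain Python; copy_with_mode and resolve_copy_mode are hypothetical helpers, and falling back to "assign" for datasets missing from a per-dataset dictionary is an assumption, not documented kedro-mlflow behaviour.

```python
import copy
from typing import Any, Dict, Union

CopyMode = Union[str, Dict[str, str]]


def copy_with_mode(data: Any, mode: str) -> Any:
    """Return `data` as a memory-backed load would for one copy mode (sketch)."""
    if mode == "assign":
        return data  # same object: no copy cost, but mutations are shared
    if mode == "copy":
        return data.copy()  # shallow copy via the object's own .copy()
    if mode == "deepcopy":
        return copy.deepcopy(data)  # fully independent object, slowest option
    raise ValueError(f"Unknown copy_mode: {mode!r}")


def resolve_copy_mode(copy_mode: CopyMode, dataset_name: str, default: str = "assign") -> str:
    """Pick the mode for one dataset; `default` for names missing from a dict (assumption)."""
    if isinstance(copy_mode, dict):
        return copy_mode.get(dataset_name, default)
    return copy_mode


model = {"weights": [1.0, 2.0]}
modes = {"scaler": "deepcopy"}  # hypothetical per-dataset configuration
loaded = copy_with_mode(model, resolve_copy_mode(modes, "model"))
print(loaded is model)  # True: "assign" hands over the same object
```

This is why "assign" suits serving: a large compiled model is never duplicated, at the cost of nodes sharing (and potentially mutating) the same object.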