Pipelines

exception kedro_mlflow.pipeline.pipeline_ml.KedroMlflowPipelineMLError

Bases: Exception

Error raised when the KedroPipelineModel construction fails

class kedro_mlflow.pipeline.pipeline_ml.PipelineML(nodes: Iterable[Union[kedro.pipeline.node.Node, kedro.pipeline.pipeline.Pipeline]], *args, tags: Optional[Union[str, Iterable[str]]] = None, inference: kedro.pipeline.pipeline.Pipeline, input_name: str, kpm_kwargs: Optional[Dict[str, str]] = None, log_model_kwargs: Optional[Dict[str, str]] = None)

Bases: kedro.pipeline.pipeline.Pipeline

IMPORTANT NOTE: this class is not intended to be used directly in a Kedro project. You should use the pipeline_ml_factory function instead, which is more flexible and user friendly. See INSERT_DOC_URL

A PipelineML is a kedro Pipeline which we assume is a “training” pipeline (in the machine learning sense). “Training” is a higher-order function: it generates another function. This implies that:

  • the outputs of this pipeline are considered “fitted models”, i.e. inputs of another inference pipeline. There are very likely several outputs, because any object that depends on the training data (e.g. encoders, binarizers, vectorizers, machine learning models…) must be stored;

  • these outputs feed another “inference” pipeline (used for prediction) whose inputs are the outputs of the “training” pipeline, except for one of them (the new data to predict).

This class “links” a training pipeline and an inference pipeline in order to package them in mlflow easily. The goal is for the MLflowPipelineHook to be called after a PipelineML run in order to trigger mlflow packaging.
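The training/inference link described above can be illustrated in plain Python, with no kedro required. Every name below (the two functions, the artifact dicts) is illustrative and not part of the kedro_mlflow API; the point is only that the inference step consumes all the training outputs plus one extra input, the new data:

```python
# Conceptual sketch: "training" is a higher-order step whose outputs
# (fitted artifacts) become the inputs of the "inference" step, except
# for the new data to predict.

def training_pipeline(train_data):
    """Fit artifacts that depend on the training data."""
    encoder = {"mean": sum(train_data) / len(train_data)}  # e.g. a fitted encoder
    model = {"weight": 2.0}                                # e.g. a fitted model
    return encoder, model  # every output feeds the inference pipeline

def inference_pipeline(encoder, model, new_data):
    """Consume the fitted artifacts plus one extra input: the data to predict."""
    centered = [x - encoder["mean"] for x in new_data]
    return [model["weight"] * x for x in centered]

encoder, model = training_pipeline([1.0, 2.0, 3.0])
predictions = inference_pipeline(encoder, model, [4.0])
```

In a real project, the training and inference steps are two kedro Pipelines, and PipelineML is the object that records which inference input is the “new data” (input_name) versus which ones are fitted artifacts.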

KPM_KWARGS_DEFAULT = {}
LOG_MODEL_KWARGS_DEFAULT = {'artifact_path': 'model', 'signature': 'auto'}
__init__(nodes: Iterable[Union[kedro.pipeline.node.Node, kedro.pipeline.pipeline.Pipeline]], *args, tags: Optional[Union[str, Iterable[str]]] = None, inference: kedro.pipeline.pipeline.Pipeline, input_name: str, kpm_kwargs: Optional[Dict[str, str]] = None, log_model_kwargs: Optional[Dict[str, str]] = None)

Store all necessary information for calling mlflow.log_model in the pipeline.

Parameters
  • nodes (Iterable[Union[Node, Pipeline]]) – The `node`s of the training pipeline.

  • tags (Union[str, Iterable[str]], optional) – Optional set of tags to be applied to all the pipeline nodes. Defaults to None.

  • inference (Pipeline) – A Pipeline object which will be stored in mlflow and use the output(s) of the training pipeline (namely, the model) to predict the outcome.

  • input_name (str, optional) – The name of the dataset in the catalog.yml which the model’s user must provide for prediction (i.e. the data). Defaults to None.

  • kpm_kwargs – extra arguments to be passed to KedroPipelineModel when the PipelineML object is automatically saved at the end of a run. This includes:

    • copy_mode: the copy_mode to be used for the underlying dataset when loaded in memory

    • runner: the kedro runner to run the model with

  • log_model_kwargs – extra arguments to be passed to mlflow.pyfunc.log_model. “signature” accepts an extra “auto” value which automatically infers the signature based on the “input_name” dataset.
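The class attributes KPM_KWARGS_DEFAULT and LOG_MODEL_KWARGS_DEFAULT suggest that user-supplied kwargs are merged over these defaults, with user values taking precedence. A minimal sketch of that common pattern, assuming a simple dict merge (the merge helper is illustrative, not the library’s actual code):

```python
# Defaults as documented on the class above.
KPM_KWARGS_DEFAULT = {}
LOG_MODEL_KWARGS_DEFAULT = {"artifact_path": "model", "signature": "auto"}

def merge_with_defaults(defaults, user_kwargs):
    # User-supplied keys override the defaults; unspecified keys fall back.
    return {**defaults, **(user_kwargs or {})}

# Overriding only artifact_path keeps signature="auto" from the defaults.
log_model_kwargs = merge_with_defaults(
    LOG_MODEL_KWARGS_DEFAULT, {"artifact_path": "my_model"}
)
```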

decorate(*decorators: Callable) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline by applying the provided decorators to all the nodes in the pipeline. If no decorators are passed, it will return a copy of the current Pipeline object.

Parameters

*decorators – Decorators to be applied on all node functions in the pipeline, always applied from right to left.

Returns

A new Pipeline object with all nodes decorated with the provided decorators.

from_inputs(*inputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which depend directly or transitively on the provided inputs. If provided a name, but no format, for a transcoded input, it includes all the nodes that use inputs with that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

*inputs – A list of inputs which should be used as a starting point of the new Pipeline

Raises

ValueError – Raised when any of the given inputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly or transitively on the provided inputs are being copied.
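The transcoded-name rule stated above (a bare name matches every name@format variant of that dataset, while a fully-qualified name matches only itself) can be sketched as follows; the helper is illustrative, not kedro’s actual implementation:

```python
def matches_transcoded(requested: str, dataset_name: str) -> bool:
    # A fully-qualified request ("name@format") must match exactly;
    # a bare request ("name") matches any format variant of that name.
    if "@" in requested:
        return requested == dataset_name
    return dataset_name.split("@")[0] == requested

bare_match = matches_transcoded("data", "data@pandas")          # bare name: match
exact_match = matches_transcoded("data@pandas", "data@pandas")  # exact: match
cross_match = matches_transcoded("data@spark", "data@pandas")   # wrong format: no match
```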

from_nodes(*node_names: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which depend directly or transitively on the provided nodes.

Parameters

*node_names – A list of node_names which should be used as a starting point of the new Pipeline.

Raises

ValueError – Raised when any of the given names do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly or transitively on the provided nodes are being copied.

property inference: kedro.pipeline.pipeline.Pipeline
property input_name: str
only_nodes_with_inputs(*inputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which depend directly on the provided inputs. If provided a name, but no format, for a transcoded input, it includes all the nodes that use inputs with that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

*inputs – A list of inputs which should be used as a starting point of the new Pipeline.

Raises

ValueError – Raised when any of the given inputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly on the provided inputs are being copied.

only_nodes_with_outputs(*outputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which are directly required to produce the provided outputs. If provided a name, but no format, for a transcoded dataset, it includes all the nodes that output to that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

*outputs – A list of outputs which should be the final outputs of the new Pipeline.

Raises

ValueError – Raised when any of the given outputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes which are directly required to produce the provided outputs are being copied.

only_nodes_with_tags(*tags: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which contain any of the provided tags. The resulting Pipeline is empty if no tags are provided.

Parameters

*tags – A list of node tags which should be used to lookup the nodes of the new Pipeline.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes containing any of the tags provided are being copied.

Return type

Pipeline

tag(tags: Union[str, Iterable[str]]) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Return a copy of the pipeline, with each node tagged accordingly.

Parameters

tags – The tags to be added to the nodes.

Returns

New Pipeline object.

to_nodes(*node_names: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes required directly or transitively by the provided nodes.

Parameters

*node_names – A list of node_names which should be used as an end point of the new Pipeline.

Raises

ValueError – Raised when any of the given names do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes required directly or transitively by the provided nodes are being copied.

to_outputs(*outputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which are directly or transitively required to produce the provided outputs. If provided a name, but no format, for a transcoded dataset, it includes all the nodes that output to that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

*outputs – A list of outputs which should be the final outputs of the new Pipeline.

Raises

ValueError – Raised when any of the given outputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes which are directly or transitively required to produce the provided outputs are being copied.

property training: kedro.pipeline.pipeline.Pipeline
kedro_mlflow.pipeline.pipeline_ml_factory.pipeline_ml_factory(training: kedro.pipeline.pipeline.Pipeline, inference: kedro.pipeline.pipeline.Pipeline, input_name: Optional[str] = None, kpm_kwargs=None, log_model_kwargs=None) kedro_mlflow.pipeline.pipeline_ml.PipelineML

This function is a helper to create a PipelineML object directly from two Kedro Pipelines (one for training and one for inference).

Parameters
  • training (Pipeline) – The Pipeline object that creates all mlflow artifacts for prediction (the model, but also encoders, binarizers, tokenizers…). These artifacts must be persisted in the catalog.yml.

  • inference (Pipeline) – A Pipeline object which will be stored in mlflow and use the output(s) of the training pipeline (namely, the model) to predict the outcome.

  • input_name (str, optional) – The name of the dataset in the catalog.yml which the model’s user must provide for prediction (i.e. the data). Defaults to None.

  • kpm_kwargs – extra arguments to be passed to KedroPipelineModel when the PipelineML object is automatically saved at the end of a run. This includes:

    • copy_mode: the copy_mode to be used for the underlying dataset when loaded in memory

    • runner: the kedro runner to run the model with

  • log_model_kwargs – extra arguments to be passed to mlflow.pyfunc.log_model when the PipelineML object is automatically saved at the end of a run. See the mlflow documentation for all available options: https://www.mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.log_model

Returns

A PipelineML which is automatically discovered by the MlflowPipelineHook and contains all the information for logging the inference pipeline as a Mlflow Model.

Return type

PipelineML
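To make the factory’s role concrete, here is a plain-Python sketch of what pipeline_ml_factory bundles together. PipelineMLSketch and the string placeholders are illustrative only, not the real classes; in an actual Kedro project you would pass the real training and inference Pipeline objects built in your pipeline registry:

```python
# Plain-Python sketch of the object pipeline_ml_factory returns
# (no kedro required; names are illustrative, not the real API).

class PipelineMLSketch:
    def __init__(self, training, inference, input_name,
                 kpm_kwargs=None, log_model_kwargs=None):
        defaults = {"artifact_path": "model", "signature": "auto"}
        self.training = training          # pipeline producing fitted artifacts
        self.inference = inference        # pipeline logged to mlflow as a model
        self.input_name = input_name      # dataset the model's user must provide
        self.kpm_kwargs = kpm_kwargs or {}
        self.log_model_kwargs = {**defaults, **(log_model_kwargs or {})}

def pipeline_ml_factory(training, inference, input_name=None,
                        kpm_kwargs=None, log_model_kwargs=None):
    # Bundle both pipelines and the logging configuration in one object.
    return PipelineMLSketch(training, inference, input_name,
                            kpm_kwargs, log_model_kwargs)

pml = pipeline_ml_factory(
    training="training_pipeline",      # placeholders for kedro Pipeline objects
    inference="inference_pipeline",
    input_name="instances",
    log_model_kwargs={"artifact_path": "my_model"},
)
```

The returned object is what the MlflowPipelineHook later picks up at the end of a run to log the inference pipeline as a Mlflow Model.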