Pipelines

exception kedro_mlflow.pipeline.pipeline_ml.KedroMlflowPipelineMLError

Bases: Exception

Error raised when the KedroPipelineModel construction fails

class kedro_mlflow.pipeline.pipeline_ml.PipelineML(nodes: Iterable[Union[kedro.pipeline.node.Node, kedro.pipeline.pipeline.Pipeline]], *args, tags: Optional[Union[str, Iterable[str]]] = None, inference: kedro.pipeline.pipeline.Pipeline, input_name: str, kpm_kwargs: Optional[Dict[str, str]] = None, log_model_kwargs: Optional[Dict[str, str]] = None)

Bases: kedro.pipeline.pipeline.Pipeline

IMPORTANT NOTE: THIS CLASS IS NOT INTENDED TO BE USED DIRECTLY IN A KEDRO PROJECT. YOU SHOULD USE THE pipeline_ml_factory FUNCTION FOR MODULAR PIPELINES, WHICH IS MORE FLEXIBLE AND USER-FRIENDLY. SEE INSERT_DOC_URL

A PipelineML is a kedro Pipeline which we assume is a "training" pipeline (in the machine learning sense). Basically, "training" is a higher-order function: it generates another function. This implies that:

  • the outputs of this pipeline are considered "fitted models", i.e. inputs of another inference pipeline. It is very likely that there are several outputs, because we need to store every object that depends on the training data (e.g. encoders, binarizers, vectorizers, machine learning models…);

  • these outputs feed another "inference" pipeline (to be used for prediction purposes) whose inputs are the outputs of the "training" pipeline, except for one of them (the new data to predict).

This class makes it possible to "link" a training pipeline and an inference pipeline in order to package them in mlflow easily. The goal is to call the MLflowPipelineHook hook after a PipelineML is run in order to trigger mlflow packaging.

KPM_KWARGS_DEFAULT = {}
LOG_MODEL_KWARGS_DEFAULT = {'artifact_path': 'model', 'signature': 'auto'}
__init__(nodes: Iterable[Union[kedro.pipeline.node.Node, kedro.pipeline.pipeline.Pipeline]], *args, tags: Optional[Union[str, Iterable[str]]] = None, inference: kedro.pipeline.pipeline.Pipeline, input_name: str, kpm_kwargs: Optional[Dict[str, str]] = None, log_model_kwargs: Optional[Dict[str, str]] = None)

Store all necessary information for calling mlflow.log_model in the pipeline.

Parameters
  • nodes (Iterable[Union[Node, Pipeline]]) – The `node`s of the training pipeline.

  • tags (Union[str, Iterable[str]], optional) – Optional set of tags to be applied to all the pipeline nodes. Defaults to None.

  • inference (Pipeline) – A Pipeline object which will be stored in mlflow and use the output(s) of the training pipeline (namely, the model) to predict the outcome.

  • input_name (str) – The name of the dataset in the catalog.yml which the model's user must provide for prediction (i.e. the data).

  • kpm_kwargs – extra arguments to be passed to KedroPipelineModel when the PipelineML object is automatically saved at the end of a run. This includes:

    • copy_mode: the copy_mode to be used for the underlying datasets when loaded in memory

    • runner: the kedro runner to run the model with

  • log_model_kwargs – extra arguments to be passed to mlflow.pyfunc.log_model. "signature" accepts an extra "auto" value which automatically infers the signature based on the "input_name" dataset.
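For reference, the class attributes KPM_KWARGS_DEFAULT and LOG_MODEL_KWARGS_DEFAULT shown above are plain dictionaries that user-supplied kwargs override key by key. The merge helper below is an illustrative sketch of that behaviour, not the library's internal code; only the two default dicts are taken from this documentation:

```python
# Defaults taken from the class attributes documented above.
KPM_KWARGS_DEFAULT = {}
LOG_MODEL_KWARGS_DEFAULT = {"artifact_path": "model", "signature": "auto"}

def merge_with_defaults(defaults, user_kwargs):
    """Illustrative helper: user-supplied kwargs override defaults key by key."""
    merged = dict(defaults)
    merged.update(user_kwargs or {})
    return merged

# Overriding only 'artifact_path' keeps the 'signature': 'auto' default,
# so the model signature is still inferred from the input_name dataset.
log_model_kwargs = merge_with_defaults(
    LOG_MODEL_KWARGS_DEFAULT, {"artifact_path": "my_model"}
)
print(log_model_kwargs)  # {'artifact_path': 'my_model', 'signature': 'auto'}
```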

filter(tags: Optional[Iterable[str]] = None, from_nodes: Optional[Iterable[str]] = None, to_nodes: Optional[Iterable[str]] = None, node_names: Optional[Iterable[str]] = None, from_inputs: Optional[Iterable[str]] = None, to_outputs: Optional[Iterable[str]] = None, node_namespace: Optional[str] = None) kedro.pipeline.pipeline.Pipeline

Creates a new Pipeline object with the nodes that meet all of the specified filtering conditions.

The new pipeline object is the intersection of pipelines that meet each filtering condition. This is distinct from chaining multiple filters together.

Parameters
  • tags – A list of node tags which should be used to lookup the nodes of the new Pipeline.

  • from_nodes – A list of node names which should be used as a starting point of the new Pipeline.

  • to_nodes – A list of node names which should be used as an end point of the new Pipeline.

  • node_names – A list of node names which should be selected for the new Pipeline.

  • from_inputs – A list of inputs which should be used as a starting point of the new Pipeline

  • to_outputs – A list of outputs which should be the final outputs of the new Pipeline.

  • node_namespace – One node namespace which should be used to select nodes in the new Pipeline.

Returns

A new Pipeline object with nodes that meet all of the specified filtering conditions.

Raises

ValueError – The filtered Pipeline has no nodes.

Example:

>>> pipeline = Pipeline(
...     [
...         node(func, "A", "B", name="node1"),
...         node(func, "B", "C", name="node2"),
...         node(func, "C", "D", name="node3"),
...     ]
... )
>>> pipeline.filter(node_names=["node1", "node3"], from_inputs=["A"])
>>> # Gives a new pipeline object containing node1 and node3.
from_inputs(*inputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which depend directly or transitively on the provided inputs. If provided a name, but no format, for a transcoded input, it includes all the nodes that use inputs with that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

*inputs – A list of inputs which should be used as a starting point of the new Pipeline

Raises

ValueError – Raised when any of the given inputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly or transitively on the provided inputs are being copied.

from_nodes(*node_names: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which depend directly or transitively on the provided nodes.

Parameters

*node_names – A list of node_names which should be used as a starting point of the new Pipeline.

Raises

ValueError – Raised when any of the given names do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly or transitively on the provided nodes are being copied.

property inference: str
property input_name: str
only_nodes(*node_names: str) kedro.pipeline.pipeline.Pipeline

Create a new Pipeline which will contain only the specified nodes by name.

Parameters

*node_names – One or more node names. The returned Pipeline will only contain these nodes.

Raises

ValueError – When some invalid node name is given.

Returns

A new Pipeline, containing only the specified nodes.

only_nodes_with_inputs(*inputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which depend directly on the provided inputs. If provided a name, but no format, for a transcoded input, it includes all the nodes that use inputs with that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

*inputs – A list of inputs which should be used as a starting point of the new Pipeline.

Raises

ValueError – Raised when any of the given inputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly on the provided inputs are being copied.

only_nodes_with_namespace(node_namespace: str) kedro.pipeline.pipeline.Pipeline

Creates a new Pipeline containing only nodes with the specified namespace.

Parameters

node_namespace – One node namespace.

Raises

ValueError – When pipeline contains no nodes with the specified namespace.

Returns

A new Pipeline containing nodes with the specified namespace.

only_nodes_with_outputs(*outputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which are directly required to produce the provided outputs. If provided a name, but no format, for a transcoded dataset, it includes all the nodes that output to that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

*outputs – A list of outputs which should be the final outputs of the new Pipeline.

Raises

ValueError – Raised when any of the given outputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes which are directly required to produce the provided outputs are being copied.

only_nodes_with_tags(*tags: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Creates a new Pipeline object with the nodes which contain any of the provided tags. The resulting Pipeline is empty if no tags are provided.

Parameters

*tags – A list of node tags which should be used to lookup the nodes of the new Pipeline.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes containing any of the tags provided are being copied.

Return type

Pipeline

tag(tags: Union[str, Iterable[str]]) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Tags all the nodes in the pipeline.

Parameters

tags – The tags to be added to the nodes.

Returns

New Pipeline object with nodes tagged.

to_nodes(*node_names: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes required directly or transitively by the provided nodes.

Parameters

*node_names – A list of node_names which should be used as an end point of the new Pipeline.

Raises

ValueError – Raised when any of the given names do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes required directly or transitively by the provided nodes are being copied.

to_outputs(*outputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which are directly or transitively required to produce the provided outputs. If provided a name, but no format, for a transcoded dataset, it includes all the nodes that output to that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

*outputs – A list of outputs which should be the final outputs of the new Pipeline.

Raises

ValueError – Raised when any of the given outputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes which are directly or transitively required to produce the provided outputs are being copied.

property training: kedro.pipeline.pipeline.Pipeline
kedro_mlflow.pipeline.pipeline_ml_factory.pipeline_ml_factory(training: kedro.pipeline.pipeline.Pipeline, inference: kedro.pipeline.pipeline.Pipeline, input_name: Optional[str] = None, kpm_kwargs=None, log_model_kwargs=None) kedro_mlflow.pipeline.pipeline_ml.PipelineML

This function is a helper to create a PipelineML object directly from two Kedro Pipelines (one for training and one for inference).

Parameters
  • training (Pipeline) – The Pipeline object that creates all mlflow artifacts for prediction (the model, but also encoders, binarizers, tokenizers…). These artifacts must be persisted in the catalog.yml.

  • inference (Pipeline) – A Pipeline object which will be stored in mlflow and use the output(s) of the training pipeline (namely, the model) to predict the outcome.

  • input_name (str, optional) – The name of the dataset in the catalog.yml which the model’s user must provide for prediction (i.e. the data). Defaults to None.

  • kpm_kwargs – extra arguments to be passed to KedroPipelineModel when the PipelineML object is automatically saved at the end of a run. This includes:

    • copy_mode: the copy_mode to be used for the underlying datasets when loaded in memory

    • runner: the kedro runner to run the model with

  • log_model_kwargs – extra arguments to be passed to mlflow.pyfunc.log_model when the PipelineML object is automatically saved at the end of a run. See the mlflow documentation for all available options: https://www.mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.log_model

Returns

A PipelineML which is automatically discovered by the MlflowPipelineHook and contains all the information for logging the inference pipeline as an Mlflow Model.

Return type

PipelineML