Pipelines

exception kedro_mlflow.pipeline.pipeline_ml.KedroMlflowPipelineMLDatasetsError

Bases: Exception

Error raised when the inputs of KedroPipelineModel are invalid

exception kedro_mlflow.pipeline.pipeline_ml.KedroMlflowPipelineMLInputsError

Bases: Exception

Error raised when the inputs of KedroPipelineModel are invalid

exception kedro_mlflow.pipeline.pipeline_ml.KedroMlflowPipelineMLOutputsError

Bases: Exception

Error raised when the outputs of KedroPipelineModel are invalid

class kedro_mlflow.pipeline.pipeline_ml.PipelineML(nodes: Iterable[Union[kedro.pipeline.node.Node, kedro.pipeline.pipeline.Pipeline]], *args, tags: Optional[Union[str, Iterable[str]]] = None, inference: kedro.pipeline.pipeline.Pipeline, input_name: str, conda_env: Optional[Union[str, pathlib.Path, Dict[str, Any]]] = None, model_name: Optional[str] = 'model', model_signature: Optional[Union[mlflow.models.signature.ModelSignature, str]] = 'auto', **kwargs)

Bases: kedro.pipeline.pipeline.Pipeline

IMPORTANT NOTE: this class is not intended to be used directly in a Kedro project. You should use the pipeline_ml_factory function for modular pipelines, which is more flexible and user friendly. See INSERT_DOC_URL

A PipelineML is a kedro Pipeline which we assume is a “training” pipeline (in the machine learning sense). Basically, “training” is a higher order function (it generates another function). It implies that:

  • the outputs of this pipeline are considered as “fitted models”, i.e. inputs of another inference pipeline. It is very likely that there are several outputs, because we need to store any object that depends on the train data (e.g. encoders, binarizers, vectorizers, machine learning models…);

  • these outputs will feed another “inference” pipeline (to be used for prediction purposes) whose inputs are the outputs of the “training” pipeline, except for one of them: the new data to predict.

This class enables one to “link” a training pipeline and an inference pipeline in order to package them in mlflow easily. The goal is to call the MlflowPipelineHook after a PipelineML is run in order to trigger mlflow packaging.
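As a minimal sketch of this relationship (node functions and dataset names below are hypothetical, for illustration only):

    from kedro.pipeline import Pipeline, node

    # Hypothetical functions: the first two are "training" steps,
    # the last one is the "inference" step.
    def fit_encoder(training_data): ...        # returns a fitted encoder
    def fit_model(training_data, encoder): ... # returns a fitted model
    def predict(new_data, encoder, model): ... # returns predictions

    training = Pipeline(
        [
            node(fit_encoder, inputs="training_data", outputs="encoder"),
            node(fit_model, inputs=["training_data", "encoder"], outputs="model"),
        ]
    )

    # The inference pipeline consumes the training outputs ("encoder", "model")
    # plus exactly one free input: the new data to predict ("new_data").
    inference = Pipeline(
        [
            node(predict, inputs=["new_data", "encoder", "model"], outputs="predictions"),
        ]
    )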

__init__(nodes: Iterable[Union[kedro.pipeline.node.Node, kedro.pipeline.pipeline.Pipeline]], *args, tags: Optional[Union[str, Iterable[str]]] = None, inference: kedro.pipeline.pipeline.Pipeline, input_name: str, conda_env: Optional[Union[str, pathlib.Path, Dict[str, Any]]] = None, model_name: Optional[str] = 'model', model_signature: Optional[Union[mlflow.models.signature.ModelSignature, str]] = 'auto', **kwargs)

Store all necessary information for calling mlflow.log_model in the pipeline.

Parameters
  • nodes (Iterable[Union[Node, Pipeline]]) – The `node`s of the training pipeline.

  • tags (Union[str, Iterable[str]], optional) – Optional set of tags to be applied to all the pipeline nodes. Defaults to None.

  • inference (Pipeline) – A Pipeline object which will be stored in mlflow and use the output(s) of the training pipeline (namely, the model) to predict the outcome.

  • input_name (str) – The name of the dataset in the catalog.yml which the model’s user must provide for prediction (i.e. the data).

  • conda_env (Union[str, Path, Dict[str, Any]], optional) –

    The minimal conda environment necessary for the inference Pipeline. It can be either:

    • a path to a “requirements.txt”: in this case the packages are parsed and a conda env with your current python version and these dependencies is returned;

    • a path to an “environment.yml”: the file is uploaded “as is”;

    • a Dict: used as the environment (see the sketch after this parameter list);

    • None: a base conda environment with your current python version and your project version at training time.

    Defaults to None.

  • model_name (Union[str, None], optional) – The name of the folder where the model will be stored in remote mlflow. Defaults to “model”.

  • model_signature (Union[ModelSignature, str], optional) –

    The mlflow signature of the input dataframe common to training and inference.

    • If ‘auto’, it is inferred automatically;

    • if None, no signature is used;

    • if a ModelSignature instance, it is passed to the underlying mlflow model.

  • kwargs –

    extra arguments to be passed to KedroPipelineModel when the PipelineML object is automatically saved at the end of a run. This includes:

    • copy_mode: the copy_mode to be used for the underlying dataset when loaded in memory;

    • runner: the kedro runner to run the model with.
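For illustration, here is what a conda_env passed as a Dict might look like; it follows the conda environment.yml schema, and the package pins below are placeholders:

    conda_env = {
        "name": "inference_env",
        "channels": ["defaults"],
        "dependencies": [
            "python=3.8",  # ideally, pin your training-time python version
            {"pip": ["kedro==0.17.5", "scikit-learn==0.24.2"]},  # placeholder pins
        ],
    }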

decorate(*decorators: Callable) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline by applying the provided decorators to all the nodes in the pipeline. If no decorators are passed, it will return a copy of the current Pipeline object.

Parameters

decorators – Decorators to be applied on all node functions in the pipeline, always applied from right to left.

Returns

A new Pipeline object with all nodes decorated with the provided decorators.
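A minimal sketch of decorate, assuming an existing PipelineML instance named pipeline_ml and a hypothetical timing decorator:

    import time
    from functools import wraps

    def log_running_time(func):
        """Hypothetical decorator printing each node function's run time."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            print(f"{func.__name__} ran in {time.time() - start:.2f}s")
            return result
        return wrapper

    timed_pipeline = pipeline_ml.decorate(log_running_time)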

extract_pipeline_artifacts(catalog: kedro.io.data_catalog.DataCatalog, temp_folder: pathlib.Path)
from_inputs(*inputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which depend directly or transitively on the provided inputs. If provided a name, but no format, for a transcoded input, it includes all the nodes that use inputs with that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

inputs – A list of inputs which should be used as a starting point of the new Pipeline

Raises

ValueError – Raised when any of the given inputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly or transitively on the provided inputs are being copied.
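Usage sketch (the dataset name is an assumption):

    # Keep only the nodes depending on the hypothetical "raw_data" dataset
    sub_pipeline = pipeline_ml.from_inputs("raw_data")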

from_nodes(*node_names: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which depend directly or transitively on the provided nodes.

Parameters

node_names – A list of node_names which should be used as a starting point of the new Pipeline.

Raises

ValueError – Raised when any of the given names do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly or transitively on the provided nodes are being copied.

property inference: kedro.pipeline.pipeline.Pipeline
property input_name: str
property model_signature: str
only_nodes_with_inputs(*inputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which depend directly on the provided inputs. If provided a name, but no format, for a transcoded input, it includes all the nodes that use inputs with that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

inputs – A list of inputs which should be used as a starting point of the new Pipeline.

Raises

ValueError – Raised when any of the given inputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly on the provided inputs are being copied.

only_nodes_with_outputs(*outputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which are directly required to produce the provided outputs. If provided a name, but no format, for a transcoded dataset, it includes all the nodes that output to that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

outputs – A list of outputs which should be the final outputs of the new Pipeline.

Raises

ValueError – Raised when any of the given outputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes which are directly required to produce the provided outputs are being copied.

only_nodes_with_tags(*tags: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which contain any of the provided tags. The resulting Pipeline is empty if no tags are provided.

Parameters

tags – A list of node tags which should be used to lookup the nodes of the new Pipeline.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes containing any of the tags provided are being copied.

Return type

Pipeline

tag(tags: Union[str, Iterable[str]]) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Return a copy of the pipeline, with each node tagged accordingly.

Parameters

tags – The tags to be added to the nodes.

Returns

New Pipeline object.

to_nodes(*node_names: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes required directly or transitively by the provided nodes.

Parameters

node_names – A list of node_names which should be used as an end point of the new Pipeline.

Raises

ValueError – Raised when any of the given names do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes required directly or transitively by the provided nodes are being copied.

to_outputs(*outputs: str) kedro_mlflow.pipeline.pipeline_ml.PipelineML

Create a new Pipeline object with the nodes which are directly or transitively required to produce the provided outputs. If provided a name, but no format, for a transcoded dataset, it includes all the nodes that output to that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters

outputs – A list of outputs which should be the final outputs of the new Pipeline.

Raises

ValueError – Raised when any of the given outputs do not exist in the Pipeline object.

Returns

A new Pipeline object, containing a subset of the nodes of the current one such that only nodes which are directly or transitively required to produce the provided outputs are being copied.

property training: kedro.pipeline.pipeline.Pipeline
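Usage sketch of these read-only properties, assuming an existing PipelineML instance named pipeline_ml:

    pipeline_ml.training         # the training Pipeline (all nodes passed at construction)
    pipeline_ml.inference        # the inference Pipeline that will be logged to mlflow
    pipeline_ml.input_name       # the catalog entry the packaged model expects at prediction time
    pipeline_ml.model_signature  # the signature configuration ('auto', None or a ModelSignature)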
kedro_mlflow.pipeline.pipeline_ml_factory.pipeline_ml_factory(training: kedro.pipeline.pipeline.Pipeline, inference: kedro.pipeline.pipeline.Pipeline, input_name: Optional[str] = None, conda_env: Optional[Union[str, pathlib.Path, Dict[str, Any]]] = None, model_name: Optional[str] = 'model', model_signature: Optional[Union[mlflow.models.signature.ModelSignature, str]] = 'auto', **kwargs) kedro_mlflow.pipeline.pipeline_ml.PipelineML

This function is a helper to create a PipelineML object directly from two Kedro Pipelines (one for training and one for inference).
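A hedged usage sketch, reusing the hypothetical training and inference pipelines sketched above ("new_data" is their single free inference input):

    from kedro_mlflow.pipeline.pipeline_ml_factory import pipeline_ml_factory

    pipeline_ml = pipeline_ml_factory(
        training=training,
        inference=inference,
        input_name="new_data",
        model_name="model",
        model_signature="auto",
    )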

Parameters
  • training (Pipeline) – The Pipeline object that creates all mlflow artifacts for prediction (the model, but also encoders, binarizers, tokenizers…). These artifacts must be persisted in the catalog.yml.

  • inference (Pipeline) – A Pipeline object which will be stored in mlflow and use the output(s) of the training pipeline (namely, the model) to predict the outcome.

  • input_name (str, optional) – The name of the dataset in the catalog.yml which the model’s user must provide for prediction (i.e. the data). Defaults to None.

  • conda_env (Union[str, Path, Dict[str, Any]], optional) –

    The minimal conda environment necessary for the inference Pipeline. It can be either:

    • a path to a “requirements.txt”: in this case the packages are parsed and a conda env with your current python version and these dependencies is returned;

    • a path to an “environment.yml”: the file is uploaded “as is”;

    • a Dict: used as the environment;

    • None: a base conda environment with your current python version and your project version at training time.

    Defaults to None.

  • model_name (Union[str, None], optional) – The name of the folder where the model will be stored in remote mlflow. Defaults to “model”.

  • model_signature (Union[ModelSignature, str], optional) –

    The mlflow signature of the input dataframe common to training and inference.

    • If ‘auto’, it is inferred automatically;

    • if None, no signature is used;

    • if a ModelSignature instance, it is passed to the underlying mlflow model.

  • kwargs –

    extra arguments to be passed to KedroPipelineModel when the PipelineML object is automatically saved at the end of a run. This includes:

    • copy_mode: the copy_mode to be used for the underlying dataset when loaded in memory;

    • runner: the kedro runner to run the model with.

Returns

A PipelineML which is automatically discovered by the MlflowPipelineHook and contains all the information for logging the inference pipeline as a Mlflow Model.

Return type

PipelineML
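As a sketch of how the returned object is typically exposed so that the MlflowPipelineHook can discover it (the registry location, helper functions and pipeline name below are assumptions about your project layout):

    from typing import Dict

    from kedro.pipeline import Pipeline
    from kedro_mlflow.pipeline.pipeline_ml_factory import pipeline_ml_factory

    def register_pipelines() -> Dict[str, Pipeline]:
        # create_training_pipeline / create_inference_pipeline are hypothetical
        # helpers building the two Kedro pipelines shown earlier.
        training = create_training_pipeline()
        inference = create_inference_pipeline()
        return {
            "training": pipeline_ml_factory(
                training=training,
                inference=inference,
                input_name="new_data",  # assumed name of the data to predict on
            ),
        }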