Skip to content

Orchestration

tfx.v1.orchestration

TFX orchestration module.

MODULE DESCRIPTION
experimental

TFX orchestration.experimental module.

metadata

Public API for metadata.

CLASS DESCRIPTION
LocalDagRunner

Local TFX DAG runner.

Classes

LocalDagRunner

LocalDagRunner()

Bases: IrBasedRunner

Local TFX DAG runner.

Initializes LocalDagRunner as a TFX orchestrator.

METHOD DESCRIPTION
run

See TfxRunner.

run_with_ir

Runs given pipeline locally.

Source code in tfx/orchestration/local/local_dag_runner.py
def __init__(self):
  """Initializes LocalDagRunner as a TFX orchestrator."""
  pass
Functions
run
run(pipeline: Pipeline, run_options: Optional[RunOptions] = None, **kwargs: Any) -> Optional[Any]

See TfxRunner.

Source code in tfx/orchestration/portable/tfx_runner.py
def run(
    self,
    pipeline: pipeline_py.Pipeline,
    run_options: Optional[pipeline_py.RunOptions] = None,
    **kwargs: Any,
) -> Optional[Any]:
  """See TfxRunner."""
  pipeline_pb = _make_pipeline_proto(pipeline)
  if run_options:
    run_options_pb = _run_opts_to_proto(run_options)
  else:
    run_options_pb = None
  return self.run_with_ir(pipeline_pb, run_options=run_options_pb, **kwargs)
run_with_ir
run_with_ir(pipeline: Pipeline, run_options: Optional[RunOptions] = None) -> None

Runs given pipeline locally.

PARAMETER DESCRIPTION
pipeline

Pipeline IR containing pipeline args and components.

TYPE: Pipeline

run_options

Optional args for the run.

TYPE: Optional[RunOptions] DEFAULT: None

RAISES DESCRIPTION
ValueError

If run_options is provided, and partial_run_options.from_nodes and partial_run_options.to_nodes are both empty.

Source code in tfx/orchestration/local/local_dag_runner.py
@doc_controls.do_not_generate_docs
def run_with_ir(
    self,
    pipeline: pipeline_pb2.Pipeline,
    run_options: Optional[pipeline_pb2.RunOptions] = None,
) -> None:
  """Runs given pipeline locally.

  Args:
    pipeline: Pipeline IR containing pipeline args and components.
    run_options: Optional args for the run.

  Raises:
    ValueError: If run_options is provided, and partial_run_options.from_nodes
      and partial_run_options.to_nodes are both empty.
  """
  # Substitute the runtime parameter to be a concrete run_id
  runtime_parameter_utils.substitute_runtime_parameter(
      pipeline, {
          constants.PIPELINE_RUN_ID_PARAMETER_NAME:
              datetime.datetime.now().isoformat(),
      })

  deployment_config = runner_utils.extract_local_deployment_config(pipeline)
  connection_config = getattr(
      deployment_config.metadata_connection_config,
      deployment_config.metadata_connection_config.WhichOneof(
          'connection_config'))

  logging.info('Using deployment config:\n %s', deployment_config)
  logging.info('Using connection config:\n %s', connection_config)

  if run_options:
    logging.info('Using run_options:\n %s', run_options)
    pr_opts = run_options.partial_run
    partial_run_utils.mark_pipeline(
        pipeline,
        from_nodes=pr_opts.from_nodes or None,
        to_nodes=pr_opts.to_nodes or None,
        snapshot_settings=pr_opts.snapshot_settings)

  with telemetry_utils.scoped_labels(
      {telemetry_utils.LABEL_TFX_RUNNER: 'local'}):
    # Run each component. Note that the pipeline.components list is in
    # topological order.
    #
    # TODO(b/171319478): After IR-based execution is used, used multi-threaded
    # execution so that independent components can be run in parallel.
    for node in pipeline.nodes:
      pipeline_node = node.pipeline_node
      node_id = pipeline_node.node_info.id
      if pipeline_node.execution_options.HasField('skip'):
        logging.info('Skipping component %s.', node_id)
        continue
      executor_spec = runner_utils.extract_executor_spec(
          deployment_config, node_id)
      custom_driver_spec = runner_utils.extract_custom_driver_spec(
          deployment_config, node_id)

      component_launcher = launcher.Launcher(
          pipeline_node=pipeline_node,
          mlmd_connection=metadata.Metadata(connection_config),
          pipeline_info=pipeline.pipeline_info,
          pipeline_runtime_spec=pipeline.runtime_spec,
          executor_spec=executor_spec,
          custom_driver_spec=custom_driver_spec)
      logging.info('Component %s is running.', node_id)
      if pipeline_node.execution_options.run.perform_snapshot:
        with metadata.Metadata(connection_config) as mlmd_handle:
          partial_run_utils.snapshot(mlmd_handle, pipeline)
      component_launcher.launch()
      logging.info('Component %s is finished.', node_id)

Modules

experimental

TFX orchestration.experimental module.

CLASS DESCRIPTION
FinalStatusStr

FinalStatusStr: is the type for parameter receiving PipelineTaskFinalStatus.

KubeflowV2DagRunner

Kubeflow V2 pipeline runner (currently for managed pipelines).

KubeflowV2DagRunnerConfig

Runtime configuration specific to execution on Kubeflow V2 pipelines.

FUNCTION DESCRIPTION
exit_handler

Creates an exit handler from a typehint-annotated Python function.

Classes
FinalStatusStr

Bases: str

FinalStatusStr: is the type for parameter receiving PipelineTaskFinalStatus.

Vertex AI backend passes in jsonlized string of kfp.pipeline_spec.pipeline_spec_pb2.PipelineTaskFinalStatus.

Example

This is example usage of FinalStatusStr definition:

exit_handler = exit_handler_component(
    final_status=tfx.dsl.experimental.FinalStatusStr()
)
KubeflowV2DagRunner
KubeflowV2DagRunner(config: KubeflowV2DagRunnerConfig, output_dir: Optional[str] = None, output_filename: Optional[str] = None)

Bases: TfxRunner

Kubeflow V2 pipeline runner (currently for managed pipelines).

Builds a pipeline job spec in json format based on TFX pipeline DSL object.

Constructs an KubeflowV2DagRunner for compiling pipelines.

PARAMETER DESCRIPTION
config

An KubeflowV2DagRunnerConfig object to specify runtime configuration when running the pipeline in Kubeflow.

TYPE: KubeflowV2DagRunnerConfig

output_dir

An optional output directory into which to output the pipeline definition files. Defaults to the current working directory.

TYPE: Optional[str] DEFAULT: None

output_filename

An optional output file name for the pipeline definition file. The file output format will be a JSON-serialized or YAML-serialized PipelineJob pb message. Defaults to 'pipeline.json'.

TYPE: Optional[str] DEFAULT: None

METHOD DESCRIPTION
run

Compiles a pipeline DSL object into pipeline file.

set_exit_handler

Set exit handler components for the Kubeflow V2(Vertex AI) dag runner.

ATTRIBUTE DESCRIPTION
config

TYPE: PipelineConfig

Source code in tfx/orchestration/kubeflow/v2/kubeflow_v2_dag_runner.py
def __init__(
    self,
    config: KubeflowV2DagRunnerConfig,
    output_dir: Optional[str] = None,
    output_filename: Optional[str] = None,
):
  """Constructs an KubeflowV2DagRunner for compiling pipelines.

  Args:
    config: An KubeflowV2DagRunnerConfig object to specify runtime
      configuration when running the pipeline in Kubeflow.
    output_dir: An optional output directory into which to output the pipeline
      definition files. Defaults to the current working directory.
    output_filename: An optional output file name for the pipeline definition
      file. The file output format will be a JSON-serialized or
      YAML-serialized PipelineJob pb message. Defaults to 'pipeline.json'.
  """
  if not isinstance(config, KubeflowV2DagRunnerConfig):
    raise TypeError('config must be type of KubeflowV2DagRunnerConfig.')
  super().__init__()
  self._config = config
  self._output_dir = output_dir or os.getcwd()
  self._output_filename = output_filename or 'pipeline.json'
  self._exit_handler = None
Attributes
config property
config: PipelineConfig
Functions
run
run(pipeline: Pipeline, parameter_values: Optional[Dict[str, Any]] = None, write_out: Optional[bool] = True) -> Dict[str, Any]

Compiles a pipeline DSL object into pipeline file.

PARAMETER DESCRIPTION
pipeline

TFX pipeline object.

TYPE: Pipeline

parameter_values

mapping from runtime parameter names to its values.

TYPE: Optional[Dict[str, Any]] DEFAULT: None

write_out

set to True to actually write out the file to the place designated by output_dir and output_filename. Otherwise return the JSON-serialized pipeline job spec.

TYPE: Optional[bool] DEFAULT: True

RETURNS DESCRIPTION
Dict[str, Any]

Returns the JSON/YAML pipeline job spec.

RAISES DESCRIPTION
RuntimeError

if trying to write out to a place occupied by an existing

Source code in tfx/orchestration/kubeflow/v2/kubeflow_v2_dag_runner.py
def run(
    self,
    pipeline: tfx_pipeline.Pipeline,
    parameter_values: Optional[Dict[str, Any]] = None,
    write_out: Optional[bool] = True,
) -> Dict[str, Any]:
  """Compiles a pipeline DSL object into pipeline file.

  Args:
    pipeline: TFX pipeline object.
    parameter_values: mapping from runtime parameter names to its values.
    write_out: set to True to actually write out the file to the place
      designated by output_dir and output_filename. Otherwise return the
      JSON-serialized pipeline job spec.

  Returns:
    Returns the JSON/YAML pipeline job spec.

  Raises:
    RuntimeError: if trying to write out to a place occupied by an existing
    file.
  """
  for component in pipeline.components:
    # TODO(b/187122662): Pass through pip dependencies as a first-class
    # component flag.
    if isinstance(component, base_component.BaseComponent):
      component._resolve_pip_dependencies(  # pylint: disable=protected-access
          pipeline.pipeline_info.pipeline_root
      )

  # TODO(b/166343606): Support user-provided labels.
  # TODO(b/169095387): Deprecate .run() method in favor of the unified API
  # client.
  display_name = (
      self._config.display_name or pipeline.pipeline_info.pipeline_name
  )
  pipeline_spec = pipeline_builder.PipelineBuilder(
      tfx_pipeline=pipeline,
      default_image=self._config.default_image,
      default_commands=self._config.default_commands,
      exit_handler=self._exit_handler,
      use_pipeline_spec_2_1=self._config.use_pipeline_spec_2_1,
  ).build()
  pipeline_spec.sdk_version = 'tfx-{}'.format(version.__version__)
  if self._config.use_pipeline_spec_2_1:
    pipeline_spec.schema_version = _SCHEMA_VERSION_2_1
  else:
    pipeline_spec.schema_version = _SCHEMA_VERSION_2_0
  runtime_config = pipeline_builder.RuntimeConfigBuilder(
      pipeline_info=pipeline.pipeline_info,
      parameter_values=parameter_values,
      use_pipeline_spec_2_1=self._config.use_pipeline_spec_2_1,
  ).build()
  with telemetry_utils.scoped_labels(
      {telemetry_utils.LABEL_TFX_RUNNER: 'kubeflow_v2'}
  ):
    result = pipeline_spec_pb2.PipelineJob(
        display_name=display_name or pipeline.pipeline_info.pipeline_name,
        labels=telemetry_utils.make_labels_dict(),
        runtime_config=runtime_config,
    )
  result.pipeline_spec.update(json_format.MessageToDict(pipeline_spec))
  pipeline_json_dict = json_format.MessageToDict(result)
  if write_out:
    if fileio.exists(self._output_dir) and not fileio.isdir(self._output_dir):
      raise RuntimeError(
          'Output path: %s is pointed to a file.' % self._output_dir
      )
    if not fileio.exists(self._output_dir):
      fileio.makedirs(self._output_dir)

    _write_pipeline_spec_to_file(
        pipeline_json_dict,
        'This is converted from TFX pipeline from tfx-{}.'.format(
            version.__version__
        ),
        os.path.join(self._output_dir, self._output_filename),
    )

  return pipeline_json_dict
set_exit_handler
set_exit_handler(exit_handler: BaseNode)

Set exit handler components for the Kubeflow V2(Vertex AI) dag runner.

This feature is currently experimental without backward compatibility gaurantee.

PARAMETER DESCRIPTION
exit_handler

exit handler component.

TYPE: BaseNode

Source code in tfx/orchestration/kubeflow/v2/kubeflow_v2_dag_runner.py
def set_exit_handler(self, exit_handler: base_node.BaseNode):
  """Set exit handler components for the Kubeflow V2(Vertex AI) dag runner.

  This feature is currently experimental without backward compatibility
  gaurantee.

  Args:
    exit_handler: exit handler component.
  """
  if not exit_handler:
    logging.error('Setting empty exit handler is not allowed.')
    return
  self._exit_handler = exit_handler
KubeflowV2DagRunnerConfig
KubeflowV2DagRunnerConfig(display_name: Optional[str] = None, default_image: Optional[Union[str, MutableMapping[str, str]]] = None, default_commands: Optional[List[str]] = None, use_pipeline_spec_2_1: bool = False, **kwargs)

Bases: PipelineConfig

Runtime configuration specific to execution on Kubeflow V2 pipelines.

Constructs a Kubeflow V2 runner config.

PARAMETER DESCRIPTION
display_name

Optional human-readable pipeline name. Defaults to the pipeline name passed into KubeflowV2DagRunner.run().

TYPE: Optional[str] DEFAULT: None

default_image

The default TFX image to be used if not overriden by per component specification. It can be a map whose key is a component id and value is an image path to set the image by a component level.

TYPE: Optional[Union[str, MutableMapping[str, str]]] DEFAULT: None

default_commands

Optionally specifies the commands of the provided container image. When not provided, the default ENTRYPOINT specified in the docker image is used. Note: the commands here refers to the K8S container command, which maps to Docker entrypoint field. If one supplies command but no args are provided for the container, the container will be invoked with the provided command, ignoring the ENTRYPOINT and CMD defined in the Dockerfile. One can find more details regarding the difference between K8S and Docker conventions at https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes

TYPE: Optional[List[str]] DEFAULT: None

use_pipeline_spec_2_1

Use the KFP pipeline spec schema 2.1 to support Vertex ML pipeline teamplate gallary.

TYPE: bool DEFAULT: False

**kwargs

Additional args passed to base PipelineConfig.

DEFAULT: {}

ATTRIBUTE DESCRIPTION
component_config_overrides

default_commands

default_component_configs

default_image

display_name

supported_launcher_classes

use_pipeline_spec_2_1

Source code in tfx/orchestration/kubeflow/v2/kubeflow_v2_dag_runner.py
def __init__(
    self,
    display_name: Optional[str] = None,
    default_image: Optional[Union[str, MutableMapping[str, str]]] = None,
    default_commands: Optional[List[str]] = None,
    use_pipeline_spec_2_1: bool = False,
    **kwargs,
):
  """Constructs a Kubeflow V2 runner config.

  Args:
    display_name: Optional human-readable pipeline name. Defaults to the
      pipeline name passed into `KubeflowV2DagRunner.run()`.
    default_image: The default TFX image to be used if not overriden by per
      component specification. It can be a map whose key is a component id and
      value is an image path to set the image by a component level.
    default_commands: Optionally specifies the commands of the provided
      container image. When not provided, the default `ENTRYPOINT` specified
      in the docker image is used. Note: the commands here refers to the K8S
      container command, which maps to Docker entrypoint field. If one
      supplies command but no args are provided for the container, the
      container will be invoked with the provided command, ignoring the
      `ENTRYPOINT` and `CMD` defined in the Dockerfile. One can find more
      details regarding the difference between K8S and Docker conventions at
      https://kubernetes.io/docs/tasks/inject-data-application/define-command-argument-container/#notes
    use_pipeline_spec_2_1: Use the KFP pipeline spec schema 2.1 to support
      Vertex ML pipeline teamplate gallary.
    **kwargs: Additional args passed to base PipelineConfig.
  """
  super().__init__(**kwargs)
  self.display_name = display_name
  self.default_image = default_image or _KUBEFLOW_TFX_IMAGE
  if (
      isinstance(self.default_image, MutableMapping)
      and self.default_image.get(_DEFAULT_IMAGE_PATH_KEY) is None
  ):
    self.default_image[_DEFAULT_IMAGE_PATH_KEY] = _KUBEFLOW_TFX_IMAGE
  if default_commands is None:
    self.default_commands = KUBEFLOW_TFX_CMD
  else:
    self.default_commands = default_commands
  self.use_pipeline_spec_2_1 = use_pipeline_spec_2_1
Attributes
component_config_overrides instance-attribute
component_config_overrides = component_config_overrides or {}
default_commands instance-attribute
default_commands = KUBEFLOW_TFX_CMD
default_component_configs instance-attribute
default_component_configs = default_component_configs or []
default_image instance-attribute
default_image = default_image or _KUBEFLOW_TFX_IMAGE
display_name instance-attribute
display_name = display_name
supported_launcher_classes instance-attribute
supported_launcher_classes = supported_launcher_classes or [InProcessComponentLauncher]
use_pipeline_spec_2_1 instance-attribute
use_pipeline_spec_2_1 = use_pipeline_spec_2_1
Functions
Functions
exit_handler
exit_handler(func: FunctionType) -> Callable[..., Any]

Creates an exit handler from a typehint-annotated Python function.

This decorator creates an exit handler wrapping the component typehint annotation - typehint annotations specified for the arguments and return value for a Python function. Exit handler is to annotate the component for post actions of a pipeline, only supported in Vertex AI. Specifically, function arguments can be annotated with the following types and associated semantics supported in component. In order to get in the final status of dependent pipeline, parameter should be defined as Parameter[str], passing in FinalStatusStr type when initializing the component.

Example

This is example usage of component definition using this decorator:

from tfx import v1 as tfx


@tfx.orchestration.experimental.exit_handler
def MyExitHandlerComponent(final_status: tfx.dsl.components.Parameter[str]):
    # parse the final status
    pipeline_task_status = pipeline_pb2.PipelineTaskFinalStatus()
    proto_utils.json_to_proto(final_status, pipeline_task_status)
    print(pipeline_task_status)

Example

Example usage in a Vertex AI graph definition:

exit_handler = exit_handler_component(
    final_status=tfx.dsl.experimental.FinalStatusStr()
)

dsl_pipeline = tfx.dsl.Pipeline(...)

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(...)
runner.set_exit_handler([exit_handler])
runner.run(pipeline=dsl_pipeline)

Experimental: no backwards compatibility guarantees.

PARAMETER DESCRIPTION
func

Typehint-annotated component executor function.

TYPE: FunctionType

RETURNS DESCRIPTION
Callable[..., Any]

base_component.BaseComponent subclass for the given component executor

Callable[..., Any]

function.

Source code in tfx/orchestration/kubeflow/decorators.py
def exit_handler(func: types.FunctionType) -> Callable[..., Any]:
  """Creates an exit handler from a typehint-annotated Python function.

  This decorator creates an exit handler wrapping the component typehint
  annotation - typehint annotations specified for the arguments and return value
  for a Python function.
  Exit handler is to annotate the component for post actions of a pipeline,
  only supported in Vertex AI. Specifically, function
  arguments can be annotated with the following types and associated semantics
  supported in component. In order to get in the final status of dependent
  pipeline, parameter should be defined as Parameter[str], passing in
  FinalStatusStr type when initializing the component.

  !!! example
      This is example usage of component definition using this decorator:
      ``` python
      from tfx import v1 as tfx


      @tfx.orchestration.experimental.exit_handler
      def MyExitHandlerComponent(final_status: tfx.dsl.components.Parameter[str]):
          # parse the final status
          pipeline_task_status = pipeline_pb2.PipelineTaskFinalStatus()
          proto_utils.json_to_proto(final_status, pipeline_task_status)
          print(pipeline_task_status)
      ```

  !!! example
      Example usage in a Vertex AI graph definition:
      ```python
      exit_handler = exit_handler_component(
          final_status=tfx.dsl.experimental.FinalStatusStr()
      )

      dsl_pipeline = tfx.dsl.Pipeline(...)

      runner = tfx.orchestration.experimental.KubeflowV2DagRunner(...)
      runner.set_exit_handler([exit_handler])
      runner.run(pipeline=dsl_pipeline)
      ```
  Experimental: no backwards compatibility guarantees.

  Args:
    func: Typehint-annotated component executor function.

  Returns:
    [`base_component.BaseComponent`][tfx.v1.types.BaseComponent] subclass for the given component executor
    function.
  """
  return component(func)

metadata

Public API for metadata.

FUNCTION DESCRIPTION
mysql_metadata_connection_config

Convenience function to create mysql-based metadata connection config.

sqlite_metadata_connection_config

Convenience function to create file based metadata connection config.

ATTRIBUTE DESCRIPTION
ConnectionConfigType

Attributes
ConnectionConfigType module-attribute
ConnectionConfigType = Union[ConnectionConfig, MetadataStoreClientConfig]
Functions
mysql_metadata_connection_config
mysql_metadata_connection_config(host: str, port: int, database: str, username: str, password: str) -> ConnectionConfig

Convenience function to create mysql-based metadata connection config.

PARAMETER DESCRIPTION
host

The name or network address of the instance of MySQL to connect to.

TYPE: str

port

The port MySQL is using to listen for connections.

TYPE: int

database

The name of the database to use.

TYPE: str

username

The MySQL login account being used.

TYPE: str

password

The password for the MySQL account being used.

TYPE: str

RETURNS DESCRIPTION
ConnectionConfig

A metadata_store_pb2.ConnectionConfig based on given metadata db uri.

Source code in tfx/orchestration/metadata.py
def mysql_metadata_connection_config(
    host: str, port: int, database: str, username: str,
    password: str) -> metadata_store_pb2.ConnectionConfig:
  """Convenience function to create mysql-based metadata connection config.

  Args:
    host: The name or network address of the instance of MySQL to connect to.
    port: The port MySQL is using to listen for connections.
    database: The name of the database to use.
    username: The MySQL login account being used.
    password: The password for the MySQL account being used.

  Returns:
    A metadata_store_pb2.ConnectionConfig based on given metadata db uri.
  """
  return metadata_store_pb2.ConnectionConfig(
      mysql=metadata_store_pb2.MySQLDatabaseConfig(
          host=host,
          port=port,
          database=database,
          user=username,
          password=password))
sqlite_metadata_connection_config
sqlite_metadata_connection_config(metadata_db_uri: str) -> ConnectionConfig

Convenience function to create file based metadata connection config.

PARAMETER DESCRIPTION
metadata_db_uri

uri to metadata db.

TYPE: str

RETURNS DESCRIPTION
ConnectionConfig

A metadata_store_pb2.ConnectionConfig based on given metadata db uri.

Source code in tfx/orchestration/metadata.py
def sqlite_metadata_connection_config(
    metadata_db_uri: str) -> metadata_store_pb2.ConnectionConfig:
  """Convenience function to create file based metadata connection config.

  Args:
    metadata_db_uri: uri to metadata db.

  Returns:
    A metadata_store_pb2.ConnectionConfig based on given metadata db uri.
  """
  fileio.makedirs(os.path.dirname(metadata_db_uri))
  connection_config = metadata_store_pb2.ConnectionConfig()
  connection_config.sqlite.filename_uri = metadata_db_uri
  connection_config.sqlite.connection_mode = (
      metadata_store_pb2.SqliteMetadataSourceConfig.READWRITE_OPENCREATE)
  return connection_config