
Extension

tfx.v1.extensions

TFX extensions module.

MODULE DESCRIPTION
google_cloud_ai_platform

Google Cloud AI Platform module.

google_cloud_big_query

Google Cloud BigQuery module.

Modules

google_cloud_ai_platform

Google Cloud AI Platform module.

MODULE DESCRIPTION
experimental

Types used in Google Cloud AI Platform that are still at an experimental stage.

CLASS DESCRIPTION
BulkInferrer

A Cloud AI component to do batch inference on a remote hosted model.

Pusher

Component for pushing model to Cloud AI Platform serving.

Trainer

Cloud AI Platform Trainer component.

Tuner

TFX component for model hyperparameter tuning on AI Platform Training.

ATTRIBUTE DESCRIPTION
ENABLE_UCAIP_KEY

ENABLE_VERTEX_KEY

JOB_ID_KEY

LABELS_KEY

SERVING_ARGS_KEY

TRAINING_ARGS_KEY

UCAIP_REGION_KEY

VERTEX_CONTAINER_IMAGE_URI_KEY

VERTEX_REGION_KEY

Attributes
ENABLE_UCAIP_KEY module-attribute
ENABLE_UCAIP_KEY = documented(obj='ai_platform_training_enable_ucaip', doc='Deprecated. Please use ENABLE_VERTEX_KEY instead. Keys to the items in custom_config of Trainer for enabling uCAIP Training. ')
ENABLE_VERTEX_KEY module-attribute
ENABLE_VERTEX_KEY = documented(obj='ai_platform_enable_vertex', doc='Keys to the items in custom_config of Trainer and Pusher for enabling Vertex AI.')
JOB_ID_KEY module-attribute
JOB_ID_KEY = documented(obj='ai_platform_training_job_id', doc='Keys to the items in custom_config of Trainer for specifying job id.')
LABELS_KEY module-attribute
LABELS_KEY = documented(obj='ai_platform_training_labels', doc='Keys to the items in custom_config of Trainer for specifying labels for training jobs on the AI Platform only. Not applicable for Vertex AI, where labels are specified in the CustomJob as defined in: https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.customJobs#CustomJob.')
SERVING_ARGS_KEY module-attribute
SERVING_ARGS_KEY = documented(obj='ai_platform_serving_args', doc='Keys to the items in custom_config of Pusher/BulkInferrer for passing serving args to AI Platform.')
TRAINING_ARGS_KEY module-attribute
TRAINING_ARGS_KEY = documented(obj='ai_platform_training_args', doc='Keys to the items in custom_config of Trainer for passing training_job to AI Platform, and the GCP project under which the training job will be executed. In Vertex AI, this corresponds to a CustomJob as defined in: https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.customJobs#CustomJob. In CAIP, this corresponds to TrainingInputs as defined in: https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#TrainingInput')
UCAIP_REGION_KEY module-attribute
UCAIP_REGION_KEY = documented(obj='ai_platform_training_ucaip_region', doc='Deprecated. Please use VERTEX_REGION_KEY instead. Keys to the items in custom_config of Trainer for specifying the region of uCAIP.')
VERTEX_CONTAINER_IMAGE_URI_KEY module-attribute
VERTEX_CONTAINER_IMAGE_URI_KEY = documented(obj='ai_platform_vertex_container_image_uri', doc='Keys to the items in custom_config of Pusher/BulkInferrer for the serving container image URI in Vertex AI.')
VERTEX_REGION_KEY module-attribute
VERTEX_REGION_KEY = documented(obj='ai_platform_vertex_region', doc='Keys to the items in custom_config of Trainer and Pusher for specifying the region of Vertex AI.')
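
For illustration, the following is a minimal sketch of how these key constants are typically combined into a custom_config dict for a Vertex AI training job; the project, region, machine type, and image URI below are placeholder values, not defaults.

from tfx import v1 as tfx

# Placeholder values for illustration only.
vertex_training_config = {
    tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY: True,
    tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY: 'us-central1',
    tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY: {
        # For Vertex AI this payload follows the CustomJob spec linked above.
        'project': 'my-gcp-project',
        'worker_pool_specs': [{
            'machine_spec': {'machine_type': 'n1-standard-4'},
            'replica_count': 1,
            'container_spec': {'image_uri': 'gcr.io/my-project/my-tfx-image'},
        }],
    },
}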
Classes
BulkInferrer
BulkInferrer(examples: Channel, model: Optional[Channel] = None, model_blessing: Optional[Channel] = None, data_spec: Optional[Union[DataSpec, RuntimeParameter]] = None, output_example_spec: Optional[Union[OutputExampleSpec, RuntimeParameter]] = None, custom_config: Optional[Dict[str, Any]] = None)

Bases: BaseComponent

A Cloud AI component to do batch inference on a remote hosted model.

The BulkInferrer component pushes a model to Google Cloud AI Platform, consumes examples data, sends requests to the remotely hosted model, and writes the inference results to an external location as a PredictionLog proto. After inference, it deletes the model from Google Cloud AI Platform.

TODO(b/155325467): Create an end-to-end test for this component.

Component outputs contains:

inference_result: a Channel of type standard_artifacts.InferenceResult, produced when output_example_spec is not set.

output_examples: a Channel of type standard_artifacts.Examples, produced when output_example_spec is set.

Construct a BulkInferrer component.

PARAMETER DESCRIPTION
examples

A Channel of type standard_artifacts.Examples, usually produced by an ExampleGen component. required

TYPE: Channel

model

A Channel of type standard_artifacts.Model, usually produced by a Trainer component.

TYPE: Optional[Channel] DEFAULT: None

model_blessing

A Channel of type standard_artifacts.ModelBlessing, usually produced by a ModelValidator component.

TYPE: Optional[Channel] DEFAULT: None

data_spec

bulk_inferrer_pb2.DataSpec instance that describes data selection.

TYPE: Optional[Union[DataSpec, RuntimeParameter]] DEFAULT: None

output_example_spec

A bulk_inferrer_pb2.OutputExampleSpec instance; specify it if you want BulkInferrer to output examples instead of inference results.

TYPE: Optional[Union[OutputExampleSpec, RuntimeParameter]] DEFAULT: None

custom_config

A dict which contains the deployment job parameters to be passed to Google Cloud AI Platform. custom_config.ai_platform_serving_args needs to contain the serving job parameters. For the full set of parameters, refer to https://cloud.google.com/ml-engine/reference/rest/v1/projects.models

TYPE: Optional[Dict[str, Any]] DEFAULT: None

RAISES DESCRIPTION
ValueError

If inference_result or output_examples is requested inconsistently with output_example_spec: output_examples is produced only when output_example_spec is set, and inference_result only when it is not.

ATTRIBUTE DESCRIPTION
EXECUTOR_SPEC

SPEC_CLASS

Source code in tfx/extensions/google_cloud_ai_platform/bulk_inferrer/component.py
def __init__(
    self,
    examples: types.Channel,
    model: Optional[types.Channel] = None,
    model_blessing: Optional[types.Channel] = None,
    data_spec: Optional[Union[bulk_inferrer_pb2.DataSpec,
                              data_types.RuntimeParameter]] = None,
    output_example_spec: Optional[Union[bulk_inferrer_pb2.OutputExampleSpec,
                                        data_types.RuntimeParameter]] = None,
    custom_config: Optional[Dict[str, Any]] = None):
  """Construct an BulkInferrer component.

  Args:
    examples: A Channel of type [`standard_artifacts.Examples`][tfx.v1.types.standard_artifacts.Examples], usually
      produced by an ExampleGen component. _required_
    model: A Channel of type [`standard_artifacts.Model`][tfx.v1.types.standard_artifacts.Model], usually produced by
      a Trainer component.
    model_blessing: A Channel of type [`standard_artifacts.ModelBlessing`][tfx.v1.types.standard_artifacts.ModelBlessing],
      usually produced by a ModelValidator component.
    data_spec: bulk_inferrer_pb2.DataSpec instance that describes data
      selection.
    output_example_spec: bulk_inferrer_pb2.OutputExampleSpec instance, specify
      if you want BulkInferrer to output examples instead of inference result.
    custom_config: A dict which contains the deployment job parameters to be
      passed to Google Cloud AI Platform.
      custom_config.ai_platform_serving_args need to contain the serving job
      parameters. For the full set of parameters, refer to
      [https://cloud.google.com/ml-engine/reference/rest/v1/projects.models](https://cloud.google.com/ml-engine/reference/rest/v1/projects.models)

  Raises:
    ValueError: Must not specify inference_result or output_examples depends
      on whether output_example_spec is set or not.
  """
  if output_example_spec:
    output_examples = types.Channel(type=standard_artifacts.Examples)
    inference_result = None
  else:
    inference_result = types.Channel(type=standard_artifacts.InferenceResult)
    output_examples = None

  spec = CloudAIBulkInferrerComponentSpec(
      examples=examples,
      model=model,
      model_blessing=model_blessing,
      data_spec=data_spec or bulk_inferrer_pb2.DataSpec(),
      output_example_spec=output_example_spec,
      custom_config=json_utils.dumps(custom_config),
      inference_result=inference_result,
      output_examples=output_examples)
  super().__init__(spec=spec)
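
A minimal usage sketch, assuming example_gen, trainer, and evaluator components are defined earlier in the pipeline; the model name and project id under SERVING_ARGS_KEY are placeholders.

from tfx import v1 as tfx

bulk_inferrer = tfx.extensions.google_cloud_ai_platform.BulkInferrer(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    custom_config={
        # SERVING_ARGS_KEY ('ai_platform_serving_args') carries the parameters
        # of the temporary serving job; the values below are placeholders.
        tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY: {
            'model_name': 'my_model',
            'project_id': 'my-gcp-project',
        },
    },
)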
Attributes
EXECUTOR_SPEC class-attribute instance-attribute
EXECUTOR_SPEC = ExecutorClassSpec(Executor)
SPEC_CLASS class-attribute instance-attribute
SPEC_CLASS = CloudAIBulkInferrerComponentSpec
Functions
Pusher
Pusher(model: Optional[Channel] = None, model_blessing: Optional[Channel] = None, infra_blessing: Optional[Channel] = None, custom_config: Optional[Dict[str, Any]] = None)

Bases: Pusher

Component for pushing model to Cloud AI Platform serving.

Construct a Pusher component.

PARAMETER DESCRIPTION
model

An optional Channel of type standard_artifacts.Model, usually produced by a Trainer component, representing the model used for training.

TYPE: Optional[Channel] DEFAULT: None

model_blessing

An optional Channel of type standard_artifacts.ModelBlessing, usually produced from an Evaluator component, containing the blessing model.

TYPE: Optional[Channel] DEFAULT: None

infra_blessing

An optional Channel of type standard_artifacts.InfraBlessing, usually produced from an InfraValidator component, containing the validation result.

TYPE: Optional[Channel] DEFAULT: None

custom_config

A dict which contains the deployment job parameters to be passed to Cloud platforms.

TYPE: Optional[Dict[str, Any]] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

get_class_type
remove_downstream_node
remove_upstream_node
to_json_dict

Convert from an object to a JSON serializable dictionary.

with_id
with_node_execution_options
with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
DRIVER_CLASS

EXECUTOR_SPEC

POST_EXECUTABLE_SPEC

PRE_EXECUTABLE_SPEC

SPEC_CLASS

component_id

TYPE: str

component_type

TYPE: str

downstream_nodes

driver_class

exec_properties

TYPE: Dict[str, Any]

executor_spec

id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

inputs

TYPE: Dict[str, Channel]

node_execution_options

TYPE: Optional[NodeExecutionOptions]

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

platform_config

spec

type

TYPE: str

type_annotation

TYPE: Optional[Type[SystemExecution]]

upstream_nodes

Source code in tfx/extensions/google_cloud_ai_platform/pusher/component.py
def __init__(self,
             model: Optional[types.Channel] = None,
             model_blessing: Optional[types.Channel] = None,
             infra_blessing: Optional[types.Channel] = None,
             custom_config: Optional[Dict[str, Any]] = None):
  """Construct a Pusher component.

  Args:
    model: An optional Channel of type [`standard_artifacts.Model`][tfx.v1.types.standard_artifacts.Model], usually
      produced by a [Trainer][tfx.v1.components.Trainer] component, representing the model used for
      training.
    model_blessing: An optional Channel of type
      [`standard_artifacts.ModelBlessing`][tfx.v1.types.standard_artifacts.ModelBlessing], usually produced from an [Evaluator][tfx.v1.components.Evaluator]
      component, containing the blessing model.
    infra_blessing: An optional Channel of type
      [`standard_artifacts.InfraBlessing`][tfx.v1.types.standard_artifacts.InfraBlessing], usually produced from an
      [InfraValidator][tfx.v1.components.InfraValidator] component, containing the validation result.
    custom_config: A dict which contains the deployment job parameters to be
      passed to Cloud platforms.
  """
  super().__init__(
      model=model,
      model_blessing=model_blessing,
      infra_blessing=infra_blessing,
      custom_config=custom_config)
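
A minimal usage sketch, assuming trainer and evaluator are defined earlier in the pipeline; the project, region, endpoint name, machine type, and serving image below are placeholders for a Vertex AI deployment.

from tfx import v1 as tfx

pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    custom_config={
        tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY: True,
        tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY: 'us-central1',
        # Serving container image used by Vertex AI for the pushed model
        # (placeholder URI).
        tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
            'gcr.io/my-project/my-serving-image',
        tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY: {
            'project_id': 'my-gcp-project',
            'endpoint_name': 'my-endpoint',
            'machine_type': 'n1-standard-2',
        },
    },
)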
Attributes
DRIVER_CLASS class-attribute instance-attribute
DRIVER_CLASS = BaseDriver
EXECUTOR_SPEC class-attribute instance-attribute
EXECUTOR_SPEC = ExecutorClassSpec(Executor)
POST_EXECUTABLE_SPEC class-attribute instance-attribute
POST_EXECUTABLE_SPEC = None
PRE_EXECUTABLE_SPEC class-attribute instance-attribute
PRE_EXECUTABLE_SPEC = None
SPEC_CLASS class-attribute instance-attribute
SPEC_CLASS = PusherSpec
component_id property
component_id: str
component_type property
component_type: str
downstream_nodes property
downstream_nodes
driver_class instance-attribute
driver_class = driver_class
exec_properties property
exec_properties: Dict[str, Any]
executor_spec instance-attribute
executor_spec = executor_spec
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return a default id derived from the component class name.

RETURNS DESCRIPTION
str

node id.

inputs property
inputs: Dict[str, Channel]
node_execution_options property writable
node_execution_options: Optional[NodeExecutionOptions]
outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

platform_config instance-attribute
platform_config = None
spec instance-attribute
spec = spec
type property
type: str
type_annotation property
type_annotation: Optional[Type[SystemExecution]]
upstream_nodes property
upstream_nodes
Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
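
For example (a sketch, assuming pusher and a hypothetical cleanup_component exist in the same pipeline):

# cleanup_component will not start until pusher has finished, even though no
# artifact flows between the two nodes.
pusher.add_downstream_node(cleanup_component)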
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
get_class_type classmethod
get_class_type() -> str
Source code in tfx/dsl/components/base/base_node.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def get_class_type(cls) -> str:
  nondeprecated_class = deprecation_utils.get_first_nondeprecated_class(cls)
  # TODO(b/221166027): Turn strict_check=True once failing tests are fixed.
  return name_utils.get_full_name(nondeprecated_class, strict_check=False)
remove_downstream_node
remove_downstream_node(downstream_node)
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def remove_downstream_node(self, downstream_node):
  self._downstream_nodes.remove(downstream_node)
  if self in downstream_node.upstream_nodes:
    downstream_node.remove_upstream_node(self)
remove_upstream_node
remove_upstream_node(upstream_node)
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def remove_upstream_node(self, upstream_node):
  self._upstream_nodes.remove(upstream_node)
  if self in upstream_node.downstream_nodes:
    upstream_node.remove_downstream_node(self)
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_id
with_id(id: str) -> Self
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def with_id(self, id: str) -> typing_extensions.Self:  # pylint: disable=redefined-builtin
  self._id = id
  return self
with_node_execution_options
with_node_execution_options(node_execution_options: NodeExecutionOptions) -> Self
Source code in tfx/dsl/components/base/base_node.py
def with_node_execution_options(
    self, node_execution_options: utils.NodeExecutionOptions
) -> typing_extensions.Self:
  self.node_execution_options = node_execution_options
  return self
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self
Trainer
Trainer(examples: Optional[Channel] = None, transformed_examples: Optional[Channel] = None, transform_graph: Optional[Channel] = None, schema: Optional[Channel] = None, base_model: Optional[Channel] = None, hyperparameters: Optional[Channel] = None, module_file: Optional[Union[str, RuntimeParameter]] = None, run_fn: Optional[Union[str, RuntimeParameter]] = None, train_args: Optional[Union[TrainArgs, RuntimeParameter]] = None, eval_args: Optional[Union[EvalArgs, RuntimeParameter]] = None, custom_config: Optional[Dict[str, Any]] = None)

Bases: Trainer

Cloud AI Platform Trainer component.

Construct a Trainer component.

PARAMETER DESCRIPTION
examples

A Channel of type standard_artifacts.Examples, serving as the source of examples used in training (required). May be raw or transformed.

TYPE: Optional[Channel] DEFAULT: None

transformed_examples

Deprecated field. Please set examples instead.

TYPE: Optional[Channel] DEFAULT: None

transform_graph

An optional Channel of type standard_artifacts.TransformGraph, serving as the input transform graph if present.

TYPE: Optional[Channel] DEFAULT: None

schema

An optional Channel of type standard_artifacts.Schema, serving as the schema of training and eval data. Schema is optional when 1) a transform_graph that contains the schema is provided, or 2) the user module bypasses the schema, e.g., it is hardcoded.

TYPE: Optional[Channel] DEFAULT: None

base_model

A Channel of type Model, containing a model that will be used for training. This can be used for warm-starting, transfer learning, or model ensembling.

TYPE: Optional[Channel] DEFAULT: None

hyperparameters

A Channel of type standard_artifacts.HyperParameters, serving as the hyperparameters for the training module. The best hyperparameters output by a Tuner can be fed into this.

TYPE: Optional[Channel] DEFAULT: None

module_file

A path to a python module file containing the UDF model definition. The module_file must implement a function named run_fn at its top level with the function signature:

def run_fn(trainer.fn_args_utils.FnArgs): ...

TYPE: Optional[Union[str, RuntimeParameter]] DEFAULT: None

run_fn

A python path to the UDF model definition function for the generic trainer. See 'module_file' for details. Exactly one of 'module_file' or 'run_fn' must be supplied if the Trainer uses the GenericExecutor (default).

TYPE: Optional[Union[str, RuntimeParameter]] DEFAULT: None

train_args

A proto.TrainArgs instance, containing args used for training. Currently only splits and num_steps are available. Default behavior (when splits is empty) is to train on the train split.

TYPE: Optional[Union[TrainArgs, RuntimeParameter]] DEFAULT: None

eval_args

A proto.EvalArgs instance, containing args used for evaluation. Currently only splits and num_steps are available. Default behavior (when splits is empty) is to evaluate on the eval split.

TYPE: Optional[Union[EvalArgs, RuntimeParameter]] DEFAULT: None

custom_config

A dict which contains additional training job parameters that will be passed into the user module.

TYPE: Optional[Dict[str, Any]] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

get_class_type
remove_downstream_node
remove_upstream_node
to_json_dict

Convert from an object to a JSON serializable dictionary.

with_id
with_node_execution_options
with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
DRIVER_CLASS

EXECUTOR_SPEC

POST_EXECUTABLE_SPEC

PRE_EXECUTABLE_SPEC

SPEC_CLASS

component_id

TYPE: str

component_type

TYPE: str

downstream_nodes

driver_class

exec_properties

TYPE: Dict[str, Any]

executor_spec

id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

inputs

TYPE: Dict[str, Channel]

node_execution_options

TYPE: Optional[NodeExecutionOptions]

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

platform_config

spec

type

TYPE: str

type_annotation

TYPE: Optional[Type[SystemExecution]]

upstream_nodes

Source code in tfx/extensions/google_cloud_ai_platform/trainer/component.py
def __init__(self,
             examples: Optional[types.Channel] = None,
             transformed_examples: Optional[types.Channel] = None,
             transform_graph: Optional[types.Channel] = None,
             schema: Optional[types.Channel] = None,
             base_model: Optional[types.Channel] = None,
             hyperparameters: Optional[types.Channel] = None,
             module_file: Optional[Union[str,
                                         data_types.RuntimeParameter]] = None,
             run_fn: Optional[Union[str, data_types.RuntimeParameter]] = None,
             train_args: Optional[Union[trainer_pb2.TrainArgs,
                                        data_types.RuntimeParameter]] = None,
             eval_args: Optional[Union[trainer_pb2.EvalArgs,
                                       data_types.RuntimeParameter]] = None,
             custom_config: Optional[Dict[str, Any]] = None):
  """Construct a Trainer component.

  Args:
    examples: A Channel of type [`standard_artifacts.Examples`][tfx.v1.types.standard_artifacts.Examples], serving as the
      source of examples used in training (required). May be raw or
      transformed.
    transformed_examples: Deprecated field. Please set `examples` instead.
    transform_graph: An optional Channel of type
      [`standard_artifacts.TransformGraph`][tfx.v1.types.standard_artifacts.TransformGraph], serving as the input transform
      graph if present.
    schema:  An optional Channel of type [`standard_artifacts.Schema`][tfx.v1.types.standard_artifacts.Schema], serving
      as the schema of training and eval data. Schema is optional when 1)
      transform_graph is provided which contains schema. 2) user module
      bypasses the usage of schema, e.g., hardcoded.
    base_model: A Channel of type [`Model`][tfx.v1.types.standard_artifacts.Model], containing model that will be used
      for training. This can be used for warmstart, transfer learning or model
      ensembling.
    hyperparameters: A Channel of type [`standard_artifacts.HyperParameters`][tfx.v1.types.standard_artifacts.HyperParameters],
      serving as the hyperparameters for training module. Tuner's output best
      hyperparameters can be feed into this.
    module_file: A path to python module file containing UDF model definition.
      The module_file must implement a function named `run_fn` at its top
      level with function signature:
      ```python
      def run_fn(trainer.fn_args_utils.FnArgs): ...
      ```
    run_fn:  A python path to UDF model definition function for generic
      trainer. See 'module_file' for details. Exactly one of 'module_file' or
      'run_fn' must be supplied if Trainer uses GenericExecutor (default).
    train_args: A proto.TrainArgs instance, containing args used for training
      Currently only splits and num_steps are available. Default behavior
      (when splits is empty) is train on `train` split.
    eval_args: A proto.EvalArgs instance, containing args used for evaluation.
      Currently only splits and num_steps are available. Default behavior
      (when splits is empty) is evaluate on `eval` split.
    custom_config: A dict which contains addtional training job parameters
      that will be passed into user module.
  """
  super().__init__(
      examples=examples,
      transformed_examples=transformed_examples,
      transform_graph=transform_graph,
      schema=schema,
      base_model=base_model,
      hyperparameters=hyperparameters,
      train_args=train_args,
      eval_args=eval_args,
      module_file=module_file,
      run_fn=run_fn,
      custom_config=custom_config)
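
A minimal usage sketch, assuming example_gen, transform, and schema_gen components and a module_file with a top-level run_fn exist; the Vertex AI values mirror the key constants documented above and are placeholders.

from tfx import v1 as tfx

trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
    module_file=module_file,
    examples=example_gen.outputs['examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=tfx.proto.TrainArgs(num_steps=1000),
    eval_args=tfx.proto.EvalArgs(num_steps=150),
    custom_config={
        tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY: True,
        tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY: 'us-central1',
        # Payload follows the Vertex AI CustomJob spec; values are placeholders.
        tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY: {
            'project': 'my-gcp-project',
            'worker_pool_specs': [{
                'machine_spec': {'machine_type': 'n1-standard-4'},
                'replica_count': 1,
                'container_spec': {'image_uri': 'gcr.io/my-project/my-tfx-image'},
            }],
        },
    },
)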
Attributes
DRIVER_CLASS class-attribute instance-attribute
DRIVER_CLASS = BaseDriver
EXECUTOR_SPEC class-attribute instance-attribute
EXECUTOR_SPEC = ExecutorClassSpec(GenericExecutor)
POST_EXECUTABLE_SPEC class-attribute instance-attribute
POST_EXECUTABLE_SPEC = None
PRE_EXECUTABLE_SPEC class-attribute instance-attribute
PRE_EXECUTABLE_SPEC = None
SPEC_CLASS class-attribute instance-attribute
SPEC_CLASS = TrainerSpec
component_id property
component_id: str
component_type property
component_type: str
downstream_nodes property
downstream_nodes
driver_class instance-attribute
driver_class = driver_class
exec_properties property
exec_properties: Dict[str, Any]
executor_spec instance-attribute
executor_spec = executor_spec
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return a default id derived from the component class name.

RETURNS DESCRIPTION
str

node id.

inputs property
inputs: Dict[str, Channel]
node_execution_options property writable
node_execution_options: Optional[NodeExecutionOptions]
outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

platform_config instance-attribute
platform_config = None
spec instance-attribute
spec = spec
type property
type: str
type_annotation property
type_annotation: Optional[Type[SystemExecution]]
upstream_nodes property
upstream_nodes
Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
get_class_type classmethod
get_class_type() -> str
Source code in tfx/dsl/components/base/base_node.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def get_class_type(cls) -> str:
  nondeprecated_class = deprecation_utils.get_first_nondeprecated_class(cls)
  # TODO(b/221166027): Turn strict_check=True once failing tests are fixed.
  return name_utils.get_full_name(nondeprecated_class, strict_check=False)
remove_downstream_node
remove_downstream_node(downstream_node)
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def remove_downstream_node(self, downstream_node):
  self._downstream_nodes.remove(downstream_node)
  if self in downstream_node.upstream_nodes:
    downstream_node.remove_upstream_node(self)
remove_upstream_node
remove_upstream_node(upstream_node)
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def remove_upstream_node(self, upstream_node):
  self._upstream_nodes.remove(upstream_node)
  if self in upstream_node.downstream_nodes:
    upstream_node.remove_downstream_node(self)
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_id
with_id(id: str) -> Self
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def with_id(self, id: str) -> typing_extensions.Self:  # pylint: disable=redefined-builtin
  self._id = id
  return self
with_node_execution_options
with_node_execution_options(node_execution_options: NodeExecutionOptions) -> Self
Source code in tfx/dsl/components/base/base_node.py
def with_node_execution_options(
    self, node_execution_options: utils.NodeExecutionOptions
) -> typing_extensions.Self:
  self.node_execution_options = node_execution_options
  return self
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self
Tuner
Tuner(examples: BaseChannel, schema: Optional[BaseChannel] = None, transform_graph: Optional[BaseChannel] = None, base_model: Optional[BaseChannel] = None, module_file: Optional[str] = None, tuner_fn: Optional[str] = None, train_args: Optional[TrainArgs] = None, eval_args: Optional[EvalArgs] = None, tune_args: Optional[TuneArgs] = None, custom_config: Optional[Dict[str, Any]] = None)

Bases: Tuner

TFX component for model hyperparameter tuning on AI Platform Training.

Construct a Tuner component.

PARAMETER DESCRIPTION
examples

A BaseChannel of type standard_artifacts.Examples, serving as the source of examples that are used in tuning (required).

TYPE: BaseChannel

schema

An optional BaseChannel of type standard_artifacts.Schema, serving as the schema of training and eval data. This is used when raw examples are provided.

TYPE: Optional[BaseChannel] DEFAULT: None

transform_graph

An optional BaseChannel of type standard_artifacts.TransformGraph, serving as the input transform graph if present. This is used when transformed examples are provided.

TYPE: Optional[BaseChannel] DEFAULT: None

base_model

A BaseChannel of type Model, containing a model that will be used for training. This can be used for warm-starting, transfer learning, or model ensembling.

TYPE: Optional[BaseChannel] DEFAULT: None

module_file

A path to a python module file containing the UDF tuner definition. The module_file must implement a function named tuner_fn at its top level. The function must have the following signature.

def tuner_fn(fn_args: FnArgs) -> TunerFnResult:
    ...
Exactly one of 'module_file' or 'tuner_fn' must be supplied.

TYPE: Optional[str] DEFAULT: None

tuner_fn

A python path to UDF model definition function. See 'module_file' for the required signature of the UDF. Exactly one of 'module_file' or 'tuner_fn' must be supplied.

TYPE: Optional[str] DEFAULT: None

train_args

A trainer_pb2.TrainArgs instance, containing args used for training. Currently only splits and num_steps are available. Default behavior (when splits is empty) is to train on the train split.

TYPE: Optional[TrainArgs] DEFAULT: None

eval_args

A trainer_pb2.EvalArgs instance, containing args used for eval. Currently only splits and num_steps are available. Default behavior (when splits is empty) is to evaluate on the eval split.

TYPE: Optional[EvalArgs] DEFAULT: None

tune_args

A tuner_pb2.TuneArgs instance, containing args used for tuning. Currently only num_parallel_trials is available.

TYPE: Optional[TuneArgs] DEFAULT: None

custom_config

A dict which contains additional training job parameters that will be passed into the user module.

TYPE: Optional[Dict[str, Any]] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

get_class_type
remove_downstream_node
remove_upstream_node
to_json_dict

Convert from an object to a JSON serializable dictionary.

with_id
with_node_execution_options
with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
DRIVER_CLASS

EXECUTOR_SPEC

POST_EXECUTABLE_SPEC

PRE_EXECUTABLE_SPEC

SPEC_CLASS

component_id

TYPE: str

component_type

TYPE: str

downstream_nodes

driver_class

exec_properties

TYPE: Dict[str, Any]

executor_spec

id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

inputs

TYPE: Dict[str, Channel]

node_execution_options

TYPE: Optional[NodeExecutionOptions]

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

platform_config

spec

type

TYPE: str

type_annotation

TYPE: Optional[Type[SystemExecution]]

upstream_nodes

Source code in tfx/components/tuner/component.py
def __init__(self,
             examples: types.BaseChannel,
             schema: Optional[types.BaseChannel] = None,
             transform_graph: Optional[types.BaseChannel] = None,
             base_model: Optional[types.BaseChannel] = None,
             module_file: Optional[str] = None,
             tuner_fn: Optional[str] = None,
             train_args: Optional[trainer_pb2.TrainArgs] = None,
             eval_args: Optional[trainer_pb2.EvalArgs] = None,
             tune_args: Optional[tuner_pb2.TuneArgs] = None,
             custom_config: Optional[Dict[str, Any]] = None):
  """Construct a Tuner component.

  Args:
    examples: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Examples`][tfx.v1.types.standard_artifacts.Examples], serving as
      the source of examples that are used in tuning (required).
    schema:  An optional [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Schema`][tfx.v1.types.standard_artifacts.Schema],
      serving as the schema of training and eval data. This is used when raw
      examples are provided.
    transform_graph: An optional [BaseChannel][tfx.v1.types.BaseChannel] of type
      [`standard_artifacts.TransformGraph`][tfx.v1.types.standard_artifacts.TransformGraph], serving as the input transform
      graph if present. This is used when transformed examples are provided.
    base_model: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`Model`][tfx.v1.types.standard_artifacts.Model], containing model that will be
      used for training. This can be used for warmstart, transfer learning or
      model ensembling.
    module_file: A path to python module file containing UDF tuner definition.
      The module_file must implement a function named `tuner_fn` at its top
      level. The function must have the following signature.
          ``` {.python .no-copy}
          def tuner_fn(fn_args: FnArgs) -> TunerFnResult:
              ...
          ```
          Exactly one of 'module_file' or 'tuner_fn' must be supplied.
    tuner_fn:  A python path to UDF model definition function. See
      'module_file' for the required signature of the UDF. Exactly one of
      'module_file' or 'tuner_fn' must be supplied.
    train_args: A trainer_pb2.TrainArgs instance, containing args used for
      training. Currently only splits and num_steps are available. Default
      behavior (when splits is empty) is train on `train` split.
    eval_args: A trainer_pb2.EvalArgs instance, containing args used for eval.
      Currently only splits and num_steps are available. Default behavior
      (when splits is empty) is evaluate on `eval` split.
    tune_args: A tuner_pb2.TuneArgs instance, containing args used for tuning.
      Currently only num_parallel_trials is available.
    custom_config: A dict which contains addtional training job parameters
      that will be passed into user module.
  """
  if bool(module_file) == bool(tuner_fn):
    raise ValueError(
        "Exactly one of 'module_file' or 'tuner_fn' must be supplied")

  best_hyperparameters = types.Channel(
      type=standard_artifacts.HyperParameters)
  tuner_results = types.Channel(type=standard_artifacts.TunerResults)
  spec = standard_component_specs.TunerSpec(
      examples=examples,
      schema=schema,
      transform_graph=transform_graph,
      base_model=base_model,
      module_file=module_file,
      tuner_fn=tuner_fn,
      train_args=train_args or trainer_pb2.TrainArgs(),
      eval_args=eval_args or trainer_pb2.EvalArgs(),
      tune_args=tune_args,
      best_hyperparameters=best_hyperparameters,
      tuner_results=tuner_results,
      custom_config=json_utils.dumps(custom_config),
  )
  super().__init__(spec=spec)

  if udf_utils.should_package_user_modules():
    # In this case, the `MODULE_PATH_KEY` execution property will be injected
    # as a reference to the given user module file after packaging, at which
    # point the `MODULE_FILE_KEY` execution property will be removed.
    udf_utils.add_user_module_dependency(
        self, standard_component_specs.MODULE_FILE_KEY,
        standard_component_specs.MODULE_PATH_KEY)
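
As a sketch of the tuner_fn contract described in the module_file parameter above: this assumes KerasTuner is installed and that _build_keras_model and _input_fn are user-defined helpers in the same module; it is illustrative, not the library's reference implementation.

import keras_tuner
from tfx import v1 as tfx

def tuner_fn(fn_args: tfx.components.FnArgs) -> tfx.components.TunerFnResult:
  # Build a KerasTuner search over a user-defined hypermodel.
  tuner = keras_tuner.RandomSearch(
      _build_keras_model,          # hypothetical model-building helper
      objective='val_accuracy',
      max_trials=10,
      directory=fn_args.working_dir,
      project_name='tfx_tuning')
  # Return the tuner plus the kwargs that will be forwarded to tuner.search().
  return tfx.components.TunerFnResult(
      tuner=tuner,
      fit_kwargs={
          'x': _input_fn(fn_args.train_files, fn_args.data_accessor),             # hypothetical input fn
          'validation_data': _input_fn(fn_args.eval_files, fn_args.data_accessor),
          'steps_per_epoch': fn_args.train_steps,
          'validation_steps': fn_args.eval_steps,
      })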
Attributes
DRIVER_CLASS class-attribute instance-attribute
DRIVER_CLASS = BaseDriver
EXECUTOR_SPEC class-attribute instance-attribute
EXECUTOR_SPEC = ExecutorClassSpec(Executor)
POST_EXECUTABLE_SPEC class-attribute instance-attribute
POST_EXECUTABLE_SPEC = None
PRE_EXECUTABLE_SPEC class-attribute instance-attribute
PRE_EXECUTABLE_SPEC = None
SPEC_CLASS class-attribute instance-attribute
SPEC_CLASS = TunerSpec
component_id property
component_id: str
component_type property
component_type: str
downstream_nodes property
downstream_nodes
driver_class instance-attribute
driver_class = driver_class
exec_properties property
exec_properties: Dict[str, Any]
executor_spec instance-attribute
executor_spec = executor_spec
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return a default id derived from the component class name.

RETURNS DESCRIPTION
str

node id.

inputs property
inputs: Dict[str, Channel]
node_execution_options property writable
node_execution_options: Optional[NodeExecutionOptions]
outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

platform_config instance-attribute
platform_config = None
spec instance-attribute
spec = spec
type property
type: str
type_annotation property
type_annotation: Optional[Type[SystemExecution]]
upstream_nodes property
upstream_nodes
Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
get_class_type classmethod
get_class_type() -> str
Source code in tfx/dsl/components/base/base_node.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def get_class_type(cls) -> str:
  nondeprecated_class = deprecation_utils.get_first_nondeprecated_class(cls)
  # TODO(b/221166027): Turn strict_check=True once failing tests are fixed.
  return name_utils.get_full_name(nondeprecated_class, strict_check=False)
remove_downstream_node
remove_downstream_node(downstream_node)
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def remove_downstream_node(self, downstream_node):
  self._downstream_nodes.remove(downstream_node)
  if self in downstream_node.upstream_nodes:
    downstream_node.remove_upstream_node(self)
remove_upstream_node
remove_upstream_node(upstream_node)
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def remove_upstream_node(self, upstream_node):
  self._upstream_nodes.remove(upstream_node)
  if self in upstream_node.downstream_nodes:
    upstream_node.remove_downstream_node(self)
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_id
with_id(id: str) -> Self
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def with_id(self, id: str) -> typing_extensions.Self:  # pylint: disable=redefined-builtin
  self._id = id
  return self
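A minimal sketch combining with_id with the JSON helpers above (the Step component is a hypothetical stand-in):

from tfx import v1 as tfx

@tfx.dsl.components.component
def Step() -> None:
  """A trivial component used only for illustration."""

node = Step().with_id('my_step')   # with_id returns the same node, so calls can be chained

node_dict = node.to_json_dict()    # dependency edges are intentionally excluded
assert '_upstream_nodes' not in node_dict

restored = type(node).from_json_dict(node_dict)
assert restored.id == 'my_step'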
with_node_execution_options
with_node_execution_options(node_execution_options: NodeExecutionOptions) -> Self
Source code in tfx/dsl/components/base/base_node.py
def with_node_execution_options(
    self, node_execution_options: utils.NodeExecutionOptions
) -> typing_extensions.Self:
  self.node_execution_options = node_execution_options
  return self
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self
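A minimal sketch of with_platform_config; google.protobuf.Struct is used here purely as a stand-in for whichever per-node, platform-specific config proto your orchestrator actually expects:

from google.protobuf import struct_pb2
from tfx import v1 as tfx

@tfx.dsl.components.component
def Step() -> None:
  """A trivial component used only for illustration."""

platform_config = struct_pb2.Struct()
platform_config.update({'machine_type': 'n1-standard-4'})  # illustrative field, not a required schema

node = Step().with_platform_config(platform_config)
assert node.platform_config is platform_config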
Modules
experimental

Types used in Google Cloud AI Platform under experimental stage.

ATTRIBUTE DESCRIPTION
BULK_INFERRER_SERVING_ARGS_KEY

ENDPOINT_ARGS_KEY

PUSHER_SERVING_ARGS_KEY

REMOTE_TRIALS_WORKING_DIR_KEY

TUNING_ARGS_KEY

Attributes
BULK_INFERRER_SERVING_ARGS_KEY module-attribute
BULK_INFERRER_SERVING_ARGS_KEY = documented(obj='ai_platform_serving_args', doc='Keys to the items in custom_config of Bulk Inferrer for passing bulkinferrer args to AI Platform.')
ENDPOINT_ARGS_KEY module-attribute
ENDPOINT_ARGS_KEY = documented(obj='endpoint', doc='Keys to the items in custom_config of Pusher/BulkInferrer for optional endpoint override (CAIP).')
PUSHER_SERVING_ARGS_KEY module-attribute
PUSHER_SERVING_ARGS_KEY = documented(obj='ai_platform_serving_args', doc='Keys to the items in custom_config of Pusher/BulkInferrer for passing serving args to AI Platform.')
REMOTE_TRIALS_WORKING_DIR_KEY module-attribute
REMOTE_TRIALS_WORKING_DIR_KEY = documented(obj='remote_trials_working_dir', doc='Keys to the items in custom_config of Tuner for specifying a working dir for remote trial.')
TUNING_ARGS_KEY module-attribute
TUNING_ARGS_KEY = documented(obj='ai_platform_tuning_args', doc='Keys to the items in custom_config of Tuner for passing training_job to AI Platform, and the GCP project under which the training job will be executed. In Vertex AI, this corresponds to a CustomJob as defined in:https://cloud.google.com/vertex-ai/docs/reference/rest/v1/projects.locations.customJobs#CustomJob.In CAIP, this corresponds to TrainingInputs as defined in:https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#TrainingInput')
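A minimal sketch of how these keys are typically assembled into a custom_config for the Cloud AI Platform Tuner; the project, job-spec fields, and GCS path below are illustrative placeholders, not required values:

from tfx.v1.extensions import google_cloud_ai_platform as ai_platform
from tfx.v1.extensions.google_cloud_ai_platform import experimental

custom_config = {
    ai_platform.ENABLE_VERTEX_KEY: True,
    ai_platform.VERTEX_REGION_KEY: 'us-central1',
    experimental.TUNING_ARGS_KEY: {
        'project': 'my-gcp-project',                                 # hypothetical project id
        'job_spec': {'worker_pool_specs': [{'replica_count': 1}]},   # illustrative CustomJob fields
    },
    experimental.REMOTE_TRIALS_WORKING_DIR_KEY: 'gs://my-bucket/tuner_trials',  # hypothetical path
}
# custom_config is then passed to
# tfx.v1.extensions.google_cloud_ai_platform.Tuner(..., custom_config=custom_config).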

google_cloud_big_query

Google Cloud Big Query module.

CLASS DESCRIPTION
BigQueryExampleGen

Cloud BigQueryExampleGen component.

Pusher

Cloud Big Query Pusher component.

ATTRIBUTE DESCRIPTION
PUSHER_SERVING_ARGS_KEY

Attributes
PUSHER_SERVING_ARGS_KEY module-attribute
PUSHER_SERVING_ARGS_KEY = documented(obj='bigquery_serving_args', doc='Keys to the items in custom_config of Pusher for passing serving args to Big Query.')
Classes
BigQueryExampleGen
BigQueryExampleGen(query: Optional[str] = None, input_config: Optional[Union[Input, RuntimeParameter]] = None, output_config: Optional[Union[Output, RuntimeParameter]] = None, range_config: Optional[Union[RangeConfig, RuntimeParameter, Placeholder]] = None, custom_executor_spec: Optional[ExecutorSpec] = None, custom_config: Optional[Union[CustomConfig, RuntimeParameter]] = None)

Bases: QueryBasedExampleGen

Cloud BigQueryExampleGen component.

The BigQuery ExampleGen component takes a query and generates train and eval examples for downstream components.

Component outputs contains:

  • examples: Channel of type standard_artifacts.Examples for output train and eval examples.

Constructs a BigQueryExampleGen component.

PARAMETER DESCRIPTION
query

BigQuery SQL string; the query result will be treated as a single split. Can be overwritten by input_config.

TYPE: Optional[str] DEFAULT: None

input_config

An example_gen_pb2.Input instance with Split.pattern as BigQuery sql string. If set, it overwrites the 'query' arg, and allows different queries per split. If any field is provided as a RuntimeParameter, input_config should be constructed as a dict with the same field names as Input proto message.

TYPE: Optional[Union[Input, RuntimeParameter]] DEFAULT: None

output_config

An example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1. If any field is provided as a RuntimeParameter, output_config should be constructed as a dict with the same field names as the Output proto message.

TYPE: Optional[Union[Output, RuntimeParameter]] DEFAULT: None

range_config

An optional range_config_pb2.RangeConfig instance, specifying the range of span values to consider.

TYPE: Optional[Union[RangeConfig, RuntimeParameter, Placeholder]] DEFAULT: None

custom_executor_spec

Optional custom executor spec overriding the default executor spec specified in the component attribute.

TYPE: Optional[ExecutorSpec] DEFAULT: None

custom_config

An example_gen_pb2.CustomConfig instance, providing custom configuration for ExampleGen.

TYPE: Optional[Union[CustomConfig, RuntimeParameter]] DEFAULT: None

RAISES DESCRIPTION
RuntimeError

Raised unless exactly one of query and input_config is set.

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add components that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

get_class_type
remove_downstream_node
remove_upstream_node
to_json_dict

Convert from an object to a JSON serializable dictionary.

with_beam_pipeline_args

Add per-component Beam pipeline args.

with_id
with_node_execution_options
with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
DRIVER_CLASS

EXECUTOR_SPEC

POST_EXECUTABLE_SPEC

PRE_EXECUTABLE_SPEC

SPEC_CLASS

component_id

TYPE: str

component_type

TYPE: str

downstream_nodes

driver_class

exec_properties

TYPE: Dict[str, Any]

executor_spec

id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

inputs

TYPE: Dict[str, Channel]

node_execution_options

TYPE: Optional[NodeExecutionOptions]

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

platform_config

spec

type

TYPE: str

type_annotation

TYPE: Optional[Type[SystemExecution]]

upstream_nodes

Source code in tfx/extensions/google_cloud_big_query/example_gen/component.py
def __init__(
    self,
    query: Optional[str] = None,
    input_config: Optional[
        Union[example_gen_pb2.Input, data_types.RuntimeParameter]
    ] = None,
    output_config: Optional[
        Union[example_gen_pb2.Output, data_types.RuntimeParameter]
    ] = None,
    range_config: Optional[
        Union[
            range_config_pb2.RangeConfig,
            data_types.RuntimeParameter,
            ph.Placeholder,
        ]
    ] = None,
    custom_executor_spec: Optional[executor_spec.ExecutorSpec] = None,
    custom_config: Optional[
        Union[example_gen_pb2.CustomConfig, data_types.RuntimeParameter]
    ] = None,
):
  """Constructs a BigQueryExampleGen component.

  Args:
    query: BigQuery sql string, query result will be treated as a single
      split, can be overwritten by input_config.
    input_config: An example_gen_pb2.Input instance with Split.pattern as
      BigQuery sql string. If set, it overwrites the 'query' arg, and allows
      different queries per split. If any field is provided as a
      RuntimeParameter, input_config should be constructed as a dict with the
      same field names as Input proto message.
    output_config: An example_gen_pb2.Output instance, providing output
      configuration. If unset, default splits will be 'train' and 'eval' with
      size 2:1. If any field is provided as a RuntimeParameter, input_config
      should be constructed as a dict with the same field names as Output
      proto message.
    range_config: An optional range_config_pb2.RangeConfig instance,
      specifying the range of span values to consider.
    custom_executor_spec: Optional custom executor spec overriding the default
      executor spec specified in the component attribute.
    custom_config: An
      [example_gen_pb2.CustomConfig](https://github.com/tensorflow/tfx/blob/master/tfx/proto/example_gen.proto)
      instance, providing custom configuration for ExampleGen.

  Raises:
    RuntimeError: Only one of query and input_config should be set.
  """
  if bool(query) == bool(input_config):
    raise RuntimeError('Exactly one of query and input_config should be set.')
  input_config = input_config or utils.make_default_input_config(query)
  super().__init__(
      input_config=input_config,
      output_config=output_config,
      custom_config=custom_config,
      range_config=range_config,
      custom_executor_spec=custom_executor_spec,
  )
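A minimal usage sketch; the query and table name are placeholders, and BigQuery access is assumed to be configured for the pipeline's project:

from tfx.v1.extensions.google_cloud_big_query import BigQueryExampleGen

example_gen = BigQueryExampleGen(
    query='SELECT name, age, label FROM `my-project.my_dataset.my_table`'
)
# Downstream components read the generated splits from example_gen.outputs['examples'].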
Attributes
DRIVER_CLASS class-attribute instance-attribute
DRIVER_CLASS = QueryBasedDriver
EXECUTOR_SPEC class-attribute instance-attribute
EXECUTOR_SPEC = BeamExecutorSpec(Executor)
POST_EXECUTABLE_SPEC class-attribute instance-attribute
POST_EXECUTABLE_SPEC = None
PRE_EXECUTABLE_SPEC class-attribute instance-attribute
PRE_EXECUTABLE_SPEC = None
SPEC_CLASS class-attribute instance-attribute
SPEC_CLASS = QueryBasedExampleGenSpec
component_id property
component_id: str
component_type property
component_type: str
downstream_nodes property
downstream_nodes
driver_class instance-attribute
driver_class = driver_class
exec_properties property
exec_properties: Dict[str, Any]
executor_spec instance-attribute
executor_spec = executor_spec
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the component class name.

RETURNS DESCRIPTION
str

node id.

inputs property
inputs: Dict[str, Channel]
node_execution_options property writable
node_execution_options: Optional[NodeExecutionOptions]
outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

platform_config instance-attribute
platform_config = None
spec instance-attribute
spec = spec
type property
type: str
type_annotation property
type_annotation: Optional[Type[SystemExecution]]
upstream_nodes property
upstream_nodes
Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add components that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
get_class_type classmethod
get_class_type() -> str
Source code in tfx/dsl/components/base/base_node.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def get_class_type(cls) -> str:
  nondeprecated_class = deprecation_utils.get_first_nondeprecated_class(cls)
  # TODO(b/221166027): Turn strict_check=True once failing tests are fixed.
  return name_utils.get_full_name(nondeprecated_class, strict_check=False)
remove_downstream_node
remove_downstream_node(downstream_node)
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def remove_downstream_node(self, downstream_node):
  self._downstream_nodes.remove(downstream_node)
  if self in downstream_node.upstream_nodes:
    downstream_node.remove_upstream_node(self)
remove_upstream_node
remove_upstream_node(upstream_node)
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def remove_upstream_node(self, upstream_node):
  self._upstream_nodes.remove(upstream_node)
  if self in upstream_node.downstream_nodes:
    upstream_node.remove_downstream_node(self)
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_beam_pipeline_args
with_beam_pipeline_args(beam_pipeline_args: Iterable[Union[str, Placeholder]]) -> BaseBeamComponent

Add per-component Beam pipeline args.

PARAMETER DESCRIPTION
beam_pipeline_args

List of Beam pipeline args to be added to the Beam executor spec.

TYPE: Iterable[Union[str, Placeholder]]

RETURNS DESCRIPTION
BaseBeamComponent

the same component itself.

Source code in tfx/dsl/components/base/base_beam_component.py
def with_beam_pipeline_args(
    self, beam_pipeline_args: Iterable[Union[str, placeholder.Placeholder]]
) -> 'BaseBeamComponent':
  """Add per component Beam pipeline args.

  Args:
    beam_pipeline_args: List of Beam pipeline args to be added to the Beam
      executor spec.

  Returns:
    the same component itself.
  """
  cast(executor_spec.BeamExecutorSpec,
       self.executor_spec).add_beam_pipeline_args(beam_pipeline_args)
  return self
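A minimal sketch of attaching per-component Beam args to a BigQuery-backed component; the project and bucket names are placeholders:

from tfx.v1.extensions.google_cloud_big_query import BigQueryExampleGen

example_gen = BigQueryExampleGen(
    query='SELECT * FROM `my-project.my_dataset.my_table`'
).with_beam_pipeline_args([
    '--project=my-gcp-project',
    '--temp_location=gs://my-bucket/tmp',
])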
with_id
with_id(id: str) -> Self
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def with_id(self, id: str) -> typing_extensions.Self:  # pylint: disable=redefined-builtin
  self._id = id
  return self
with_node_execution_options
with_node_execution_options(node_execution_options: NodeExecutionOptions) -> Self
Source code in tfx/dsl/components/base/base_node.py
def with_node_execution_options(
    self, node_execution_options: utils.NodeExecutionOptions
) -> typing_extensions.Self:
  self.node_execution_options = node_execution_options
  return self
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self
Pusher
Pusher(model: Optional[Channel] = None, model_blessing: Optional[Channel] = None, infra_blessing: Optional[Channel] = None, custom_config: Optional[Dict[str, Any]] = None)

Bases: Pusher

Cloud Big Query Pusher component.

Component outputs contains:

  • pushed_model: Channel of type standard_artifacts.PushedModel with result of push.

Construct a Pusher component.

PARAMETER DESCRIPTION
model

An optional Channel of type standard_artifacts.Model, usually produced by a Trainer component.

TYPE: Optional[Channel] DEFAULT: None

model_blessing

An optional Channel of type standard_artifacts.ModelBlessing, usually produced from an Evaluator component.

TYPE: Optional[Channel] DEFAULT: None

infra_blessing

An optional Channel of type standard_artifacts.InfraBlessing, usually produced from an InfraValidator component.

TYPE: Optional[Channel] DEFAULT: None

custom_config

A dict which contains the deployment job parameters to be passed to Cloud platforms.

TYPE: Optional[Dict[str, Any]] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add components that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

get_class_type
remove_downstream_node
remove_upstream_node
to_json_dict

Convert from an object to a JSON serializable dictionary.

with_id
with_node_execution_options
with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
DRIVER_CLASS

EXECUTOR_SPEC

POST_EXECUTABLE_SPEC

PRE_EXECUTABLE_SPEC

SPEC_CLASS

component_id

TYPE: str

component_type

TYPE: str

downstream_nodes

driver_class

exec_properties

TYPE: Dict[str, Any]

executor_spec

id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

inputs

TYPE: Dict[str, Channel]

node_execution_options

TYPE: Optional[NodeExecutionOptions]

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

platform_config

spec

type

TYPE: str

type_annotation

TYPE: Optional[Type[SystemExecution]]

upstream_nodes

Source code in tfx/extensions/google_cloud_big_query/pusher/component.py
def __init__(self,
             model: Optional[types.Channel] = None,
             model_blessing: Optional[types.Channel] = None,
             infra_blessing: Optional[types.Channel] = None,
             custom_config: Optional[Dict[str, Any]] = None):
  """Construct a Pusher component.

  Args:
    model: An optional Channel of type [`standard_artifacts.Model`][tfx.v1.types.standard_artifacts.Model], usually
      produced by a [Trainer][tfx.v1.components.Trainer] component.
    model_blessing: An optional Channel of type
      [`standard_artifacts.ModelBlessing`][tfx.v1.types.standard_artifacts.ModelBlessing], usually produced from an Evaluator
      component.
    infra_blessing: An optional Channel of type
      [`standard_artifacts.InfraBlessing`][tfx.v1.types.standard_artifacts.InfraBlessing], usually produced from an
      [InfraValidator][tfx.v1.components.InfraValidator] component.
    custom_config: A dict which contains the deployment job parameters to be
      passed to Cloud platforms.
  """
  super().__init__(
      model=model,
      model_blessing=model_blessing,
      infra_blessing=infra_blessing,
      custom_config=custom_config)
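A minimal sketch of wiring the Cloud Big Query Pusher into a pipeline; it assumes trainer and evaluator components already exist upstream, and the bigquery_serving_args fields shown are illustrative placeholders rather than a definitive schema:

from tfx.v1.extensions import google_cloud_big_query as big_query

pusher = big_query.Pusher(
    model=trainer.outputs['model'],                # produced by an upstream Trainer (assumed)
    model_blessing=evaluator.outputs['blessing'],  # produced by an upstream Evaluator (assumed)
    custom_config={
        big_query.PUSHER_SERVING_ARGS_KEY: {
            'bq_dataset_id': 'my_dataset',   # hypothetical
            'model_name': 'my_model',        # hypothetical
            'project_id': 'my-gcp-project',  # hypothetical
        },
    },
)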
Attributes
DRIVER_CLASS class-attribute instance-attribute
DRIVER_CLASS = BaseDriver
EXECUTOR_SPEC class-attribute instance-attribute
EXECUTOR_SPEC = ExecutorClassSpec(Executor)
POST_EXECUTABLE_SPEC class-attribute instance-attribute
POST_EXECUTABLE_SPEC = None
PRE_EXECUTABLE_SPEC class-attribute instance-attribute
PRE_EXECUTABLE_SPEC = None
SPEC_CLASS class-attribute instance-attribute
SPEC_CLASS = PusherSpec
component_id property
component_id: str
component_type property
component_type: str
downstream_nodes property
downstream_nodes
driver_class instance-attribute
driver_class = driver_class
exec_properties property
exec_properties: Dict[str, Any]
executor_spec instance-attribute
executor_spec = executor_spec
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the component class name.

RETURNS DESCRIPTION
str

node id.

inputs property
inputs: Dict[str, Channel]
node_execution_options property writable
node_execution_options: Optional[NodeExecutionOptions]
outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

platform_config instance-attribute
platform_config = None
spec instance-attribute
spec = spec
type property
type: str
type_annotation property
type_annotation: Optional[Type[SystemExecution]]
upstream_nodes property
upstream_nodes
Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add components that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
get_class_type classmethod
get_class_type() -> str
Source code in tfx/dsl/components/base/base_node.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def get_class_type(cls) -> str:
  nondeprecated_class = deprecation_utils.get_first_nondeprecated_class(cls)
  # TODO(b/221166027): Turn strict_check=True once failing tests are fixed.
  return name_utils.get_full_name(nondeprecated_class, strict_check=False)
remove_downstream_node
remove_downstream_node(downstream_node)
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def remove_downstream_node(self, downstream_node):
  self._downstream_nodes.remove(downstream_node)
  if self in downstream_node.upstream_nodes:
    downstream_node.remove_upstream_node(self)
remove_upstream_node
remove_upstream_node(upstream_node)
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def remove_upstream_node(self, upstream_node):
  self._upstream_nodes.remove(upstream_node)
  if self in upstream_node.downstream_nodes:
    upstream_node.remove_downstream_node(self)
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_id
with_id(id: str) -> Self
Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def with_id(self, id: str) -> typing_extensions.Self:  # pylint: disable=redefined-builtin
  self._id = id
  return self
with_node_execution_options
with_node_execution_options(node_execution_options: NodeExecutionOptions) -> Self
Source code in tfx/dsl/components/base/base_node.py
def with_node_execution_options(
    self, node_execution_options: utils.NodeExecutionOptions
) -> typing_extensions.Self:
  self.node_execution_options = node_execution_options
  return self
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self