Components

tfx.v1.components

TFX components module.

CLASS DESCRIPTION
BulkInferrer

A TFX component to do batch inference on a model with unlabelled examples.

CsvExampleGen

Official TFX CsvExampleGen component.

Evaluator

A TFX component to evaluate models trained by a TFX Trainer component.

ExampleDiff

TFX ExampleDiff component.

ExampleValidator

A TFX component to validate input examples.

FnArgs

Args to pass to user defined training/tuning function(s).

ImportExampleGen

Official TFX ImportExampleGen component.

ImportSchemaGen

A TFX ImportSchemaGen component to import a schema file into the pipeline.

InfraValidator

A TFX component to validate the model against the serving infrastructure.

Pusher

A TFX component to push validated TensorFlow models to a model serving platform.

SchemaGen

A TFX SchemaGen component to generate a schema from the training data.

StatisticsGen

Official TFX StatisticsGen component.

Trainer

A TFX component to train a TensorFlow model.

Transform

A TFX component to transform the input examples.

Tuner

A TFX component for model hyperparameter tuning.

ATTRIBUTE DESCRIPTION
DataAccessor

For accessing the data on disk.

TunerFnResult

Return type of tuner_fn.

Attributes

DataAccessor module-attribute

DataAccessor = NamedTuple('DataAccessor', [('tf_dataset_factory', Callable[[List[str], TensorFlowDatasetOptions, Optional[Schema]], Dataset]), ('record_batch_factory', Callable[[List[str], RecordBatchesOptions, Optional[Schema]], Iterator[RecordBatch]]), ('data_view_decode_fn', Optional[Callable[[Tensor], Dict[str, Any]]])])

For accessing the data on disk.

Contains factories that can create tf.data.Datasets or other means to access the train/eval data. They provide a uniform way of accessing data, regardless of how the data is stored on disk.
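
For orientation, the sketch below shows how the tf_dataset_factory is commonly called from a Trainer's run_fn. This is a hedged example, not part of this module's API surface: the helper name _input_fn, the label key, and the batch size are illustrative assumptions.

from tfx_bsl.public import tfxio

def _input_fn(file_pattern, data_accessor, schema, batch_size=32):
  # Build a batched, repeating tf.data.Dataset from the files matching
  # file_pattern, using the DataAccessor's TensorFlow dataset factory.
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(batch_size=batch_size, label_key='label'),
      schema).repeat()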

TunerFnResult module-attribute

TunerFnResult = NamedTuple('TunerFnResult', [('tuner', BaseTuner), ('fit_kwargs', Dict[str, Any])])

Return type of tuner_fn.

tuner_fn returns a TunerFnResult that contains:

  • tuner: A BaseTuner that will be used for tuning.
  • fit_kwargs: Args to pass to the tuner's run_trial function for fitting the model, e.g., the training and validation dataset. Required args depend on the tuner's implementation.
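
For orientation, a hedged sketch of a user-defined tuner_fn is shown below. The Keras model, the search space, the label key, and the batch size are illustrative assumptions, not part of this module's API.

import keras_tuner
import tensorflow as tf
from tfx.v1.components import FnArgs, TunerFnResult
from tfx_bsl.public import tfxio


def _build_model(hp: keras_tuner.HyperParameters) -> tf.keras.Model:
  # Hypothetical search space: tune the width of a single hidden layer.
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(
          hp.Int('units', min_value=16, max_value=128, step=16),
          activation='relu'),
      tf.keras.layers.Dense(1, activation='sigmoid'),
  ])
  model.compile(
      optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
  return model


def tuner_fn(fn_args: FnArgs) -> TunerFnResult:
  # Datasets are built through the DataAccessor; the optional schema argument
  # is omitted (None) here for brevity.
  options = tfxio.TensorFlowDatasetOptions(batch_size=32, label_key='label')
  train_ds = fn_args.data_accessor.tf_dataset_factory(
      fn_args.train_files, options, None)
  eval_ds = fn_args.data_accessor.tf_dataset_factory(
      fn_args.eval_files, options, None)
  tuner = keras_tuner.RandomSearch(
      _build_model,
      objective='val_accuracy',
      max_trials=5,
      directory=fn_args.working_dir,
      project_name='tfx_tuning')
  return TunerFnResult(
      tuner=tuner,
      fit_kwargs={
          'x': train_ds,
          'validation_data': eval_ds,
          'steps_per_epoch': fn_args.train_steps,
          'validation_steps': fn_args.eval_steps,
      })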

Classes

BulkInferrer

BulkInferrer(examples: BaseChannel, model: Optional[BaseChannel] = None, model_blessing: Optional[BaseChannel] = None, data_spec: Optional[Union[DataSpec, RuntimeParameter]] = None, model_spec: Optional[Union[ModelSpec, RuntimeParameter]] = None, output_example_spec: Optional[Union[OutputExampleSpec, RuntimeParameter]] = None)

Bases: BaseBeamComponent

A TFX component to do batch inference on a model with unlabelled examples.

BulkInferrer consumes examples data and a model, and produces the inference results to an external location as PredictionLog proto.

BulkInferrer will run inference on a validated model.

Example
  # Uses BulkInferrer to run inference on examples.
  bulk_inferrer = BulkInferrer(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'])

Component outputs contains:

  • inference_result: Channel of type standard_artifacts.InferenceResult that contains the inference results (produced when output_example_spec is not set).
  • output_examples: Channel of type standard_artifacts.Examples that contains the output examples (produced when output_example_spec is set).

See the BulkInferrer guide for more details.
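
As a hedged illustration of the output_example_spec option described below, the sketch uses an Evaluator blessing and an empty OutputExampleSpec; the upstream channel names (example_gen, trainer, evaluator) are assumptions.

from tfx.proto import bulk_inferrer_pb2

# With output_example_spec set, the component produces output_examples
# (annotated examples) instead of an inference_result.
bulk_inferrer = BulkInferrer(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    output_example_spec=bulk_inferrer_pb2.OutputExampleSpec())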

Construct a BulkInferrer component.

PARAMETER DESCRIPTION
examples

A BaseChannel of type standard_artifacts.Examples, usually produced by an ExampleGen component. required

TYPE: BaseChannel

model

A BaseChannel of type standard_artifacts.Model, usually produced by a Trainer component.

TYPE: Optional[BaseChannel] DEFAULT: None

model_blessing

A BaseChannel of type standard_artifacts.ModelBlessing, usually produced by a ModelValidator component.

TYPE: Optional[BaseChannel] DEFAULT: None

data_spec

bulk_inferrer_pb2.DataSpec instance that describes data selection.

TYPE: Optional[Union[DataSpec, RuntimeParameter]] DEFAULT: None

model_spec

bulk_inferrer_pb2.ModelSpec instance that describes model specification.

TYPE: Optional[Union[ModelSpec, RuntimeParameter]] DEFAULT: None

output_example_spec

bulk_inferrer_pb2.OutputExampleSpec instance; specify this if you want BulkInferrer to output examples instead of inference results.

TYPE: Optional[Union[OutputExampleSpec, RuntimeParameter]] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_beam_pipeline_args

Add per component Beam pipeline args.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/bulk_inferrer/component.py
def __init__(
    self,
    examples: types.BaseChannel,
    model: Optional[types.BaseChannel] = None,
    model_blessing: Optional[types.BaseChannel] = None,
    data_spec: Optional[Union[bulk_inferrer_pb2.DataSpec,
                              data_types.RuntimeParameter]] = None,
    model_spec: Optional[Union[bulk_inferrer_pb2.ModelSpec,
                               data_types.RuntimeParameter]] = None,
    output_example_spec: Optional[Union[bulk_inferrer_pb2.OutputExampleSpec,
                                        data_types.RuntimeParameter]] = None):
  """Construct an BulkInferrer component.

  Args:
    examples: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Examples`][tfx.v1.types.standard_artifacts.Examples], usually
      produced by an ExampleGen component. _required_
    model: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Model`][tfx.v1.types.standard_artifacts.Model], usually produced
      by a [Trainer][tfx.v1.components.Trainer] component.
    model_blessing: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.ModelBlessing`][tfx.v1.types.standard_artifacts.ModelBlessing],
      usually produced by a ModelValidator component.
    data_spec: bulk_inferrer_pb2.DataSpec instance that describes data
      selection.
    model_spec: bulk_inferrer_pb2.ModelSpec instance that describes model
      specification.
    output_example_spec: bulk_inferrer_pb2.OutputExampleSpec instance, specify
      if you want BulkInferrer to output examples instead of inference result.
  """
  if output_example_spec:
    output_examples = types.Channel(type=standard_artifacts.Examples)
    inference_result = None
  else:
    inference_result = types.Channel(type=standard_artifacts.InferenceResult)
    output_examples = None

  spec = standard_component_specs.BulkInferrerSpec(
      examples=examples,
      model=model,
      model_blessing=model_blessing,
      data_spec=data_spec or bulk_inferrer_pb2.DataSpec(),
      model_spec=model_spec or bulk_inferrer_pb2.ModelSpec(),
      output_example_spec=output_example_spec,
      inference_result=inference_result,
      output_examples=output_examples)
  super().__init__(spec=spec)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return <node_class_name>.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
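
A hedged usage sketch: force an ordering between two nodes that do not exchange artifacts. The node name cleanup_component is an assumption for illustration.

# Hypothetical: make cleanup_component wait for bulk_inferrer to finish,
# even though no artifact flows between them.
bulk_inferrer.add_downstream_node(cleanup_component)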
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_beam_pipeline_args
with_beam_pipeline_args(beam_pipeline_args: Iterable[Union[str, Placeholder]]) -> BaseBeamComponent

Add per component Beam pipeline args.

PARAMETER DESCRIPTION
beam_pipeline_args

List of Beam pipeline args to be added to the Beam executor spec.

TYPE: Iterable[Union[str, Placeholder]]

RETURNS DESCRIPTION
BaseBeamComponent

the same component itself.

Source code in tfx/dsl/components/base/base_beam_component.py
def with_beam_pipeline_args(
    self, beam_pipeline_args: Iterable[Union[str, placeholder.Placeholder]]
) -> 'BaseBeamComponent':
  """Add per component Beam pipeline args.

  Args:
    beam_pipeline_args: List of Beam pipeline args to be added to the Beam
      executor spec.

  Returns:
    the same component itself.
  """
  cast(executor_spec.BeamExecutorSpec,
       self.executor_spec).add_beam_pipeline_args(beam_pipeline_args)
  return self
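
A hedged usage sketch: attach Beam options to this component only. The flag shown is a standard DirectRunner option; the value is an assumption.

# Run this component's Beam pipeline with two local workers.
bulk_inferrer = bulk_inferrer.with_beam_pipeline_args(
    ['--direct_num_workers=2'])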
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

CsvExampleGen

CsvExampleGen(input_base: Optional[str] = None, input_config: Optional[Union[Input, RuntimeParameter]] = None, output_config: Optional[Union[Output, RuntimeParameter]] = None, range_config: Optional[Union[Placeholder, RangeConfig, RuntimeParameter]] = None)

Bases: FileBasedExampleGen

Official TFX CsvExampleGen component.

The CSV ExampleGen component takes CSV data and generates train and eval examples for downstream components.

The CSV ExampleGen encodes column values as tf.Example int/float/byte features. For cells that are missing, it uses:

  • tf.train.Feature(<type>_list=tf.train.<Type>List(value=[])) (e.g. int64_list=tf.train.Int64List(value=[])), when the type can be inferred.
  • tf.train.Feature() when it cannot infer the type from the column.

Note that type inference is performed per input split. If the input has multiple pre-existing splits, users need to ensure the column types align across them.

For example, given the following csv rows of a split:

header:A,B,C,D
row1:  1,,x,0.1
row2:  2,,y,0.2
row3:  3,,,0.3
row4:

The output example will be

example1: 1(int), empty feature(no type), x(string), 0.1(float)
example2: 2(int), empty feature(no type), y(string), 0.2(float)
example3: 3(int), empty feature(no type), empty list(string), 0.3(float)

Note that the empty feature is tf.train.Feature() while empty list string feature is tf.train.Feature(bytes_list=tf.train.BytesList(value=[])).

Component outputs contains:

  • examples: Channel of type standard_artifacts.Examples for the output train and eval examples.
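
A hedged usage sketch: read CSV files from a directory and override the default output split configuration. The path and the 3:1 hash-bucket split are assumptions; Output and SplitConfig come from tfx.proto.

from tfx import v1 as tfx

output_config = tfx.proto.Output(
    split_config=tfx.proto.SplitConfig(splits=[
        tfx.proto.SplitConfig.Split(name='train', hash_buckets=3),
        tfx.proto.SplitConfig.Split(name='eval', hash_buckets=1),
    ]))
example_gen = tfx.components.CsvExampleGen(
    input_base='/path/to/csv_dir', output_config=output_config)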

Construct a CsvExampleGen component.

PARAMETER DESCRIPTION
input_base

an external directory containing the CSV files.

TYPE: Optional[str] DEFAULT: None

input_config

An example_gen_pb2.Input instance, providing input configuration. If unset, the files under input_base will be treated as a single split.

TYPE: Optional[Union[Input, RuntimeParameter]] DEFAULT: None

output_config

An example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1.

TYPE: Optional[Union[Output, RuntimeParameter]] DEFAULT: None

range_config

An optional range_config_pb2.RangeConfig instance, specifying the range of span values to consider. If unset, the driver will default to searching for the latest span with no restrictions.

TYPE: Optional[Union[Placeholder, RangeConfig, RuntimeParameter]] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_beam_pipeline_args

Add per component Beam pipeline args.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/example_gen/csv_example_gen/component.py
def __init__(
    self,
    input_base: Optional[str] = None,
    input_config: Optional[Union[example_gen_pb2.Input,
                                 data_types.RuntimeParameter]] = None,
    output_config: Optional[Union[example_gen_pb2.Output,
                                  data_types.RuntimeParameter]] = None,
    range_config: Optional[Union[placeholder.Placeholder,
                                 range_config_pb2.RangeConfig,
                                 data_types.RuntimeParameter]] = None):
  """Construct a CsvExampleGen component.

  Args:
    input_base: an external directory containing the CSV files.
    input_config: An example_gen_pb2.Input instance, providing input
      configuration. If unset, the files under input_base will be treated as a
      single split.
    output_config: An example_gen_pb2.Output instance, providing output
      configuration. If unset, default splits will be 'train' and 'eval' with
      size 2:1.
    range_config: An optional range_config_pb2.RangeConfig instance,
      specifying the range of span values to consider. If unset, driver will
      default to searching for latest span with no restrictions.
  """
  super().__init__(
      input_base=input_base,
      input_config=input_config,
      output_config=output_config,
      range_config=range_config)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return <node_class_name>.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_beam_pipeline_args
with_beam_pipeline_args(beam_pipeline_args: Iterable[Union[str, Placeholder]]) -> BaseBeamComponent

Add per component Beam pipeline args.

PARAMETER DESCRIPTION
beam_pipeline_args

List of Beam pipeline args to be added to the Beam executor spec.

TYPE: Iterable[Union[str, Placeholder]]

RETURNS DESCRIPTION
BaseBeamComponent

the same component itself.

Source code in tfx/dsl/components/base/base_beam_component.py
def with_beam_pipeline_args(
    self, beam_pipeline_args: Iterable[Union[str, placeholder.Placeholder]]
) -> 'BaseBeamComponent':
  """Add per component Beam pipeline args.

  Args:
    beam_pipeline_args: List of Beam pipeline args to be added to the Beam
      executor spec.

  Returns:
    the same component itself.
  """
  cast(executor_spec.BeamExecutorSpec,
       self.executor_spec).add_beam_pipeline_args(beam_pipeline_args)
  return self
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

Evaluator

Evaluator(examples: BaseChannel, model: Optional[BaseChannel] = None, baseline_model: Optional[BaseChannel] = None, feature_slicing_spec: Optional[Union[FeatureSlicingSpec, RuntimeParameter]] = None, fairness_indicator_thresholds: Optional[Union[List[float], RuntimeParameter]] = None, example_splits: Optional[List[str]] = None, eval_config: Optional[EvalConfig] = None, schema: Optional[BaseChannel] = None, module_file: Optional[str] = None, module_path: Optional[str] = None)

Bases: BaseBeamComponent

A TFX component to evaluate models trained by a TFX Trainer component.

Component outputs contains:

  • evaluation: Channel of type standard_artifacts.ModelEvaluation that contains the evaluation results.
  • blessing: Channel of type standard_artifacts.ModelBlessing that contains the blessing result.

See the Evaluator guide for more details.
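
A hedged construction sketch: the label key, metric, and upstream channel names are assumptions; the eval_config uses the standard TFMA config messages.

import tensorflow_model_analysis as tfma
from tfx import v1 as tfx

eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='label')],
    slicing_specs=[tfma.SlicingSpec()],  # a single slice over the whole dataset
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(class_name='SparseCategoricalAccuracy'),
        ]),
    ])

evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    eval_config=eval_config)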

Construct an Evaluator component.

PARAMETER DESCRIPTION
examples

A BaseChannel of type standard_artifacts.Examples, usually produced by an ExampleGen component. required

TYPE: BaseChannel

model

A BaseChannel of type standard_artifacts.Model, usually produced by a Trainer component.

TYPE: Optional[BaseChannel] DEFAULT: None

baseline_model

An optional channel of type standard_artifacts.Model, used as the baseline model for model diff and model validation purposes.

TYPE: Optional[BaseChannel] DEFAULT: None

feature_slicing_spec

Deprecated, please use eval_config instead. Only supports estimator. evaluator_pb2.FeatureSlicingSpec instance that describes how Evaluator should slice the data.

TYPE: Optional[Union[FeatureSlicingSpec, RuntimeParameter]] DEFAULT: None

fairness_indicator_thresholds

Optional list of float (or RuntimeParameter) threshold values for use with TFMA fairness indicators. Experimental functionality: this interface and functionality may change at any time. TODO(b/142653905): add a link to additional documentation for TFMA fairness indicators here.

TYPE: Optional[Union[List[float], RuntimeParameter]] DEFAULT: None

example_splits

Names of splits on which the metrics are computed. Default behavior (when example_splits is set to None or empty) is to use the 'eval' split.

TYPE: Optional[List[str]] DEFAULT: None

eval_config

Instance of tfma.EvalConfig containing configuration settings for running the evaluation. This config has options for both estimator and Keras.

TYPE: Optional[EvalConfig] DEFAULT: None

schema

A Schema channel to use for TFXIO.

TYPE: Optional[BaseChannel] DEFAULT: None

module_file

A path to a python module file containing UDFs for Evaluator customization. This functionality is experimental and may change at any time. The module_file can implement the following functions at its top level.

def custom_eval_shared_model(
   eval_saved_model_path, model_name, eval_config, **kwargs,
) -> tfma.EvalSharedModel:
def custom_extractors(
  eval_shared_model, eval_config, tensor_adapter_config,
) -> List[tfma.extractors.Extractor]:

TYPE: Optional[str] DEFAULT: None

module_path

A python path to the custom module that contains the UDFs. See 'module_file' for the required signature of UDFs. This functionality is experimental and this API may change at any time. Note this cannot be set together with module_file.

TYPE: Optional[str] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_beam_pipeline_args

Add per component Beam pipeline args.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/evaluator/component.py
def __init__(
    self,
    examples: types.BaseChannel,
    model: Optional[types.BaseChannel] = None,
    baseline_model: Optional[types.BaseChannel] = None,
    # TODO(b/148618405): deprecate feature_slicing_spec.
    feature_slicing_spec: Optional[Union[evaluator_pb2.FeatureSlicingSpec,
                                         data_types.RuntimeParameter]] = None,
    fairness_indicator_thresholds: Optional[Union[
        List[float], data_types.RuntimeParameter]] = None,
    example_splits: Optional[List[str]] = None,
    eval_config: Optional[tfma.EvalConfig] = None,
    schema: Optional[types.BaseChannel] = None,
    module_file: Optional[str] = None,
    module_path: Optional[str] = None):
  """Construct an Evaluator component.

  Args:
    examples: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Examples`][tfx.v1.types.standard_artifacts.Examples], usually
      produced by an ExampleGen component. _required_
    model: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Model`][tfx.v1.types.standard_artifacts.Model], usually produced
      by a [Trainer][tfx.v1.components.Trainer] component.
    baseline_model: An optional channel of type ['standard_artifacts.Model'][tfx.v1.types.standard_artifacts.Model] as
      the baseline model for model diff and model validation purpose.
    feature_slicing_spec: Deprecated, please use eval_config instead. Only
      support estimator.
      [evaluator_pb2.FeatureSlicingSpec](https://github.com/tensorflow/tfx/blob/master/tfx/proto/evaluator.proto)
      instance that describes how Evaluator should slice the data.
    fairness_indicator_thresholds: Optional list of float (or
      [RuntimeParameter][tfx.v1.dsl.experimental.RuntimeParameter]) threshold values for use with TFMA fairness
        indicators. Experimental functionality: this interface and
        functionality may change at any time. TODO(b/142653905): add a link
        to additional documentation for TFMA fairness indicators here.
    example_splits: Names of splits on which the metrics are computed.
      Default behavior (when example_splits is set to None or Empty) is using
      the 'eval' split.
    eval_config: Instance of tfma.EvalConfig containg configuration settings
      for running the evaluation. This config has options for both estimator
      and Keras.
    schema: A `Schema` channel to use for TFXIO.
    module_file: A path to python module file containing UDFs for Evaluator
      customization. This functionality is experimental and may change at any
      time. The module_file can implement following functions at its top
      level.
        ``` {.py .no-copy}
        def custom_eval_shared_model(
           eval_saved_model_path, model_name, eval_config, **kwargs,
        ) -> tfma.EvalSharedModel:
        ```
        ``` {.py .no-copy}
        def custom_extractors(
          eval_shared_model, eval_config, tensor_adapter_config,
        ) -> List[tfma.extractors.Extractor]:
        ```
    module_path: A python path to the custom module that contains the UDFs.
      See 'module_file' for the required signature of UDFs. This functionality
      is experimental and this API may change at any time. Note this can not
      be set together with module_file.
  """
  if bool(module_file) and bool(module_path):
    raise ValueError(
        'Python module path can not be set together with module file path.')

  if eval_config is not None and feature_slicing_spec is not None:
    raise ValueError("Exactly one of 'eval_config' or 'feature_slicing_spec' "
                     'must be supplied.')
  if eval_config is None and feature_slicing_spec is None:
    feature_slicing_spec = evaluator_pb2.FeatureSlicingSpec()
    logging.info('Neither eval_config nor feature_slicing_spec is passed, '
                 'the model is treated as estimator.')

  if feature_slicing_spec:
    logging.warning('feature_slicing_spec is deprecated, please use '
                    'eval_config instead.')

  blessing = types.Channel(type=standard_artifacts.ModelBlessing)
  evaluation = types.Channel(type=standard_artifacts.ModelEvaluation)
  spec = standard_component_specs.EvaluatorSpec(
      examples=examples,
      model=model,
      baseline_model=baseline_model,
      feature_slicing_spec=feature_slicing_spec,
      fairness_indicator_thresholds=(
          fairness_indicator_thresholds if isinstance(
              fairness_indicator_thresholds, data_types.RuntimeParameter) else
          json_utils.dumps(fairness_indicator_thresholds)),
      example_splits=json_utils.dumps(example_splits),
      evaluation=evaluation,
      eval_config=eval_config,
      blessing=blessing,
      schema=schema,
      module_file=module_file,
      module_path=module_path)
  super().__init__(spec=spec)

  if udf_utils.should_package_user_modules():
    # In this case, the `MODULE_PATH_KEY` execution property will be injected
    # as a reference to the given user module file after packaging, at which
    # point the `MODULE_FILE_KEY` execution property will be removed.
    udf_utils.add_user_module_dependency(
        self, standard_component_specs.MODULE_FILE_KEY,
        standard_component_specs.MODULE_PATH_KEY)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return <node_class_name>.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_beam_pipeline_args
with_beam_pipeline_args(beam_pipeline_args: Iterable[Union[str, Placeholder]]) -> BaseBeamComponent

Add per component Beam pipeline args.

PARAMETER DESCRIPTION
beam_pipeline_args

List of Beam pipeline args to be added to the Beam executor spec.

TYPE: Iterable[Union[str, Placeholder]]

RETURNS DESCRIPTION
BaseBeamComponent

the same component itself.

Source code in tfx/dsl/components/base/base_beam_component.py
def with_beam_pipeline_args(
    self, beam_pipeline_args: Iterable[Union[str, placeholder.Placeholder]]
) -> 'BaseBeamComponent':
  """Add per component Beam pipeline args.

  Args:
    beam_pipeline_args: List of Beam pipeline args to be added to the Beam
      executor spec.

  Returns:
    the same component itself.
  """
  cast(executor_spec.BeamExecutorSpec,
       self.executor_spec).add_beam_pipeline_args(beam_pipeline_args)
  return self
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

ExampleDiff

ExampleDiff(examples_test: BaseChannel, examples_base: BaseChannel, config: ExampleDiffConfig, include_split_pairs: Optional[List[Tuple[str, str]]] = None)

Bases: BaseBeamComponent

TFX ExampleDiff component.

Computes example level diffs according to an ExampleDiffConfig. See TFDV feature_skew_detector.py for more details.

This executor is under development and may change.
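
A hedged construction sketch: the upstream channel names, the empty config, and the split pair are assumptions for illustration.

from tfx.proto import example_diff_pb2

example_diff = ExampleDiff(
    examples_test=example_gen.outputs['examples'],
    examples_base=baseline_example_gen.outputs['examples'],
    config=example_diff_pb2.ExampleDiffConfig(),
    include_split_pairs=[('eval', 'eval')])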

Construct an ExampleDiff component.

PARAMETER DESCRIPTION
examples_test

A BaseChannel of ExamplesPath type, as generated by the ExampleGen component. This needs to contain any splits referenced in include_split_pairs.

TYPE: BaseChannel

examples_base

A second BaseChannel of ExamplesPath type to which examples should be compared. This needs to contain any splits referenced in include_split_pairs.

TYPE: BaseChannel

config

An ExampleDiffConfig that defines configuration for the skew detection pipeline.

TYPE: ExampleDiffConfig

include_split_pairs

Pairs of split names that ExampleDiff should be run on. Default behavior if not supplied is to run on all pairs. Order is (test, base) with respect to examples_test, examples_base.

TYPE: Optional[List[Tuple[str, str]]] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_beam_pipeline_args

Add per component Beam pipeline args.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/example_diff/component.py
def __init__(self,
             examples_test: types.BaseChannel,
             examples_base: types.BaseChannel,
             config: example_diff_pb2.ExampleDiffConfig,
             include_split_pairs: Optional[List[Tuple[str, str]]] = None):
  """Construct an ExampleDiff component.

  Args:
    examples_test: A [BaseChannel][tfx.v1.types.BaseChannel] of `ExamplesPath` type, as generated by the
      [ExampleGen component](../../../guide/examplegen).
      This needs to contain any splits referenced in `include_split_pairs`.
    examples_base: A second [BaseChannel][tfx.v1.types.BaseChannel] of `ExamplesPath` type to which
      `examples` should be compared. This needs to contain any splits
      referenced in `include_split_pairs`.
    config: A ExampleDiffConfig that defines configuration for the skew
      detection pipeline.
    include_split_pairs: Pairs of split names that ExampleDiff should be run
      on. Default behavior if not supplied is to run on all pairs. Order is
      (test, base) with respect to examples_test, examples_base.
  """
  if include_split_pairs is None:
    logging.info('Including all split pairs because include_split_pairs is '
                 'not set.')
  diffs = types.Channel(type=standard_artifacts.ExamplesDiff)
  spec = standard_component_specs.ExampleDiffSpec(
      **{
          standard_component_specs.EXAMPLES_KEY:
              examples_test,
          standard_component_specs.BASELINE_EXAMPLES_KEY:
              examples_base,
          standard_component_specs.INCLUDE_SPLIT_PAIRS_KEY:
              json_utils.dumps(include_split_pairs),
          standard_component_specs.EXAMPLE_DIFF_RESULT_KEY:
              diffs,
          standard_component_specs.EXAMPLE_DIFF_CONFIG_KEY: config
      })
  super().__init__(spec=spec)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return <node_class_name>.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_beam_pipeline_args
with_beam_pipeline_args(beam_pipeline_args: Iterable[Union[str, Placeholder]]) -> BaseBeamComponent

Add per component Beam pipeline args.

PARAMETER DESCRIPTION
beam_pipeline_args

List of Beam pipeline args to be added to the Beam executor spec.

TYPE: Iterable[Union[str, Placeholder]]

RETURNS DESCRIPTION
BaseBeamComponent

the same component itself.

Source code in tfx/dsl/components/base/base_beam_component.py
def with_beam_pipeline_args(
    self, beam_pipeline_args: Iterable[Union[str, placeholder.Placeholder]]
) -> 'BaseBeamComponent':
  """Add per component Beam pipeline args.

  Args:
    beam_pipeline_args: List of Beam pipeline args to be added to the Beam
      executor spec.

  Returns:
    the same component itself.
  """
  cast(executor_spec.BeamExecutorSpec,
       self.executor_spec).add_beam_pipeline_args(beam_pipeline_args)
  return self
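
As a usage sketch (not part of the source listing above), per-component Beam args can be attached after construction; the DirectRunner flags below are illustrative values, not defaults from this reference:

example_diff = example_diff.with_beam_pipeline_args([
    '--direct_running_mode=multi_processing',
    '--direct_num_workers=0',
])
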
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

ExampleValidator

ExampleValidator(statistics: BaseChannel, schema: BaseChannel, exclude_splits: Optional[List[str]] = None, custom_validation_config: Optional[CustomValidationConfig] = None)

Bases: BaseComponent

A TFX component to validate input examples.

The ExampleValidator component uses TensorFlow Data Validation to validate the statistics of selected splits of the input examples against a schema.

The ExampleValidator component identifies anomalies in training and serving data. The component can be configured to detect different classes of anomalies in the data. It can:

  • perform validity checks by comparing data statistics against a schema that codifies expectations of the user.
  • run custom validations based on an optional SQL-based config.
Schema Based Example Validation

The ExampleValidator component identifies any anomalies in the example data by comparing data statistics computed by the StatisticsGen component against a schema. The schema codifies properties which the input data is expected to satisfy, and is provided and maintained by the user.

Example

# Performs anomaly detection based on statistics and data schema.
validate_stats = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=infer_schema.outputs['schema'])

Component outputs contains:

  • anomalies: Channel of type standard_artifacts.ExampleAnomalies.

See the ExampleValidator guide for more details.
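
For instance, validation can be restricted to a subset of splits via exclude_splits; a minimal sketch, assuming the same statistics_gen and infer_schema nodes as in the example above:

validate_train_only = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=infer_schema.outputs['schema'],
    exclude_splits=['eval'])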

Construct an ExampleValidator component.

PARAMETER DESCRIPTION
statistics

A BaseChannel of type standard_artifacts.ExampleStatistics.

TYPE: BaseChannel

schema

A BaseChannel of type standard_artifacts.Schema. required

TYPE: BaseChannel

exclude_splits

Names of splits that the example validator should not validate. Default behavior (when exclude_splits is set to None) is excluding no splits.

TYPE: Optional[List[str]] DEFAULT: None

custom_validation_config

Optional configuration for specifying SQL-based custom validations.

TYPE: Optional[CustomValidationConfig] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/example_validator/component.py
def __init__(self,
             statistics: types.BaseChannel,
             schema: types.BaseChannel,
             exclude_splits: Optional[List[str]] = None,
             custom_validation_config: Optional[
                 custom_validation_config_pb2.CustomValidationConfig] = None):
  """Construct an ExampleValidator component.

  Args:
    statistics: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.ExampleStatistics`][tfx.v1.types.standard_artifacts.ExampleStatistics].
    schema: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Schema`]. _required_
    exclude_splits: Names of splits that the example validator should not
      validate. Default behavior (when exclude_splits is set to None) is
      excluding no splits.
    custom_validation_config: Optional configuration for specifying SQL-based
      custom validations.
  """
  if exclude_splits is None:
    exclude_splits = []
    logging.info('Excluding no splits because exclude_splits is not set.')
  anomalies = types.Channel(type=standard_artifacts.ExampleAnomalies)
  spec = standard_component_specs.ExampleValidatorSpec(
      statistics=statistics,
      schema=schema,
      exclude_splits=json_utils.dumps(exclude_splits),
      custom_validation_config=custom_validation_config,
      anomalies=anomalies)
  super().__init__(spec=spec)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the component class name as the default id.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

FnArgs

Args to pass to user defined training/tuning function(s).

ATTRIBUTE DESCRIPTION
working_dir

Working dir.

train_files

A list of patterns for train files.

eval_files

A list of patterns for eval files.

train_steps

Number of train steps.

eval_steps

Number of eval steps.

schema_path

A single uri for schema file. Will be None if not specified.

schema_file

Deprecated, use schema_path instead.

transform_graph_path

An optional single uri for transform graph produced by TFT. Will be None if not specified.

transform_output

Deprecated, use transform_graph_path instead.

data_accessor

Contains factories that can create tf.data.Datasets or other means to access the train/eval data. They provide a uniform way of accessing data, regardless of how the data is stored on disk.

serving_model_dir

A single uri for the output directory of the serving model.

eval_model_dir

A single uri for the output directory of the eval model. Note that this is Estimator-only; Keras doesn't require it for TFMA.

model_run_dir

A single uri for the output directory of model training related files.

base_model

An optional base model path that will be used for this training.

hyperparameters

An optional keras_tuner.HyperParameters config.

custom_config

An optional dictionary passed to the component.
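
As an illustration beyond the attribute list above, a Keras run_fn typically consumes FnArgs through data_accessor and the path attributes. This is a hedged sketch; _input_fn, _build_keras_model, and the batch size are illustrative names rather than part of the TFX API:

from tensorflow_metadata.proto.v0 import schema_pb2
from tfx import v1 as tfx
from tfx_bsl.public import tfxio

def _input_fn(file_pattern, data_accessor, schema, batch_size=64):
  # Build a tf.data.Dataset from the file patterns handed to the trainer.
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(batch_size=batch_size),
      schema).repeat()

def run_fn(fn_args: tfx.components.FnArgs):
  schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())
  train_ds = _input_fn(fn_args.train_files, fn_args.data_accessor, schema)
  eval_ds = _input_fn(fn_args.eval_files, fn_args.data_accessor, schema)
  model = _build_keras_model()  # user-defined model builder (illustrative)
  model.fit(
      train_ds,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_ds,
      validation_steps=fn_args.eval_steps)
  model.save(fn_args.serving_model_dir, save_format='tf')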

ImportExampleGen

ImportExampleGen(input_base: Optional[str] = None, input_config: Optional[Union[Input, RuntimeParameter]] = None, output_config: Optional[Union[Output, RuntimeParameter]] = None, range_config: Optional[Union[RangeConfig, RuntimeParameter]] = None, payload_format: Optional[int] = FORMAT_TF_EXAMPLE)

Bases: FileBasedExampleGen

Official TFX ImportExampleGen component.

The ImportExampleGen component takes TFRecord files in TF Example data format and generates train and eval examples for downstream components. This component provides consistent and configurable partitioning, and it also shuffles the dataset following ML best practice.
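
A hedged usage sketch constructing the component with an explicit output split; tfrecord_dir is a placeholder path, and the 3:1 hash-bucket ratio is only illustrative:

from tfx.proto import example_gen_pb2

output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1),
    ]))
example_gen = ImportExampleGen(
    input_base=tfrecord_dir, output_config=output_config)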

Component outputs contains:

  • examples: Channel of type standard_artifacts.Examples for output train and eval examples.

Construct an ImportExampleGen component.

PARAMETER DESCRIPTION
input_base

an external directory containing the TFRecord files.

TYPE: Optional[str] DEFAULT: None

input_config

An example_gen_pb2.Input instance, providing input configuration. If unset, the files under input_base will be treated as a single split.

TYPE: Optional[Union[Input, RuntimeParameter]] DEFAULT: None

output_config

An example_gen_pb2.Output instance, providing output configuration. If unset, default splits will be 'train' and 'eval' with size 2:1.

TYPE: Optional[Union[Output, RuntimeParameter]] DEFAULT: None

range_config

An optional range_config_pb2.RangeConfig instance, specifying the range of span values to consider. If unset, driver will default to searching for latest span with no restrictions.

TYPE: Optional[Union[RangeConfig, RuntimeParameter]] DEFAULT: None

payload_format

Payload format of input data. Should be one of example_gen_pb2.PayloadFormat enum. Note that payload format of output data is the same as input.

TYPE: Optional[int] DEFAULT: FORMAT_TF_EXAMPLE

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_beam_pipeline_args

Add per component Beam pipeline args.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/example_gen/import_example_gen/component.py
def __init__(
    self,
    input_base: Optional[str] = None,
    input_config: Optional[Union[example_gen_pb2.Input,
                                 data_types.RuntimeParameter]] = None,
    output_config: Optional[Union[example_gen_pb2.Output,
                                  data_types.RuntimeParameter]] = None,
    range_config: Optional[Union[range_config_pb2.RangeConfig,
                                 data_types.RuntimeParameter]] = None,
    payload_format: Optional[int] = example_gen_pb2.FORMAT_TF_EXAMPLE):
  """Construct an ImportExampleGen component.

  Args:
    input_base: an external directory containing the TFRecord files.
    input_config: An example_gen_pb2.Input instance, providing input
      configuration. If unset, the files under input_base will be treated as a
      single split.
    output_config: An example_gen_pb2.Output instance, providing output
      configuration. If unset, default splits will be 'train' and 'eval' with
      size 2:1.
    range_config: An optional range_config_pb2.RangeConfig instance,
      specifying the range of span values to consider. If unset, driver will
      default to searching for latest span with no restrictions.
    payload_format: Payload format of input data. Should be one of
      example_gen_pb2.PayloadFormat enum. Note that payload format of output
      data is the same as input.
  """
  super().__init__(
      input_base=input_base,
      input_config=input_config,
      output_config=output_config,
      range_config=range_config,
      output_data_format=payload_format)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the component class name as the default id.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_beam_pipeline_args
with_beam_pipeline_args(beam_pipeline_args: Iterable[Union[str, Placeholder]]) -> BaseBeamComponent

Add per component Beam pipeline args.

PARAMETER DESCRIPTION
beam_pipeline_args

List of Beam pipeline args to be added to the Beam executor spec.

TYPE: Iterable[Union[str, Placeholder]]

RETURNS DESCRIPTION
BaseBeamComponent

the same component itself.

Source code in tfx/dsl/components/base/base_beam_component.py
def with_beam_pipeline_args(
    self, beam_pipeline_args: Iterable[Union[str, placeholder.Placeholder]]
) -> 'BaseBeamComponent':
  """Add per component Beam pipeline args.

  Args:
    beam_pipeline_args: List of Beam pipeline args to be added to the Beam
      executor spec.

  Returns:
    the same component itself.
  """
  cast(executor_spec.BeamExecutorSpec,
       self.executor_spec).add_beam_pipeline_args(beam_pipeline_args)
  return self
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

ImportSchemaGen

ImportSchemaGen(schema_file: str)

Bases: BaseComponent

A TFX ImportSchemaGen component to import a schema file into the pipeline.

ImportSchemaGen is a specialized SchemaGen which imports a pre-defined schema file into the pipeline.

In a typical TFX pipeline, users are expected to review the schemas generated with SchemaGen and store them in SCM or equivalent. Those schema files can be brought back to pipelines using ImportSchemaGen.

Here is an example to use the ImportSchemaGen:

schema_gen = ImportSchemaGen(schema_file=schema_path)

Component outputs contains:

  • schema: Channel of type standard_artifacts.Schema for schema result.

See the SchemaGen guide for more details.

ImportSchemaGen works much like Importer, except for the following:

  • schema_file should be the full file path instead of directory holding it.
  • schema_file is copied to the output artifact. This is different from Importer, which loads an "Artifact" by setting its URI to the given path.
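
Below is a hedged sketch of how the imported schema is typically consumed downstream; example_gen and schema_path are assumed to be defined elsewhere in the pipeline:

schema_gen = ImportSchemaGen(schema_file=schema_path)
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])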

Init function for the ImportSchemaGen.

PARAMETER DESCRIPTION
schema_file

File path to the input schema file. This file will be copied to the output artifact which is generated inside the pipeline root directory.

TYPE: str

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/schema_gen/import_schema_gen/component.py
def __init__(self, schema_file: str):
  """Init function for the ImportSchemaGen.

  Args:
    schema_file: File path to the input schema file. This file will be copied
      to the output artifact which is generated inside the pipeline root
      directory.
  """
  spec = standard_component_specs.ImportSchemaGenSpec(
      schema_file=schema_file,
      schema=types.Channel(type=standard_artifacts.Schema))
  super().__init__(spec=spec)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the component class name as the default id.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

InfraValidator

InfraValidator(model: BaseChannel, serving_spec: ServingSpec, examples: Optional[BaseChannel] = None, request_spec: Optional[RequestSpec] = None, validation_spec: Optional[ValidationSpec] = None)

Bases: BaseComponent

A TFX component to validate the model against the serving infrastructure.

Infra validation is done by loading the model into exactly the same serving binary that is used in production, and additionally sending some requests to the model server. Such requests can be specified from an Examples artifact.

Examples

Full example using a TensorFlow Serving binary running on local Docker.

infra_validator = InfraValidator(
    model=trainer.outputs['model'],
    examples=test_example_gen.outputs['examples'],
    serving_spec=ServingSpec(
        tensorflow_serving=TensorFlowServing(  # Using TF Serving.
            tags=['latest']
        ),
        local_docker=LocalDockerConfig(),  # Running on local docker.
    ),
    validation_spec=ValidationSpec(
        max_loading_time_seconds=60,
        num_tries=5,
    ),
    request_spec=RequestSpec(
        tensorflow_serving=TensorFlowServingRequestSpec(),
        num_examples=1,
    )
)

Minimal example when running on Kubernetes.

infra_validator = InfraValidator(
    model=trainer.outputs['model'],
    examples=test_example_gen.outputs['examples'],
    serving_spec=ServingSpec(
        tensorflow_serving=TensorFlowServing(
            tags=['latest']
        ),
        kubernetes=KubernetesConfig(),  # Running on Kubernetes.
    ),
)

Component outputs contains:

  • blessing: Channel of type standard_artifacts.InfraBlessing that contains the validation result.

See the InfraValidator guide for more details.
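
As an illustration of consuming the blessing downstream, the output can gate a Pusher; a sketch only, assuming trainer and push_destination are defined elsewhere in the pipeline:

pusher = Pusher(
    model=trainer.outputs['model'],
    infra_blessing=infra_validator.outputs['blessing'],
    push_destination=push_destination)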

Construct an InfraValidator component.

PARAMETER DESCRIPTION
model

A BaseChannel of ModelExportPath type, usually produced by Trainer component. required

TYPE: BaseChannel

serving_spec

A ServingSpec configuration about serving binary and test platform config to launch model server for validation. required

TYPE: ServingSpec

examples

A BaseChannel of ExamplesPath type, usually produced by ExampleGen component. If not specified, InfraValidator does not issue requests for validation.

TYPE: Optional[BaseChannel] DEFAULT: None

request_spec

Optional RequestSpec configuration about making requests from examples input. If not specified, InfraValidator does not issue requests for validation.

TYPE: Optional[RequestSpec] DEFAULT: None

validation_spec

Optional ValidationSpec configuration.

TYPE: Optional[ValidationSpec] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/infra_validator/component.py
def __init__(
    self,
    model: types.BaseChannel,
    serving_spec: infra_validator_pb2.ServingSpec,
    examples: Optional[types.BaseChannel] = None,
    request_spec: Optional[infra_validator_pb2.RequestSpec] = None,
    validation_spec: Optional[infra_validator_pb2.ValidationSpec] = None):
  """Construct a InfraValidator component.

  Args:
    model: A [`BaseChannel`][tfx.v1.types.BaseChannel] of `ModelExportPath` type, usually produced by
      [Trainer](../../../guide/trainer) component.
        _required_
    serving_spec: A `ServingSpec` configuration about serving binary and test
      platform config to launch model server for validation. _required_
    examples: A [`BaseChannel`][tfx.v1.types.BaseChannel] of `ExamplesPath` type, usually produced by
      [ExampleGen](../../../guide/examplegen) component.
        If not specified, InfraValidator does not issue requests for
        validation.
    request_spec: Optional `RequestSpec` configuration about making requests
      from `examples` input. If not specified, InfraValidator does not issue
      requests for validation.
    validation_spec: Optional `ValidationSpec` configuration.
  """
  blessing = types.Channel(type=standard_artifacts.InfraBlessing)
  spec = standard_component_specs.InfraValidatorSpec(
      model=model,
      examples=examples,
      blessing=blessing,
      serving_spec=serving_spec,
      validation_spec=validation_spec,
      request_spec=request_spec)
  super().__init__(spec=spec)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the component class name as the default id.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self
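As a sketch of the call shape only: with_platform_config accepts any proto message and simply stores it on the component, as the source above shows. The google.protobuf Struct below is a placeholder; a real pipeline would pass whatever platform-specific config proto its orchestrator expects.

from google.protobuf import struct_pb2

# Placeholder message purely to illustrate the fluent call; substitute the
# config proto required by your target platform.
component = component.with_platform_config(struct_pb2.Struct())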

Pusher

Pusher(model: Optional[BaseChannel] = None, model_blessing: Optional[BaseChannel] = None, infra_blessing: Optional[BaseChannel] = None, push_destination: Optional[Union[PushDestination, RuntimeParameter]] = None, custom_config: Optional[Dict[str, Any]] = None, custom_executor_spec: Optional[ExecutorSpec] = None)

Bases: BaseComponent

A TFX component to push validated TensorFlow models to a model serving platform.

The Pusher component can be used to push a validated SavedModel from the output of the Trainer component to TensorFlow Serving. The Pusher checks the validation results from the Evaluator component and the InfraValidator component before deploying the model. If the model has not been blessed, it will not be pushed.

Note

The executor for this component can be overridden to push the model to serving platforms other than TensorFlow Serving. The Cloud AI Platform custom executor provides an example of how to implement this.

Example

# Checks whether the model passed the validation steps and pushes the model
# to a file destination if check passed.
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=proto.PushDestination(
        filesystem=proto.PushDestination.Filesystem(
            base_directory=serving_model_dir,
        )
    ),
)

Component outputs contain:

- pushed_model: Channel of type standard_artifacts.PushedModel.

See the Pusher guide for more details.

Construct a Pusher component.

PARAMETER DESCRIPTION
model

An optional BaseChannel of type standard_artifacts.Model, usually produced by a Trainer component.

TYPE: Optional[BaseChannel] DEFAULT: None

model_blessing

An optional BaseChannel of type standard_artifacts.ModelBlessing, usually produced from an Evaluator component.

TYPE: Optional[BaseChannel] DEFAULT: None

infra_blessing

An optional BaseChannel of type standard_artifacts.InfraBlessing, usually produced from an InfraValidator component.

TYPE: Optional[BaseChannel] DEFAULT: None

push_destination

A pusher_pb2.PushDestination instance, providing info for TensorFlow Serving to load models. Optional if executor_class doesn't require push_destination.

TYPE: Optional[Union[PushDestination, RuntimeParameter]] DEFAULT: None

custom_config

A dict which contains the deployment job parameters to be passed to Cloud platforms.

TYPE: Optional[Dict[str, Any]] DEFAULT: None

custom_executor_spec

Optional custom executor spec. Deprecated (no compatibility guarantee); please customize the component directly.

TYPE: Optional[ExecutorSpec] DEFAULT: None
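Building on the parameters above, a hedged sketch of a Pusher that also gates on an infra validation result; trainer, evaluator, infra_validator, and serving_model_dir are assumed to be defined elsewhere in the pipeline, and the 'blessing' output key of InfraValidator is an assumption of this sketch.

# Push only if both the Evaluator blessing and the InfraValidator blessing
# are present, exporting to a filesystem destination.
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    infra_blessing=infra_validator.outputs['blessing'],
    push_destination=proto.PushDestination(
        filesystem=proto.PushDestination.Filesystem(
            base_directory=serving_model_dir,
        )
    ),
)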

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/pusher/component.py
def __init__(
    self,
    model: Optional[types.BaseChannel] = None,
    model_blessing: Optional[types.BaseChannel] = None,
    infra_blessing: Optional[types.BaseChannel] = None,
    push_destination: Optional[Union[pusher_pb2.PushDestination,
                                     data_types.RuntimeParameter]] = None,
    custom_config: Optional[Dict[str, Any]] = None,
    custom_executor_spec: Optional[executor_spec.ExecutorSpec] = None):
  """Construct a Pusher component.

  Args:
    model: An optional [BaseChannel][tfx.v1.types.BaseChannel] of type `standard_artifacts.Model`, usually
      produced by a [Trainer][tfx.v1.components.Trainer] component.
    model_blessing: An optional [BaseChannel][tfx.v1.types.BaseChannel] of type
      [`standard_artifacts.ModelBlessing`][tfx.v1.types.standard_artifacts.ModelBlessing],
      usually produced from an [Evaluator][tfx.v1.components.Evaluator] component.
    infra_blessing: An optional [BaseChannel][tfx.v1.types.BaseChannel] of type
      [`standard_artifacts.InfraBlessing`][tfx.v1.types.standard_artifacts.InfraBlessing],
      usually produced from an [InfraValidator][tfx.v1.components.InfraValidator] component.
    push_destination: A pusher_pb2.PushDestination instance, providing info
      for tensorflow serving to load models. Optional if executor_class
      doesn't require push_destination.
    custom_config: A dict which contains the deployment job parameters to be
      passed to Cloud platforms.
    custom_executor_spec: Optional custom executor spec. Deprecated (no
      compatibility guarantee), please customize component directly.
  """
  pushed_model = types.Channel(type=standard_artifacts.PushedModel)
  if (push_destination is None and not custom_executor_spec and
      self.EXECUTOR_SPEC.executor_class == executor.Executor):
    raise ValueError('push_destination is required unless a '
                     'custom_executor_spec is supplied that does not require '
                     'it.')
  if custom_executor_spec:
    logging.warning(
        '`custom_executor_spec` is deprecated. Please customize component directly.'
    )
  if model is None and infra_blessing is None:
    raise ValueError(
        'Either one of model or infra_blessing channel should be given. '
        'If infra_blessing is used in place of model, it must have been '
        'created with InfraValidator with RequestSpec.make_warmup = True. '
        'This cannot be checked during pipeline construction time but will '
        'raise runtime error if infra_blessing does not contain a model.')
  spec = standard_component_specs.PusherSpec(
      model=model,
      model_blessing=model_blessing,
      infra_blessing=infra_blessing,
      push_destination=push_destination,
      custom_config=json_utils.dumps(custom_config),
      pushed_model=pushed_model)
  super().__init__(spec=spec, custom_executor_spec=custom_executor_spec)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the node's class name.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

SchemaGen

SchemaGen(statistics: BaseChannel, infer_feature_shape: Optional[Union[bool, RuntimeParameter]] = True, exclude_splits: Optional[List[str]] = None)

Bases: BaseComponent

A TFX SchemaGen component to generate a schema from the training data.

The SchemaGen component uses TensorFlow Data Validation to generate a schema from input statistics. The following TFX libraries use the schema:

- TensorFlow Data Validation
- TensorFlow Transform
- TensorFlow Model Analysis

In a typical TFX pipeline, the SchemaGen component generates a schema which is consumed by the other pipeline components.

Example

# Generates schema based on statistics files.
infer_schema = SchemaGen(statistics=statistics_gen.outputs['statistics'])

Component outputs contain:

- schema: Channel of type standard_artifacts.Schema.

See the SchemaGen guide for more details.

Constructs a SchemaGen component.

PARAMETER DESCRIPTION
statistics

A BaseChannel of ExampleStatistics type (required if spec is not passed). This should contain at least a train split. Other splits are currently ignored. required

TYPE: BaseChannel

infer_feature_shape

Boolean (or RuntimeParameter) value indicating whether or not to infer the shape of features. If the feature shape is not inferred, a downstream TensorFlow Transform component using the schema will parse input as tf.SparseTensor. Defaults to True if not set.

TYPE: Optional[Union[bool, RuntimeParameter]] DEFAULT: True

exclude_splits

Names of splits that will not be taken into consideration when auto-generating a schema. The default behavior (when exclude_splits is set to None) is to exclude no splits.

TYPE: Optional[List[str]] DEFAULT: None
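As a sketch of the optional parameters above (statistics_gen is assumed to be a StatisticsGen defined earlier in the pipeline):

# Infer feature shapes and skip the 'eval' split when generating the schema.
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True,
    exclude_splits=['eval'],
)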

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/schema_gen/component.py
def __init__(
    self,
    statistics: types.BaseChannel,
    infer_feature_shape: Optional[Union[bool,
                                        data_types.RuntimeParameter]] = True,
    exclude_splits: Optional[List[str]] = None):
  """Constructs a SchemaGen component.

  Args:
    statistics: A [BaseChannel][tfx.v1.types.BaseChannel]
      of `ExampleStatistics` type (required if spec is not passed).
      This should contain at least a `train` split. Other splits
      are currently ignored. _required_
    infer_feature_shape: Boolean (or [RuntimeParameter][tfx.v1.dsl.experimental.RuntimeParameter]) value indicating
      whether or not to infer the shape of features. If the feature shape is
      not inferred, downstream Tensorflow Transform component using the schema
      will parse input as tf.SparseTensor. Default to True if not set.
    exclude_splits: Names of splits that will not be taken into consideration
      when auto-generating a schema. Default behavior (when exclude_splits is
      set to None) is excluding no splits.
  """
  if exclude_splits is None:
    exclude_splits = []
    logging.info('Excluding no splits because exclude_splits is not set.')
  schema = types.Channel(type=standard_artifacts.Schema)
  if isinstance(infer_feature_shape, bool):
    infer_feature_shape = int(infer_feature_shape)
  spec = standard_component_specs.SchemaGenSpec(
      statistics=statistics,
      infer_feature_shape=infer_feature_shape,
      exclude_splits=json_utils.dumps(exclude_splits),
      schema=schema)
  super().__init__(spec=spec)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the node's class name.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

StatisticsGen

StatisticsGen(examples: BaseChannel, schema: Optional[BaseChannel] = None, stats_options: Optional[StatsOptions] = None, exclude_splits: Optional[List[str]] = None)

Bases: BaseBeamComponent

Official TFX StatisticsGen component.

The StatisticsGen component generates feature statistics and random samples over training data, which can be used for visualization and validation. StatisticsGen uses Apache Beam and approximate algorithms to scale to large datasets.

Example
  # Computes statistics over data for visualization and example validation.
  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

Component outputs contain:

- statistics: Channel of type standard_artifacts.ExampleStatistics for statistics of each split provided in the input examples.

Please see the StatisticsGen guide for more details.

Construct a StatisticsGen component.

PARAMETER DESCRIPTION
examples

A BaseChannel of ExamplesPath type, likely generated by the ExampleGen component. This needs to contain two splits labeled train and eval. required

TYPE: BaseChannel

schema

A Schema channel to use for automatically configuring the value of stats options passed to TFDV.

TYPE: Optional[BaseChannel] DEFAULT: None

stats_options

The StatsOptions instance to configure optional TFDV behavior. When stats_options.schema is set, it will be used instead of the schema channel input. Due to the requirement that stats_options be serialized, the slicer functions and custom stats generators are not usable, and an error will be raised if either is specified.

TYPE: Optional[StatsOptions] DEFAULT: None

exclude_splits

Names of splits for which statistics and samples should not be generated. The default behavior (when exclude_splits is set to None) is to exclude no splits.

TYPE: Optional[List[str]] DEFAULT: None
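As a sketch of the optional parameters above (example_gen is assumed to be an ExampleGen defined earlier in the pipeline; num_top_values is one of the serializable TFDV StatsOptions fields):

import tensorflow_data_validation as tfdv

# Configure TFDV behavior via stats_options and skip the 'eval' split.
# Slicer functions and custom stats generators cannot be set here, since
# stats_options must be serializable.
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'],
    stats_options=tfdv.StatsOptions(num_top_values=50),
    exclude_splits=['eval'],
)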

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_beam_pipeline_args

Add per component Beam pipeline args.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/statistics_gen/component.py
def __init__(self,
             examples: types.BaseChannel,
             schema: Optional[types.BaseChannel] = None,
             stats_options: Optional[tfdv.StatsOptions] = None,
             exclude_splits: Optional[List[str]] = None):
  """Construct a StatisticsGen component.

  Args:
    examples: A BaseChannel of `ExamplesPath` type, likely generated by the
      [ExampleGen component](../../../guide/examplegen).
        This needs to contain two splits labeled `train` and `eval`.
        _required_
    schema: A `Schema` channel to use for automatically configuring the value
      of stats options passed to TFDV.
    stats_options: The StatsOptions instance to configure optional TFDV
      behavior. When stats_options.schema is set, it will be used instead of
      the `schema` channel input. Due to the requirement that stats_options be
      serialized, the slicer functions and custom stats generators are not
      usable, and an error will be raised if either is specified.
    exclude_splits: Names of splits where statistics and sample should not be
      generated. Default behavior (when exclude_splits is set to None) is
      excluding no splits.
  """
  if exclude_splits is None:
    exclude_splits = []
    logging.info('Excluding no splits because exclude_splits is not set.')
  statistics = types.Channel(type=standard_artifacts.ExampleStatistics)
  # TODO(b/150802589): Move jsonable interface to tfx_bsl and use json_utils.
  stats_options_json = stats_options.to_json() if stats_options else None
  spec = standard_component_specs.StatisticsGenSpec(
      examples=examples,
      schema=schema,
      stats_options_json=stats_options_json,
      exclude_splits=json_utils.dumps(exclude_splits),
      statistics=statistics)
  super().__init__(spec=spec)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the node's class name.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_beam_pipeline_args
with_beam_pipeline_args(beam_pipeline_args: Iterable[Union[str, Placeholder]]) -> BaseBeamComponent

Add per component Beam pipeline args.

PARAMETER DESCRIPTION
beam_pipeline_args

List of Beam pipeline args to be added to the Beam executor spec.

TYPE: Iterable[Union[str, Placeholder]]

RETURNS DESCRIPTION
BaseBeamComponent

the same component itself.

Source code in tfx/dsl/components/base/base_beam_component.py
def with_beam_pipeline_args(
    self, beam_pipeline_args: Iterable[Union[str, placeholder.Placeholder]]
) -> 'BaseBeamComponent':
  """Add per component Beam pipeline args.

  Args:
    beam_pipeline_args: List of Beam pipeline args to be added to the Beam
      executor spec.

  Returns:
    the same component itself.
  """
  cast(executor_spec.BeamExecutorSpec,
       self.executor_spec).add_beam_pipeline_args(beam_pipeline_args)
  return self
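For example, a per-component Beam flag can be attached fluently (example_gen is assumed to be defined elsewhere; --direct_num_workers is a Beam DirectRunner option):

# Attach Beam pipeline args to this component only.
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'],
).with_beam_pipeline_args(['--direct_num_workers=0'])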
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

Trainer

Trainer(examples: Optional[BaseChannel] = None, transformed_examples: Optional[BaseChannel] = None, transform_graph: Optional[BaseChannel] = None, schema: Optional[BaseChannel] = None, base_model: Optional[BaseChannel] = None, hyperparameters: Optional[BaseChannel] = None, module_file: Optional[Union[str, RuntimeParameter]] = None, run_fn: Optional[Union[str, RuntimeParameter]] = None, train_args: Optional[Union[TrainArgs, RuntimeParameter]] = None, eval_args: Optional[Union[EvalArgs, RuntimeParameter]] = None, custom_config: Optional[Union[Dict[str, Any], RuntimeParameter]] = None, custom_executor_spec: Optional[ExecutorSpec] = None)

Bases: BaseComponent

A TFX component to train a TensorFlow model.

The Trainer component is used to train and evaluate a model using the given inputs and a user-supplied run_fn function.

An example of run_fn() can be found in the user-supplied code of the TFX penguin pipeline example.

Note

This component trains locally. For cloud distributed training, please refer to Cloud AI Platform Trainer.

Example

# Uses user-provided Python function that trains a model using TF.
trainer = Trainer(
    module_file=module_file,
    examples=transform.outputs["transformed_examples"],
    schema=infer_schema.outputs["schema"],
    transform_graph=transform.outputs["transform_graph"],
    train_args=proto.TrainArgs(splits=["train"], num_steps=10000),
    eval_args=proto.EvalArgs(splits=["eval"], num_steps=5000),
)

Component outputs contain:

- model: Channel of type standard_artifacts.Model.
- model_run: Channel of type standard_artifacts.ModelRun.

Please see the Trainer guide for more details.

Construct a Trainer component.

PARAMETER DESCRIPTION
examples

A BaseChannel of type standard_artifacts.Examples, serving as the source of examples used in training (required). May be raw or transformed.

TYPE: Optional[BaseChannel] DEFAULT: None

transformed_examples

Deprecated (no compatibility guarantee). Please set 'examples' instead.

TYPE: Optional[BaseChannel] DEFAULT: None

transform_graph

An optional BaseChannel of type standard_artifacts.TransformGraph, serving as the input transform graph if present.

TYPE: Optional[BaseChannel] DEFAULT: None

schema

An optional BaseChannel of type standard_artifacts.Schema, serving as the schema of training and eval data. The schema is optional when:

  1. transform_graph is provided, which already contains the schema.
  2. The user module bypasses the use of the schema, e.g., it is hardcoded.

TYPE: Optional[BaseChannel] DEFAULT: None

base_model

A BaseChannel of type Model, containing a model that will be used for training. This can be used for warm start, transfer learning, or model ensembling.

TYPE: Optional[BaseChannel] DEFAULT: None

hyperparameters

A BaseChannel of type standard_artifacts.HyperParameters, serving as the hyperparameters for the training module. The Tuner's best output hyperparameters can be fed into this.

TYPE: Optional[BaseChannel] DEFAULT: None

module_file

A path to a python module file containing a UDF model definition. The module_file must implement a function named run_fn at its top level with the signature:

def run_fn(trainer.fn_args_utils.FnArgs)
and the trained model must be saved to FnArgs.serving_model_dir when this function is executed.

Exactly one of module_file or run_fn must be supplied if the Trainer uses the GenericExecutor (default). Use of a RuntimeParameter for this argument is experimental. A minimal run_fn sketch is shown below, after the RAISES description.

TYPE: Optional[Union[str, RuntimeParameter]] DEFAULT: None

run_fn

A python path to a UDF model definition function for the generic trainer. See module_file for details. Exactly one of module_file or run_fn must be supplied if the Trainer uses the GenericExecutor (default). Use of a RuntimeParameter for this argument is experimental.

TYPE: Optional[Union[str, RuntimeParameter]] DEFAULT: None

train_args

A proto.TrainArgs instance, containing args used for training. Currently only splits and num_steps are available. The default behavior (when splits is empty) is to train on the train split.

TYPE: Optional[Union[TrainArgs, RuntimeParameter]] DEFAULT: None

eval_args

A proto.EvalArgs instance, containing args used for evaluation. Currently only splits and num_steps are available. The default behavior (when splits is empty) is to evaluate on the eval split.

TYPE: Optional[Union[EvalArgs, RuntimeParameter]] DEFAULT: None

custom_config

A dict which contains additional training job parameters that will be passed into the user module.

TYPE: Optional[Union[Dict[str, Any], RuntimeParameter]] DEFAULT: None

custom_executor_spec

Optional custom executor spec. Deprecated (no compatibility guarantee); please customize the component directly.

TYPE: Optional[ExecutorSpec] DEFAULT: None

RAISES DESCRIPTION
ValueError
  • When both or neither of module_file and run_fn is supplied.
  • When both or neither of examples and transformed_examples is supplied.
  • When transformed_examples is supplied but transform_graph is not supplied.
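A minimal run_fn sketch, assuming the contract described for module_file above: the function receives an FnArgs instance and must save the trained model to fn_args.serving_model_dir. The file name and the trivial Keras model are illustrative assumptions only; a real run_fn would build datasets from fn_args.train_files / fn_args.eval_files (for example via fn_args.data_accessor) and train on them.

# my_trainer_module.py -- illustrative sketch, not the canonical TFX example.
import tensorflow as tf
from tfx import v1 as tfx


def run_fn(fn_args: tfx.components.FnArgs):
  """Builds a placeholder model and exports it where TFX expects it."""
  # Placeholder model; real training code would consume fn_args.train_files
  # and fn_args.eval_files here.
  model = tf.keras.Sequential([
      tf.keras.layers.Dense(1, input_shape=(3,)),
  ])
  model.compile(optimizer='adam', loss='mse')
  # The trained model must be written to fn_args.serving_model_dir.
  model.save(fn_args.serving_model_dir, save_format='tf')

This module file would then be referenced via the module_file argument (or the function referenced via run_fn) when constructing the Trainer.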
METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/trainer/component.py
def __init__(
    self,
    examples: Optional[types.BaseChannel] = None,
    transformed_examples: Optional[types.BaseChannel] = None,
    transform_graph: Optional[types.BaseChannel] = None,
    schema: Optional[types.BaseChannel] = None,
    base_model: Optional[types.BaseChannel] = None,
    hyperparameters: Optional[types.BaseChannel] = None,
    module_file: Optional[Union[str, data_types.RuntimeParameter]] = None,
    run_fn: Optional[Union[str, data_types.RuntimeParameter]] = None,
    train_args: Optional[Union[trainer_pb2.TrainArgs,
                               data_types.RuntimeParameter]] = None,
    eval_args: Optional[Union[trainer_pb2.EvalArgs,
                              data_types.RuntimeParameter]] = None,
    custom_config: Optional[Union[Dict[str, Any],
                                  data_types.RuntimeParameter]] = None,
    custom_executor_spec: Optional[executor_spec.ExecutorSpec] = None):
  """Construct a Trainer component.

  Args:
    examples: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Examples`][tfx.v1.types.standard_artifacts.Examples],
      serving as the source of examples used in training (required). May be raw or
      transformed.
    transformed_examples: Deprecated (no compatibility guarantee). Please set
      'examples' instead.
    transform_graph: An optional [BaseChannel][tfx.v1.types.BaseChannel] of type
      [`standard_artifacts.TransformGraph`][tfx.v1.types.standard_artifacts.TransformGraph],
      serving as the input transform graph if present.
    schema:  An optional [BaseChannel][tfx.v1.types.BaseChannel] of type
      [`standard_artifacts.Schema`][tfx.v1.types.standard_artifacts.Schema],
      serving as the schema of training and eval data. Schema is optional when

        1. transform_graph is provided which contains schema.
        2. user module bypasses the usage of schema, e.g., hardcoded.
    base_model: A [BaseChannel][tfx.v1.types.BaseChannel] of type `Model`, containing model that will be
      used for training. This can be used for warmstart, transfer learning or
      model ensembling.
    hyperparameters: A [BaseChannel] of type
      [`standard_artifacts.HyperParameters`][tfx.v1.types.standard_artifacts.HyperParameters],
      serving as the hyperparameters for training module. Tuner's output best
      hyperparameters can be feed into this.
    module_file: A path to python module file containing UDF model definition.
      The `module_file` must implement a function named `run_fn` at its top
      level with function signature:
      ```python
      def run_fn(trainer.fn_args_utils.FnArgs)
      ```
      and the trained model must be saved to `FnArgs.serving_model_dir` when
      this function is executed.

      Exactly one of `module_file` or `run_fn` must be supplied if Trainer
      uses GenericExecutor (default). Use of a [RuntimeParameter][tfx.v1.dsl.experimental.RuntimeParameter] for this
      argument is experimental.
    run_fn:  A python path to UDF model definition function for generic
      trainer. See 'module_file' for details. Exactly one of 'module_file' or
      'run_fn' must be supplied if Trainer uses GenericExecutor (default). Use
      of a [RuntimeParameter][tfx.v1.dsl.experimental.RuntimeParameter] for this argument is experimental.
    train_args: A proto.TrainArgs instance, containing args used for training
      Currently only splits and num_steps are available. Default behavior
      (when splits is empty) is train on `train` split.
    eval_args: A proto.EvalArgs instance, containing args used for evaluation.
      Currently only splits and num_steps are available. Default behavior
      (when splits is empty) is evaluate on `eval` split.
    custom_config: A dict which contains addtional training job parameters
      that will be passed into user module.
    custom_executor_spec: Optional custom executor spec. Deprecated (no
      compatibility guarantee), please customize component directly.

  Raises:
    ValueError:
      - When both or neither of `module_file` and `run_fn` is supplied.
      - When both or neither of `examples` and `transformed_examples`
          is supplied.
      - When `transformed_examples` is supplied but `transform_graph`
          is not supplied.
  """
  if [bool(module_file), bool(run_fn)].count(True) != 1:
    raise ValueError(
        "Exactly one of 'module_file', or 'run_fn' must be supplied.")

  if bool(examples) == bool(transformed_examples):
    raise ValueError(
        "Exactly one of 'example' or 'transformed_example' must be supplied.")

  if transformed_examples and not transform_graph:
    raise ValueError("If 'transformed_examples' is supplied, "
                     "'transform_graph' must be supplied too.")

  if custom_executor_spec:
    logging.warning(
        "`custom_executor_spec` is deprecated. Please customize component directly."
    )
  if transformed_examples:
    logging.warning(
        "`transformed_examples` is deprecated. Please use `examples` instead."
    )
  examples = examples or transformed_examples
  model = types.Channel(type=standard_artifacts.Model)
  model_run = types.Channel(type=standard_artifacts.ModelRun)
  spec = standard_component_specs.TrainerSpec(
      examples=examples,
      transform_graph=transform_graph,
      schema=schema,
      base_model=base_model,
      hyperparameters=hyperparameters,
      train_args=train_args or trainer_pb2.TrainArgs(),
      eval_args=eval_args or trainer_pb2.EvalArgs(),
      module_file=module_file,
      run_fn=run_fn,
      custom_config=(custom_config
                     if isinstance(custom_config, data_types.RuntimeParameter)
                     else json_utils.dumps(custom_config)),
      model=model,
      model_run=model_run)
  super().__init__(spec=spec, custom_executor_spec=custom_executor_spec)

  if udf_utils.should_package_user_modules():
    # In this case, the `MODULE_PATH_KEY` execution property will be injected
    # as a reference to the given user module file after packaging, at which
    # point the `MODULE_FILE_KEY` execution property will be removed.
    udf_utils.add_user_module_dependency(
        self, standard_component_specs.MODULE_FILE_KEY,
        standard_component_specs.MODULE_PATH_KEY)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the node's class name.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

Transform

Transform(examples: BaseChannel, schema: BaseChannel, module_file: Optional[Union[str, RuntimeParameter]] = None, preprocessing_fn: Optional[Union[str, RuntimeParameter]] = None, splits_config: Optional[SplitsConfig] = None, analyzer_cache: Optional[BaseChannel] = None, materialize: bool = True, disable_analyzer_cache: bool = False, force_tf_compat_v1: bool = False, custom_config: Optional[Dict[str, Any]] = None, disable_statistics: bool = False, stats_options_updater_fn: Optional[str] = None)

Bases: BaseBeamComponent

A TFX component to transform the input examples.

The Transform component wraps TensorFlow Transform (tf.Transform) to preprocess data in a TFX pipeline. This component loads the preprocessing_fn from the input module file, preprocesses both the 'train' and 'eval' splits of the input examples, generates the tf.Transform output, and saves both the transform function and the transformed examples to the locations designated by the orchestrator.

The Transform component can also invoke TFDV to compute statistics on the pre-transform and post-transform data. Invocations of TFDV take an optional StatsOptions object. To configure the StatsOptions object that is passed to TFDV for both pre-transform and post-transform statistics, users can define the optional stats_options_updater_fn within the module file.

Providing a preprocessing function

The Transform executor will look specifically for the preprocessing_fn() function within the module file supplied via module_file.

An example of preprocessing_fn() can be found in the user-supplied code of the TFX Chicago Taxi pipeline example.

Updating StatsOptions

The Transform executor will look specifically for the stats_options_updater_fn() within the module file specified above.

An example of stats_options_updater_fn() can be found in the user-supplied code of the TFX BERT MRPC pipeline example.

Example

# Performs transformations and feature engineering in training and serving.
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=infer_schema.outputs['schema'],
    module_file=module_file,
)

Component outputs contains:

  • transform_graph: Channel of type standard_artifacts.TransformGraph, which includes an exported TensorFlow graph suitable for both training and serving.
  • transformed_examples: Channel of type standard_artifacts.Examples for materialized transformed examples, which includes the transform splits as specified in splits_config. This output is optional and is controlled by the materialize argument.

Please see the Transform guide for more details.
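
For orientation only, a module file for Transform might look roughly like the sketch below. The feature names are hypothetical, and the use of the tensorflow_transform (tft) analyzers is an assumption for illustration; any preprocessing_fn with the signature described under module_file works.

# Minimal module-file sketch for Transform (hypothetical feature names;
# assumes tensorflow_transform is installed).
import tensorflow_transform as tft


def preprocessing_fn(inputs):
  """Maps raw features to transformed features."""
  outputs = {}
  # Standardize a hypothetical numeric feature.
  outputs['trip_miles_xf'] = tft.scale_to_z_score(inputs['trip_miles'])
  # Integerize a hypothetical categorical feature via a learned vocabulary.
  outputs['payment_type_xf'] = tft.compute_and_apply_vocabulary(
      inputs['payment_type'])
  return outputs


def stats_options_updater_fn(stats_type, stats_options):
  """Optionally tweaks the TFDV StatsOptions used for statistics."""
  # Example tweak: compute statistics over a 10% sample of the data.
  stats_options.sample_rate = 0.1
  return stats_options
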

Construct a Transform component.

PARAMETER DESCRIPTION
examples

A BaseChannel of type standard_artifacts.Examples (required). This should contain the custom splits specified in splits_config. If a custom split is not provided, this should contain the two splits 'train' and 'eval'.

TYPE: BaseChannel

schema

A BaseChannel of type standard_artifacts.Schema. This should contain a single schema artifact.

TYPE: BaseChannel

module_file

The file path to a python module file, from which the 'preprocessing_fn' function will be loaded. Exactly one of 'module_file' or 'preprocessing_fn' must be supplied.

The function needs to have the following signature:

def preprocessing_fn(inputs: Dict[Text, Any]) -> Dict[Text, Any]:
    ...
where the values of input and returned Dict are either tf.Tensor or tf.SparseTensor.

If additional inputs are needed for preprocessing_fn, they can be passed in custom_config:

def preprocessing_fn(
    inputs: Dict[Text, Any],
    custom_config: Dict[Text, Any],
) -> Dict[Text, Any]:
    ...
To update the stats options used to compute the pre-transform or post-transform statistics, optionally define the 'stats_options_updater_fn' within the same module. If implemented, this function needs to have the following signature:
def stats_options_updater_fn(
    stats_type: tfx.components.transform.stats_options_util.StatsType,
    stats_options: tfdv.StatsOptions,
) -> tfdv.StatsOptions:
    ...
Use of a RuntimeParameter for this argument is experimental.

TYPE: Optional[Union[str, RuntimeParameter]] DEFAULT: None

preprocessing_fn

The path to python function that implements a 'preprocessing_fn'. See 'module_file' for expected signature of the function. Exactly one of 'module_file' or 'preprocessing_fn' must be supplied. Use of a RuntimeParameter for this argument is experimental.

TYPE: Optional[Union[str, RuntimeParameter]] DEFAULT: None

splits_config

A transform_pb2.SplitsConfig instance, providing the splits that should be analyzed and the splits that should be transformed. Note that the analyze and transform splits can overlap. Default behavior (when splits_config is not set) is to analyze the 'train' split and transform all splits. If splits_config is set, analyze cannot be empty. A brief sketch of setting this argument follows the parameter list below.

TYPE: Optional[SplitsConfig] DEFAULT: None

analyzer_cache

Optional input 'TransformCache' channel containing cached information from previous Transform runs. When provided, Transform will try to use the cached calculation if possible.

TYPE: Optional[BaseChannel] DEFAULT: None

materialize

If True, write transformed examples as an output.

TYPE: bool DEFAULT: True

disable_analyzer_cache

If False, Transform will use the input cache if provided and write a cache output. If True, analyzer_cache must not be provided.

TYPE: bool DEFAULT: False

force_tf_compat_v1

(Optional) If True, or if TF2 behaviors are disabled, Transform will use TensorFlow in compat.v1 mode irrespective of the installed version of TensorFlow. Defaults to False.

TYPE: bool DEFAULT: False

custom_config

A dict which contains additional parameters that will be passed to preprocessing_fn.

TYPE: Optional[Dict[str, Any]] DEFAULT: None

disable_statistics

If True, do not invoke TFDV to compute pre-transform and post-transform statistics. When statistics are computed, they will be stored in the pre_transform_feature_stats/ and post_transform_feature_stats/ subfolders of the transform_graph export.

TYPE: bool DEFAULT: False

stats_options_updater_fn

The path to a python function that implements a 'stats_options_updater_fn'. See 'module_file' for expected signature of the function. 'stats_options_updater_fn' cannot be defined if 'module_file' is specified.

TYPE: Optional[str] DEFAULT: None
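
As referenced in the splits_config description above, a minimal sketch of setting it, assuming the default 'train'/'eval' split names (example_gen, schema_gen, and module_file are hypothetical upstream names):

from tfx.proto import transform_pb2

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=module_file,
    splits_config=transform_pb2.SplitsConfig(
        analyze=['train'],
        transform=['train', 'eval']))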

RAISES DESCRIPTION
ValueError

When both or neither of 'module_file' and 'preprocessing_fn' is supplied.

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_beam_pipeline_args

Add per component Beam pipeline args.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/transform/component.py
def __init__(
    self,
    examples: types.BaseChannel,
    schema: types.BaseChannel,
    module_file: Optional[Union[str, data_types.RuntimeParameter]] = None,
    preprocessing_fn: Optional[Union[str,
                                     data_types.RuntimeParameter]] = None,
    splits_config: Optional[transform_pb2.SplitsConfig] = None,
    analyzer_cache: Optional[types.BaseChannel] = None,
    materialize: bool = True,
    disable_analyzer_cache: bool = False,
    force_tf_compat_v1: bool = False,
    custom_config: Optional[Dict[str, Any]] = None,
    disable_statistics: bool = False,
    stats_options_updater_fn: Optional[str] = None):
  """Construct a Transform component.

  Args:
    examples: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Examples`][tfx.v1.types.standard_artifacts.Examples] _required_.
      This should contain custom splits specified in splits_config. If custom
      split is not provided, this should contain two splits 'train' and
      'eval'.
    schema: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Schema`][tfx.v1.types.standard_artifacts.Schema]. This should
      contain a single schema artifact.
    module_file: The file path to a python module file, from which the
      'preprocessing_fn' function will be loaded.
      Exactly one of 'module_file' or 'preprocessing_fn' must be supplied.

      The function needs to have the following signature:
      ``` {.python .no-copy}
      def preprocessing_fn(inputs: Dict[Text, Any]) -> Dict[Text, Any]:
          ...
      ```
      where the values of input and returned Dict are either tf.Tensor or
      tf.SparseTensor.

      If additional inputs are needed for preprocessing_fn, they can be passed
      in custom_config:

      ``` {.python .no-copy}
      def preprocessing_fn(
          inputs: Dict[Text, Any],
          custom_config: Dict[Text, Any],
      ) -> Dict[Text, Any]:
          ...
      ```
      To update the stats options used to compute the pre-transform or
      post-transform statistics, optionally define the
      'stats_options_updater_fn' within the same module. If implemented,
      this function needs to have the following signature:
      ``` {.python .no-copy}
      def stats_options_updater_fn(
          stats_type: tfx.components.transform.stats_options_util.StatsType,
          stats_options: tfdv.StatsOptions,
      ) -> tfdv.StatsOptions:
          ...
      ```
      Use of a [RuntimeParameter][tfx.v1.dsl.experimental.RuntimeParameter] for this argument is experimental.
    preprocessing_fn: The path to python function that implements a
      'preprocessing_fn'. See 'module_file' for expected signature of the
      function. Exactly one of 'module_file' or 'preprocessing_fn' must be
      supplied. Use of a [RuntimeParameter][tfx.v1.dsl.experimental.RuntimeParameter] for this argument is experimental.
    splits_config: A transform_pb2.SplitsConfig instance, providing splits
      that should be analyzed and splits that should be transformed. Note
      analyze and transform splits can have overlap. Default behavior (when
      splits_config is not set) is analyze the 'train' split and transform all
      splits. If splits_config is set, analyze cannot be empty.
    analyzer_cache: Optional input 'TransformCache' channel containing cached
      information from previous Transform runs. When provided, Transform will
      try to use the cached calculation if possible.
    materialize: If True, write transformed examples as an output.
    disable_analyzer_cache: If False, Transform will use input cache if
      provided and write cache output. If True, `analyzer_cache` must not be
      provided.
    force_tf_compat_v1: (Optional) If True and/or TF2 behaviors are disabled
      Transform will use Tensorflow in compat.v1 mode irrespective of
      installed version of Tensorflow. Defaults to `False`.
    custom_config: A dict which contains additional parameters that will be
      passed to preprocessing_fn.
    disable_statistics: If True, do not invoke TFDV to compute pre-transform
      and post-transform statistics. When statistics are computed, they will
      be stored in the `pre_transform_feature_stats/` and
      `post_transform_feature_stats/` subfolders of the `transform_graph`
      export.
    stats_options_updater_fn: The path to a python function that implements a
      'stats_options_updater_fn'. See 'module_file' for expected signature of
      the function. 'stats_options_updater_fn' cannot be defined if
      'module_file' is specified.

  Raises:
    ValueError: When both or neither of 'module_file' and 'preprocessing_fn'
      is supplied.
  """
  if bool(module_file) == bool(preprocessing_fn):
    raise ValueError(
        "Exactly one of 'module_file' or 'preprocessing_fn' must be supplied."
    )

  if bool(module_file) and bool(stats_options_updater_fn):
    raise ValueError(
        "'stats_options_updater_fn' cannot be specified together with "
        "'module_file'")

  transform_graph = types.Channel(type=standard_artifacts.TransformGraph)
  transformed_examples = None
  if materialize:
    transformed_examples = types.Channel(type=standard_artifacts.Examples)
    transformed_examples.matching_channel_name = "examples"

  (pre_transform_schema, pre_transform_stats, post_transform_schema,
   post_transform_stats, post_transform_anomalies) = (None,) * 5
  if not disable_statistics:
    pre_transform_schema = types.Channel(type=standard_artifacts.Schema)
    post_transform_schema = types.Channel(type=standard_artifacts.Schema)
    pre_transform_stats = types.Channel(
        type=standard_artifacts.ExampleStatistics)
    post_transform_stats = types.Channel(
        type=standard_artifacts.ExampleStatistics)
    post_transform_anomalies = types.Channel(
        type=standard_artifacts.ExampleAnomalies)

  if disable_analyzer_cache:
    updated_analyzer_cache = None
    if analyzer_cache:
      raise ValueError(
          "`analyzer_cache` is set when disable_analyzer_cache is True.")
  else:
    updated_analyzer_cache = types.Channel(
        type=standard_artifacts.TransformCache)

  spec = standard_component_specs.TransformSpec(
      examples=examples,
      schema=schema,
      module_file=module_file,
      preprocessing_fn=preprocessing_fn,
      stats_options_updater_fn=stats_options_updater_fn,
      force_tf_compat_v1=int(force_tf_compat_v1),
      splits_config=splits_config,
      transform_graph=transform_graph,
      transformed_examples=transformed_examples,
      analyzer_cache=analyzer_cache,
      updated_analyzer_cache=updated_analyzer_cache,
      custom_config=json_utils.dumps(custom_config),
      disable_statistics=int(disable_statistics),
      pre_transform_schema=pre_transform_schema,
      pre_transform_stats=pre_transform_stats,
      post_transform_schema=post_transform_schema,
      post_transform_stats=post_transform_stats,
      post_transform_anomalies=post_transform_anomalies)
  super().__init__(spec=spec)

  if udf_utils.should_package_user_modules():
    # In this case, the `MODULE_PATH_KEY` execution property will be injected
    # as a reference to the given user module file after packaging, at which
    # point the `MODULE_FILE_KEY` execution property will be removed.
    udf_utils.add_user_module_dependency(
        self, standard_component_specs.MODULE_FILE_KEY,
        standard_component_specs.MODULE_PATH_KEY)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the node's class name.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_beam_pipeline_args
with_beam_pipeline_args(beam_pipeline_args: Iterable[Union[str, Placeholder]]) -> BaseBeamComponent

Add per component Beam pipeline args.

PARAMETER DESCRIPTION
beam_pipeline_args

List of Beam pipeline args to be added to the Beam executor spec.

TYPE: Iterable[Union[str, Placeholder]]

RETURNS DESCRIPTION
BaseBeamComponent

the same component itself.

Source code in tfx/dsl/components/base/base_beam_component.py
def with_beam_pipeline_args(
    self, beam_pipeline_args: Iterable[Union[str, placeholder.Placeholder]]
) -> 'BaseBeamComponent':
  """Add per component Beam pipeline args.

  Args:
    beam_pipeline_args: List of Beam pipeline args to be added to the Beam
      executor spec.

  Returns:
    the same component itself.
  """
  cast(executor_spec.BeamExecutorSpec,
       self.executor_spec).add_beam_pipeline_args(beam_pipeline_args)
  return self
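
For example (the flag shown is a standard Beam DirectRunner option, used here purely as an illustration; example_gen, schema_gen, and module_file are hypothetical names):

# Attach per-component Beam args; the method returns the same component.
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=module_file,
).with_beam_pipeline_args(['--direct_num_workers=0'])
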
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self

Tuner

Tuner(examples: BaseChannel, schema: Optional[BaseChannel] = None, transform_graph: Optional[BaseChannel] = None, base_model: Optional[BaseChannel] = None, module_file: Optional[str] = None, tuner_fn: Optional[str] = None, train_args: Optional[TrainArgs] = None, eval_args: Optional[EvalArgs] = None, tune_args: Optional[TuneArgs] = None, custom_config: Optional[Dict[str, Any]] = None)

Bases: BaseComponent

A TFX component for model hyperparameter tuning.

Component outputs contains:

  • best_hyperparameters: Channel of type standard_artifacts.HyperParameters, containing the best hyperparameters found.
  • tuner_results: Channel of type standard_artifacts.TunerResults, containing the results of the tuning trials.

See the Tuner guide for more details.
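
For orientation only, a typical setup might look like the sketch below. The use of Keras Tuner and the helpers _build_keras_model and _input_fn are illustrative assumptions, not requirements of this API; any tuner_fn returning a TunerFnResult works.

# In the module file (hypothetical helpers: `_build_keras_model`, which
# accepts a keras_tuner.HyperParameters argument, and `_input_fn`).
import keras_tuner
from tfx import v1 as tfx


def tuner_fn(fn_args: tfx.components.FnArgs) -> tfx.components.TunerFnResult:
  tuner = keras_tuner.RandomSearch(
      _build_keras_model,
      objective='val_accuracy',
      max_trials=5,
      directory=fn_args.working_dir,
      project_name='example_tuning')
  return tfx.components.TunerFnResult(
      tuner=tuner,
      fit_kwargs={
          'x': _input_fn(fn_args.train_files, fn_args.data_accessor),
          'validation_data': _input_fn(fn_args.eval_files, fn_args.data_accessor),
          'steps_per_epoch': fn_args.train_steps,
          'validation_steps': fn_args.eval_steps,
      })

# In the pipeline definition.
tuner = Tuner(
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    module_file=module_file,
    train_args=tfx.proto.TrainArgs(num_steps=20),
    eval_args=tfx.proto.EvalArgs(num_steps=5))
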

Construct a Tuner component.

PARAMETER DESCRIPTION
examples

A BaseChannel of type standard_artifacts.Examples, serving as the source of examples that are used in tuning (required).

TYPE: BaseChannel

schema

An optional BaseChannel of type standard_artifacts.Schema, serving as the schema of training and eval data. This is used when raw examples are provided.

TYPE: Optional[BaseChannel] DEFAULT: None

transform_graph

An optional BaseChannel of type standard_artifacts.TransformGraph, serving as the input transform graph if present. This is used when transformed examples are provided.

TYPE: Optional[BaseChannel] DEFAULT: None

base_model

A BaseChannel of type Model, containing a model that will be used for training. This can be used for warmstart, transfer learning, or model ensembling.

TYPE: Optional[BaseChannel] DEFAULT: None

module_file

A path to python module file containing UDF tuner definition. The module_file must implement a function named tuner_fn at its top level. The function must have the following signature.

def tuner_fn(fn_args: FnArgs) -> TunerFnResult:
    ...
Exactly one of 'module_file' or 'tuner_fn' must be supplied.

TYPE: Optional[str] DEFAULT: None

tuner_fn

A python path to the UDF tuner definition function. See 'module_file' for the required signature of the UDF. Exactly one of 'module_file' or 'tuner_fn' must be supplied.

TYPE: Optional[str] DEFAULT: None

train_args

A trainer_pb2.TrainArgs instance, containing args used for training. Currently only splits and num_steps are available. Default behavior (when splits is empty) is to train on the train split.

TYPE: Optional[TrainArgs] DEFAULT: None

eval_args

A trainer_pb2.EvalArgs instance, containing args used for eval. Currently only splits and num_steps are available. Default behavior (when splits is empty) is to evaluate on the eval split.

TYPE: Optional[EvalArgs] DEFAULT: None

tune_args

A tuner_pb2.TuneArgs instance, containing args used for tuning. Currently only num_parallel_trials is available.

TYPE: Optional[TuneArgs] DEFAULT: None

custom_config

A dict which contains additional training job parameters that will be passed into the user module.

TYPE: Optional[Dict[str, Any]] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add another component that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

with_platform_config

Attaches a proto-form platform config to a component.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Component's output channel dict.

TYPE: Dict[str, OutputChannel]

Source code in tfx/components/tuner/component.py
def __init__(self,
             examples: types.BaseChannel,
             schema: Optional[types.BaseChannel] = None,
             transform_graph: Optional[types.BaseChannel] = None,
             base_model: Optional[types.BaseChannel] = None,
             module_file: Optional[str] = None,
             tuner_fn: Optional[str] = None,
             train_args: Optional[trainer_pb2.TrainArgs] = None,
             eval_args: Optional[trainer_pb2.EvalArgs] = None,
             tune_args: Optional[tuner_pb2.TuneArgs] = None,
             custom_config: Optional[Dict[str, Any]] = None):
  """Construct a Tuner component.

  Args:
    examples: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Examples`][tfx.v1.types.standard_artifacts.Examples], serving as
      the source of examples that are used in tuning (required).
    schema:  An optional [BaseChannel][tfx.v1.types.BaseChannel] of type [`standard_artifacts.Schema`][tfx.v1.types.standard_artifacts.Schema],
      serving as the schema of training and eval data. This is used when raw
      examples are provided.
    transform_graph: An optional [BaseChannel][tfx.v1.types.BaseChannel] of type
      [`standard_artifacts.TransformGraph`][tfx.v1.types.standard_artifacts.TransformGraph], serving as the input transform
      graph if present. This is used when transformed examples are provided.
    base_model: A [BaseChannel][tfx.v1.types.BaseChannel] of type [`Model`][tfx.v1.types.standard_artifacts.Model], containing model that will be
      used for training. This can be used for warmstart, transfer learning or
      model ensembling.
    module_file: A path to python module file containing UDF tuner definition.
      The module_file must implement a function named `tuner_fn` at its top
      level. The function must have the following signature.
          ``` {.python .no-copy}
          def tuner_fn(fn_args: FnArgs) -> TunerFnResult:
              ...
          ```
          Exactly one of 'module_file' or 'tuner_fn' must be supplied.
    tuner_fn:  A python path to UDF model definition function. See
      'module_file' for the required signature of the UDF. Exactly one of
      'module_file' or 'tuner_fn' must be supplied.
    train_args: A trainer_pb2.TrainArgs instance, containing args used for
      training. Currently only splits and num_steps are available. Default
      behavior (when splits is empty) is train on `train` split.
    eval_args: A trainer_pb2.EvalArgs instance, containing args used for eval.
      Currently only splits and num_steps are available. Default behavior
      (when splits is empty) is evaluate on `eval` split.
    tune_args: A tuner_pb2.TuneArgs instance, containing args used for tuning.
      Currently only num_parallel_trials is available.
    custom_config: A dict which contains additional training job parameters
      that will be passed into user module.
  """
  if bool(module_file) == bool(tuner_fn):
    raise ValueError(
        "Exactly one of 'module_file' or 'tuner_fn' must be supplied")

  best_hyperparameters = types.Channel(
      type=standard_artifacts.HyperParameters)
  tuner_results = types.Channel(type=standard_artifacts.TunerResults)
  spec = standard_component_specs.TunerSpec(
      examples=examples,
      schema=schema,
      transform_graph=transform_graph,
      base_model=base_model,
      module_file=module_file,
      tuner_fn=tuner_fn,
      train_args=train_args or trainer_pb2.TrainArgs(),
      eval_args=eval_args or trainer_pb2.EvalArgs(),
      tune_args=tune_args,
      best_hyperparameters=best_hyperparameters,
      tuner_results=tuner_results,
      custom_config=json_utils.dumps(custom_config),
  )
  super().__init__(spec=spec)

  if udf_utils.should_package_user_modules():
    # In this case, the `MODULE_PATH_KEY` execution property will be injected
    # as a reference to the given user module file after packaging, at which
    # point the `MODULE_FILE_KEY` execution property will be removed.
    udf_utils.add_user_module_dependency(
        self, standard_component_specs.MODULE_FILE_KEY,
        standard_component_specs.MODULE_PATH_KEY)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the node's class name.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Component's output channel dict.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])
with_platform_config
with_platform_config(config: Message) -> Self

Attaches a proto-form platform config to a component.

The config will be a per-node platform-specific config.

PARAMETER DESCRIPTION
config

platform config to attach to the component.

TYPE: Message

RETURNS DESCRIPTION
Self

the same component itself.

Source code in tfx/dsl/components/base/base_component.py
@doc_controls.do_not_doc_in_subclasses
def with_platform_config(
    self, config: message.Message
) -> typing_extensions.Self:
  """Attaches a proto-form platform config to a component.

  The config will be a per-node platform-specific config.

  Args:
    config: platform config to attach to the component.

  Returns:
    the same component itself.
  """
  self.platform_config = config
  return self