
DSL

tfx.v1.dsl

TFX DSL module.

MODULE DESCRIPTION
components

TFX DSL components module.

experimental

TFX dsl.experimental module.

io

TFX DSL I/O module.

placeholders

TFX placeholders module.

standard_annotations

Public API for base type annotations.

CLASS DESCRIPTION
Artifact

TFX artifact used for orchestration.

Channel

Legacy channel interface.

Cond

Cond context manager that disables its containing nodes if the predicate is False.

ExecutionMode

Execution mode of a pipeline.

Importer

Definition for TFX Importer.

Pipeline

Logical TFX pipeline object.

Resolver

Definition for TFX Resolver.

Classes

Artifact

Artifact(mlmd_artifact_type: Optional[ArtifactType] = None)

Bases: Jsonable

TFX artifact used for orchestration.

This is used for type-checking and inter-component communication. Currently, it wraps a tuple of (ml_metadata.proto.Artifact, ml_metadata.proto.ArtifactType) with additional property accessors for internal state.

A user may create a subclass of Artifact and override the TYPE_NAME property with the type for this artifact subclass. Users of the subclass may then omit the "type_name" field when constructing the object.

A user may specify artifact type-specific properties for an Artifact subclass by overriding the PROPERTIES dictionary, as detailed below.
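Below is a minimal sketch of such a subclass. It assumes that Property and PropertyType are imported from tfx.types.artifact (not part of the API listed on this page), and the MyDataset type itself is hypothetical.

from tfx.types.artifact import Artifact, Property, PropertyType

class MyDataset(Artifact):  # hypothetical artifact type for illustration
  TYPE_NAME = "MyDataset"
  PROPERTIES = {
      "span": Property(type=PropertyType.INT),
      "description": Property(type=PropertyType.STRING),
  }

dataset = MyDataset()  # subclasses may omit the artifact type argument
dataset.uri = "/tmp/my_dataset"
dataset.span = 1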

Note

The behavior of this class is experimental, without backwards compatibility guarantees, and may change in upcoming releases.

Construct an instance of Artifact.

Used by the TFX internal implementation: creates an empty Artifact with type_name and optional split info specified. The remaining info will be filled in at compile and run time. The Artifact should be transparent to end users and should not be instantiated directly by pipeline users.

PARAMETER DESCRIPTION
mlmd_artifact_type

Proto message defining the underlying ArtifactType. Optional and intended for internal use.

TYPE: Optional[ArtifactType] DEFAULT: None

METHOD DESCRIPTION
copy_from

Set uri, properties and custom properties from a given Artifact.

from_json_dict

Convert from dictionary data to an object.

get_bool_custom_property

Get a custom property of bool type.

get_custom_property

Gets a custom property with the given key. Returns None if not found.

get_float_custom_property

Gets a custom property of float type.

get_int_custom_property

Get a custom property of int type.

get_json_value_custom_property

Get a custom property of JSON type.

get_proto_custom_property

Get a custom property of proto type.

get_string_custom_property

Get a custom property of string type.

set_bool_custom_property

Sets a custom property of bool type.

set_float_custom_property

Sets a custom property of float type.

set_int_custom_property

Set a custom property of int type.

set_json_value_custom_property

Sets a custom property of JSON type.

set_mlmd_artifact

Replace the MLMD artifact object on this artifact.

set_mlmd_artifact_type

Set entire ArtifactType in this object.

set_proto_custom_property

Sets a custom property of proto type.

set_string_custom_property

Set a custom property of string type.

to_json_dict

Convert from an object to a JSON serializable dictionary.

ATTRIBUTE DESCRIPTION
artifact_type

Type of the underlying mlmd artifact.

external_id

External id of the underlying artifact.

TYPE: str

id

Id of the underlying mlmd artifact.

TYPE: int

is_external

Returns true if the artifact is external.

TYPE: bool

mlmd_artifact

Underlying mlmd artifact.

name

Name of the underlying mlmd artifact.

TYPE: str

pipeline_name

Name of the pipeline that produces the artifact.

TYPE: str

producer_component

Producer component of the artifact.

TYPE: str

state

State of the underlying mlmd artifact.

TYPE: str

type

Type of the artifact.

type_id

Type id of the underlying mlmd artifact.

TYPE: int

type_name

Type name of the underlying mlmd artifact.

uri

Artifact URI.

TYPE: str

Source code in tfx/types/artifact.py
def __init__(
    self,
    mlmd_artifact_type: Optional[metadata_store_pb2.ArtifactType] = None):
  """Construct an instance of Artifact.

  Used by TFX internal implementation: create an empty Artifact with
  type_name and optional split info specified. The remaining info will be
  filled in during compiling and running time. The Artifact should be
  transparent to end users and should not be initiated directly by pipeline
  users.

  Args:
    mlmd_artifact_type: Proto message defining the underlying ArtifactType.
      Optional and intended for internal use.
  """
  if self.__class__ == Artifact:
    if not mlmd_artifact_type:
      raise ValueError(
          'The "mlmd_artifact_type" argument must be passed to specify a '
          'type for this Artifact.')
    if not isinstance(mlmd_artifact_type, metadata_store_pb2.ArtifactType):
      raise ValueError(
          'The "mlmd_artifact_type" argument must be an instance of the '
          'proto message ml_metadata.proto.metadata_store_pb2.ArtifactType.')
  else:
    if mlmd_artifact_type:
      raise ValueError(
          'The "mlmd_artifact_type" argument must not be passed for '
          'Artifact subclass %s.' % self.__class__)
    mlmd_artifact_type = self._get_artifact_type()

  # MLMD artifact type proto object.
  self._artifact_type = mlmd_artifact_type
  # Underlying MLMD artifact proto object.
  self._artifact = metadata_store_pb2.Artifact()
  # When list/dict JSON or proto value properties are read, it is possible
  # they will be modified without knowledge of this class. Therefore,
  # deserialized values need to be cached here and reserialized into the
  # metadata proto when requested.
  self._cached_modifiable_properties = {}
  self._cached_modifiable_custom_properties = {}
  # Initialization flag to prevent recursive getattr / setattr errors.
  self._initialized = True
Attributes
artifact_type property
artifact_type

Type of the underlying mlmd artifact.

external_id property
external_id: str

External id of the underlying artifact.

id property writable
id: int

Id of the underlying mlmd artifact.

is_external property writable
is_external: bool

Returns true if the artifact is external.

mlmd_artifact property
mlmd_artifact

Underlying mlmd artifact.

name property writable
name: str

Name of the underlying mlmd artifact.

pipeline_name property writable
pipeline_name: str

Name of the pipeline that produces the artifact.

producer_component property writable
producer_component: str

Producer component of the artifact.

state property writable
state: str

State of the underlying mlmd artifact.

type property
type

Type of the artifact.

type_id property writable
type_id: int

Type id of the underlying mlmd artifact.

type_name property
type_name

Type name of the underlying mlmd artifact.

uri property writable
uri: str

Artifact URI.

Functions
copy_from
copy_from(other: Artifact)

Set uri, properties and custom properties from a given Artifact.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_inheritable
def copy_from(self, other: 'Artifact'):
  """Set uri, properties and custom properties from a given Artifact."""
  assert self.type is other.type, (
      'Unable to set properties from an artifact of different type: {} vs {}'
      .format(self.type_name, other.type_name))
  self.uri = other.uri
  if other.artifact_type.HasField('id'):
    self.type_id = other.artifact_type.id

  self._artifact.properties.clear()
  self._artifact.properties.MergeFrom(other._artifact.properties)  # pylint: disable=protected-access
  self._artifact.custom_properties.clear()
  self._artifact.custom_properties.MergeFrom(
      other._artifact.custom_properties)  # pylint: disable=protected-access
  self._cached_modifiable_properties = copy.deepcopy(
      other._cached_modifiable_properties)  # pylint: disable=protected-access
  self._cached_modifiable_custom_properties = copy.deepcopy(
      other._cached_modifiable_custom_properties)  # pylint: disable=protected-access
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/types/artifact.py
@classmethod
@doc_controls.do_not_doc_inheritable
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  module_name = dict_data['__artifact_class_module__']
  class_name = dict_data['__artifact_class_name__']
  artifact = metadata_store_pb2.Artifact()
  artifact_type = metadata_store_pb2.ArtifactType()
  json_format.Parse(json.dumps(dict_data['artifact']), artifact)
  json_format.Parse(json.dumps(dict_data['artifact_type']), artifact_type)

  # First, try to resolve the specific class used for the artifact; if this
  # is not possible, use a generic artifact.Artifact object.
  result = None
  try:
    artifact_cls = getattr(importlib.import_module(module_name), class_name)
    # If the artifact type is the base Artifact class, do not construct the
    # object here since that constructor requires the mlmd_artifact_type
    # argument.
    if artifact_cls != Artifact:
      result = artifact_cls()
  except (AttributeError, ImportError, ValueError):
    logging.warning((
        'Could not load artifact class %s.%s; using fallback deserialization '
        'for the relevant artifact. Please make sure that any artifact '
        'classes can be imported within your container or environment.'),
                    module_name, class_name)
  if not result:
    result = Artifact(mlmd_artifact_type=artifact_type)
  result.set_mlmd_artifact_type(artifact_type)
  result.set_mlmd_artifact(artifact)
  return result
get_bool_custom_property
get_bool_custom_property(key: str) -> bool

Get a custom property of bool type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_in_subclasses
def get_bool_custom_property(self, key: str) -> bool:
  """Get a custom property of bool type."""
  if key not in self._artifact.custom_properties:
    return False
  json_value = self.get_json_value_custom_property(key)
  if isinstance(json_value, bool):
    return json_value
  return self._artifact.custom_properties[key].bool_value
get_custom_property
get_custom_property(key: str) -> Optional[Union[int, float, str, bool, JsonValueType]]

Gets a custom property with the given key. Returns None if not found.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_in_subclasses
def get_custom_property(
    self, key: str
) -> Optional[Union[int, float, str, bool, JsonValueType]]:
  """Gets a custom property with key. Return None if not found."""
  if key not in self._artifact.custom_properties:
    return None

  json_value = self.get_json_value_custom_property(key)
  if json_value:
    return json_value

  mlmd_value = self._artifact.custom_properties[key]
  if mlmd_value.HasField('int_value'):
    return mlmd_value.int_value
  elif mlmd_value.HasField('double_value'):
    return mlmd_value.double_value
  elif mlmd_value.HasField('string_value'):
    return mlmd_value.string_value
  elif mlmd_value.HasField('bool_value'):
    return mlmd_value.bool_value
  return None
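A hedged usage sketch of the typed setters and the generic get_custom_property shown above; the Examples artifact type is an assumption from tfx.v1.types.standard_artifacts.

from tfx.v1.types.standard_artifacts import Examples

examples = Examples()
examples.set_int_custom_property("num_records", 1000)
examples.set_string_custom_property("source", "bigquery")

examples.get_custom_property("num_records")  # 1000
examples.get_custom_property("source")       # "bigquery"
examples.get_custom_property("missing_key")  # None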
get_float_custom_property
get_float_custom_property(key: str) -> float

Gets a custom property of float type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_in_subclasses
def get_float_custom_property(self, key: str) -> float:
  """Gets a custom property of float type."""
  if key not in self._artifact.custom_properties:
    return 0.0
  json_value = self.get_json_value_custom_property(key)
  if isinstance(json_value, float):
    return json_value
  return self._artifact.custom_properties[key].double_value
get_int_custom_property
get_int_custom_property(key: str) -> int

Get a custom property of int type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_in_subclasses
def get_int_custom_property(self, key: str) -> int:
  """Get a custom property of int type."""
  if key not in self._artifact.custom_properties:
    return 0
  json_value = self.get_json_value_custom_property(key)
  if isinstance(json_value, float):
    return int(json_value)
  return self._artifact.custom_properties[key].int_value
get_json_value_custom_property
get_json_value_custom_property(key: str) -> JsonValueType

Get a custom property of JSON type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_inheritable
def get_json_value_custom_property(self, key: str) -> JsonValueType:
  """Get a custom property of JSON type."""
  if key in self._cached_modifiable_custom_properties:
    return self._cached_modifiable_custom_properties[key]
  if (key not in self._artifact.custom_properties or
      not self._artifact.custom_properties[key].HasField('struct_value')):
    return None
  value = _decode_struct_value(
      self._artifact.custom_properties[key].struct_value)
  # We must cache the decoded lists or dictionaries returned here so that
  # if their recursive contents are modified, the Metadata proto message
  # can be updated to reflect this.
  if isinstance(value, (dict, list)):
    self._cached_modifiable_custom_properties[key] = value
  return value
get_proto_custom_property
get_proto_custom_property(key: str) -> Optional[Message]

Get a custom property of proto type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_inheritable
def get_proto_custom_property(self, key: str) -> Optional[message.Message]:
  """Get a custom property of proto type."""
  if key in self._cached_modifiable_custom_properties:
    return self._cached_modifiable_custom_properties[key]
  if (key not in self._artifact.custom_properties or
      not self._artifact.custom_properties[key].HasField('proto_value')):
    return None
  value = proto_utils.unpack_proto_any(
      self._artifact.custom_properties[key].proto_value)
  # We must cache the protobuf message here so that if its contents are
  # modified, the Metadata proto message can be updated to reflect this.
  if isinstance(value, message.Message):
    self._cached_modifiable_custom_properties[key] = value
  return value
get_string_custom_property
get_string_custom_property(key: str) -> str

Get a custom property of string type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_in_subclasses
def get_string_custom_property(self, key: str) -> str:
  """Get a custom property of string type."""
  if key not in self._artifact.custom_properties:
    return ''
  json_value = self.get_json_value_custom_property(key)
  if isinstance(json_value, str):
    return json_value
  return self._artifact.custom_properties[key].string_value
set_bool_custom_property
set_bool_custom_property(key: str, value: bool)

Sets a custom property of bool type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_in_subclasses
def set_bool_custom_property(self, key: str, value: bool):
  """Sets a custom property of bool type."""
  self._artifact.custom_properties[key].bool_value = value
set_float_custom_property
set_float_custom_property(key: str, value: float)

Sets a custom property of float type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_in_subclasses
def set_float_custom_property(self, key: str, value: float):
  """Sets a custom property of float type."""
  self._artifact.custom_properties[key].double_value = builtins.float(value)
set_int_custom_property
set_int_custom_property(key: str, value: int)

Set a custom property of int type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_in_subclasses
def set_int_custom_property(self, key: str, value: int):
  """Set a custom property of int type."""
  self._artifact.custom_properties[key].int_value = builtins.int(value)
set_json_value_custom_property
set_json_value_custom_property(key: str, value: JsonValueType)

Sets a custom property of JSON type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_inheritable
def set_json_value_custom_property(self, key: str, value: JsonValueType):
  """Sets a custom property of JSON type."""
  self._cached_modifiable_custom_properties[key] = value
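A hedged sketch of the JSON-value custom property round trip; the Examples artifact type is an assumption from tfx.v1.types.standard_artifacts.

from tfx.v1.types.standard_artifacts import Examples

examples = Examples()
examples.set_json_value_custom_property("eval_splits", ["eval", "test"])
splits = examples.get_json_value_custom_property("eval_splits")
# The returned list is cached by the artifact, so in-place mutations are
# re-serialized into the underlying MLMD proto when it is written out.
splits.append("holdout")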
set_mlmd_artifact
set_mlmd_artifact(artifact: Artifact)

Replace the MLMD artifact object on this artifact.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_inheritable
def set_mlmd_artifact(self, artifact: metadata_store_pb2.Artifact):
  """Replace the MLMD artifact object on this artifact."""
  if not isinstance(artifact, metadata_store_pb2.Artifact):
    raise ValueError(
        ('Expected instance of metadata_store_pb2.Artifact, got %s '
         'instead.') % (artifact,))
  self._artifact = artifact
  self._cached_modifiable_properties = {}
  self._cached_modifiable_custom_properties = {}
set_mlmd_artifact_type
set_mlmd_artifact_type(artifact_type: ArtifactType)

Set entire ArtifactType in this object.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_inheritable
def set_mlmd_artifact_type(self,
                           artifact_type: metadata_store_pb2.ArtifactType):
  """Set entire ArtifactType in this object."""
  if not isinstance(artifact_type, metadata_store_pb2.ArtifactType):
    raise ValueError(
        ('Expected instance of metadata_store_pb2.ArtifactType, got %s '
         'instead.') % (artifact_type,))
  self._artifact_type = artifact_type
  self._artifact.type_id = artifact_type.id
set_proto_custom_property
set_proto_custom_property(key: str, value: Message)

Sets a custom property of proto type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_inheritable
def set_proto_custom_property(self, key: str, value: message.Message):
  """Sets a custom property of proto type."""
  self._cached_modifiable_custom_properties[key] = value
set_string_custom_property
set_string_custom_property(key: str, value: str)

Set a custom property of string type.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_in_subclasses
def set_string_custom_property(self, key: str, value: str):
  """Set a custom property of string type."""
  self._artifact.custom_properties[key].string_value = value
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/types/artifact.py
@doc_controls.do_not_doc_inheritable
def to_json_dict(self) -> Dict[str, Any]:
  return {
      'artifact':
          json.loads(
              json_format.MessageToJson(
                  message=self.mlmd_artifact,
                  preserving_proto_field_name=True)),
      'artifact_type':
          json.loads(
              json_format.MessageToJson(
                  message=self._artifact_type,
                  preserving_proto_field_name=True)),
      '__artifact_class_module__':
          self.__class__.__module__,
      '__artifact_class_name__':
          self.__class__.__name__,
  }
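A hedged sketch of a JSON round trip using to_json_dict and from_json_dict; the Model artifact type is an assumption from tfx.v1.types.standard_artifacts.

import json

from tfx.v1.types.standard_artifacts import Model

model = Model()
model.uri = "/serving/model/1"
model.set_string_custom_property("framework", "tensorflow")

payload = json.dumps(model.to_json_dict())
restored = Model.from_json_dict(json.loads(payload))
assert restored.uri == model.uri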

Channel

Channel(type: Type[Artifact], additional_properties: Optional[Dict[str, Property]] = None, additional_custom_properties: Optional[Dict[str, Property]] = None, artifacts: Optional[Iterable[Artifact]] = None, producer_component_id: Optional[str] = None, output_key: Optional[str] = None)

Bases: Jsonable, BaseChannel

Legacy channel interface.

Channel was used to represent the BaseChannel concept in early TFX code, but because too many features accumulated in a single class, it was refactored into multiple classes:

  • BaseChannel for the general input abstraction
  • OutputChannel for component.outputs['key'].
  • MLMDQueryChannel for simple filter-based input resolution.

Please do not use this class directly; use the alternatives instead. However, this class will not be removed in TFX 1.x, due to the backward compatibility guarantee.

Initialization of Channel.

PARAMETER DESCRIPTION
type

Subclass of Artifact that represents the type of this Channel.

TYPE: Type[Artifact]

additional_properties

(Optional) A mapping of properties which will be added to artifacts when this channel is used as an output of components. This is experimental and is subject to change in the future.

TYPE: Optional[Dict[str, Property]] DEFAULT: None

additional_custom_properties

(Optional) A mapping of custom_properties which will be added to artifacts when this channel is used as an output of components. This is experimental and is subject to change in the future.

TYPE: Optional[Dict[str, Property]] DEFAULT: None

artifacts

Deprecated and ignored, kept only for backward compatibility.

TYPE: Optional[Iterable[Artifact]] DEFAULT: None

producer_component_id

(Optional) Producer component id of the Channel. This argument is internal/experimental and is subject to change in the future.

TYPE: Optional[str] DEFAULT: None

output_key

(Optional) The output key when producer component produces the artifacts in this Channel. This argument is internal/experimental and is subject to change in the future.

TYPE: Optional[str] DEFAULT: None

METHOD DESCRIPTION
as_optional

Creates an optional version of self.

as_output_channel

Internal method to derive OutputChannel from the Channel instance.

from_json_dict

Convert from dictionary data to an object.

get

Returns all artifacts that can be retrieved from this Channel.

get_data_dependent_node_ids

Get data dependent nodes of this channel.

set_artifacts

Sets artifacts for a static Channel. Will be deprecated.

to_json_dict

Convert from an object to a JSON serializable dictionary.

ATTRIBUTE DESCRIPTION
is_optional

If this is an "optional" channel. Changes Pipeline runtime behavior.

TYPE: Optional[bool]

type_name

Name of the artifact type class that Channel takes.

Source code in tfx/types/channel.py
def __init__(
    self,
    type: Type[Artifact],  # pylint: disable=redefined-builtin
    additional_properties: Optional[Dict[str, Property]] = None,
    additional_custom_properties: Optional[Dict[str, Property]] = None,
    # TODO(b/161490287): deprecate static artifact.
    artifacts: Optional[Iterable[Artifact]] = None,
    producer_component_id: Optional[str] = None,
    output_key: Optional[str] = None):
  """Initialization of Channel.

  Args:
    type: Subclass of Artifact that represents the type of this Channel.
    additional_properties: (Optional) A mapping of properties which will be
      added to artifacts when this channel is used as an output of components.
      This is experimental and is subject to change in the future.
    additional_custom_properties: (Optional) A mapping of custom_properties
      which will be added to artifacts when this channel is used as an output
      of components. This is experimental and is subject to change in the
      future.
    artifacts: Deprecated and ignored, kept only for backward compatibility.
    producer_component_id: (Optional) Producer component id of the Channel.
      This argument is internal/experimental and is subject to change in the
      future.
    output_key: (Optional) The output key when producer component produces the
      artifacts in this Channel. This argument is internal/experimental and is
      subject to change in the future.
  """
  super().__init__(type=type)

  if additional_properties is not None:
    self._validate_additional_properties(additional_properties)
  self.additional_properties = additional_properties or {}

  if additional_custom_properties is not None:
    self._validate_additional_custom_properties(additional_custom_properties)
  self.additional_custom_properties = additional_custom_properties or {}

  if producer_component_id is not None:
    self._validate_producer_component_id(producer_component_id)
  # Use a protected attribute & getter/setter property as OutputChannel is
  # overriding it.
  self._producer_component_id = producer_component_id

  if output_key is not None:
    self._validate_output_key(output_key)
  self.output_key = output_key

  if artifacts:
    logging.warning(
        'Artifacts param is ignored by Channel constructor, please remove!')
  self._artifacts = []
  self._matching_channel_name = None
Attributes
is_optional property
is_optional: Optional[bool]

If this is an "optional" channel. Changes Pipeline runtime behavior.

type_name property
type_name

Name of the artifact type class that Channel takes.

Functions
as_optional
as_optional() -> Self

Creates an optional version of self.

By default, component input channels are considered required, meaning that if the channel does not contain at least one artifact, the component will be skipped. Making the channel optional disables this requirement and allows components to be executed with no artifacts from this channel.

RETURNS DESCRIPTION
Self

A copy of self which is optional.

Source code in tfx/types/channel.py
def as_optional(self) -> typing_extensions.Self:
  """Creates an optional version of self.

  By default component input channels are considered required, meaning
  if the channel does not contain at least 1 artifact, the component
  will be skipped. Making channel optional disables this requirement and
  allows components to be executed with no artifacts from this channel.

  Returns:
    A copy of self which is optional.
  """
  new_channel = copy.copy(self)
  new_channel._is_optional = True  # pylint: disable=protected-access
  return new_channel
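A hedged sketch of marking an input optional; the ModelBlessing artifact type is an assumption from tfx.v1.types.standard_artifacts.

from tfx.v1.dsl import Channel
from tfx.v1.types.standard_artifacts import ModelBlessing

blessing_channel = Channel(type=ModelBlessing)
# A copy of the channel whose consumers may run even if it resolves to
# zero artifacts.
optional_blessing = blessing_channel.as_optional()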
as_output_channel
as_output_channel(producer_component: Any, output_key: str) -> 'OutputChannel'

Internal method to derive OutputChannel from the Channel instance.

The return value (an OutputChannel instance) is based on a shallow copy of self, so that any attribute change in one is reflected in the other.

PARAMETER DESCRIPTION
producer_component

A BaseNode instance that is producing this channel.

TYPE: Any

output_key

Corresponding node.outputs key for this channel.

TYPE: str

RETURNS DESCRIPTION
'OutputChannel'

An OutputChannel instance that shares attributes with self.

Source code in tfx/types/channel.py
@doc_controls.do_not_generate_docs
def as_output_channel(
    self, producer_component: Any, output_key: str) -> 'OutputChannel':
  """Internal method to derive OutputChannel from the Channel instance.

  Return value (OutputChannel instance) is based on the shallow copy of self,
  so that any attribute change in one is reflected on the others.

  Args:
    producer_component: A BaseNode instance that is producing this channel.
    output_key: Corresponding node.outputs key for this channel.

  Returns:
    An OutputChannel instance that shares attributes with self.
  """
  # Disable pylint false alarm for safe access of protected attributes.
  # pylint: disable=protected-access
  result = OutputChannel(self.type, producer_component, output_key)
  result.additional_properties = self.additional_properties
  result.additional_custom_properties = self.additional_custom_properties
  result.set_artifacts(self._artifacts)
  return result
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/types/channel.py
@classmethod
@doc_controls.do_not_doc_inheritable
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  artifact_type = metadata_store_pb2.ArtifactType()
  json_format.Parse(json.dumps(dict_data['type']), artifact_type)
  type_cls = artifact_utils.get_artifact_type_class(artifact_type)
  artifacts = list(Artifact.from_json_dict(a) for a in dict_data['artifacts'])
  additional_properties = dict_data['additional_properties']
  additional_custom_properties = dict_data['additional_custom_properties']
  producer_component_id = dict_data.get('producer_component_id', None)
  output_key = dict_data.get('output_key', None)
  return Channel(
      type=type_cls,
      additional_properties=additional_properties,
      additional_custom_properties=additional_custom_properties,
      producer_component_id=producer_component_id,
      output_key=output_key).set_artifacts(artifacts)
get
get() -> Iterable[Artifact]

Returns all artifacts that can be retrieved from this Channel.

RETURNS DESCRIPTION
Iterable[Artifact]

An artifact collection.

Source code in tfx/types/channel.py
@doc_controls.do_not_doc_inheritable
def get(self) -> Iterable[Artifact]:
  """Returns all artifacts that can be get from this Channel.

  Returns:
    An artifact collection.
  """
  # TODO(b/125037186): We should support dynamic query against a Channel
  # instead of a static Artifact collection.
  return self._artifacts
get_data_dependent_node_ids
get_data_dependent_node_ids() -> Set[str]

Get data dependent nodes of this channel.

Currently only the OutputChannel directly imposes a data dependency, but other channels can also have an indirect data dependency if they depend on an OutputChannel. Use this abstract method to define transitive data dependencies.

RETURNS DESCRIPTION
Set[str]

A set of data-dependent node IDs.

Source code in tfx/types/channel.py
def get_data_dependent_node_ids(self) -> Set[str]:
  return set()
set_artifacts
set_artifacts(artifacts: Iterable[Artifact]) -> 'Channel'

Sets artifacts for a static Channel. Will be deprecated.

Source code in tfx/types/channel.py
@doc_controls.do_not_doc_inheritable
def set_artifacts(self, artifacts: Iterable[Artifact]) -> 'Channel':
  """Sets artifacts for a static Channel. Will be deprecated."""
  if self._matching_channel_name:
    raise ValueError(
        'Only one of `artifacts` and `matching_channel_name` should be set.')
  self._validate_static_artifacts(artifacts)
  self._artifacts = artifacts
  return self
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/types/channel.py
@doc_controls.do_not_doc_inheritable
def to_json_dict(self) -> Dict[str, Any]:
  return {
      'type':
          json.loads(
              json_format.MessageToJson(
                  message=self.type._get_artifact_type(),  # pylint: disable=protected-access
                  preserving_proto_field_name=True)),
      'artifacts':
          list(a.to_json_dict() for a in self._artifacts),
      'additional_properties':
          self.additional_properties,
      'additional_custom_properties':
          self.additional_custom_properties,
      'producer_component_id':
          (self.producer_component_id if self.producer_component_id else None
          ),
      'output_key': (self.output_key if self.output_key else None),
  }

Cond

Cond(predicate: Predicate)

Bases: DslContextManager[None]

Cond context manager that disables its containing nodes if the predicate is False.

Cond blocks can be nested to express nested conditions.

Usage:

evaluator = Evaluator(
      examples=example_gen.outputs["examples"],
      model=trainer.outputs["model"],
      eval_config=EvalConfig(...),
)

with Cond(evaluator.outputs["blessing"].future().custom_property("blessed") == 1):
    pusher = Pusher(
        model=trainer.outputs["model"], push_destination=PushDestination(...)
    )
Source code in tfx/dsl/experimental/conditionals/conditional.py
def __init__(self, predicate: placeholder.Predicate):
  super().__init__()
  self._predicate = predicate
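A hedged sketch of nesting Cond blocks, extending the usage example above; the evaluator, infra_validator, trainer, and Pusher variables and the "blessed" custom property are assumptions for illustration.

with Cond(evaluator.outputs["blessing"].future().custom_property("blessed") == 1):
    with Cond(
        infra_validator.outputs["blessing"].future().custom_property("blessed") == 1
    ):
        pusher = Pusher(
            model=trainer.outputs["model"], push_destination=PushDestination(...)
        )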

ExecutionMode

Bases: Enum

Execution mode of a pipeline.

Please see the corresponding TFX RFC for more details.

Importer

Importer(source_uri: str, artifact_type: Type[Artifact], reimport: Optional[bool] = False, properties: Optional[Dict[str, Union[str, int]]] = None, custom_properties: Optional[Dict[str, Union[str, int]]] = None, output_key: Optional[str] = None)

Bases: BaseNode

Definition for TFX Importer.

The Importer is a special TFX node which registers an external resource into MLMD so that downstream nodes can use the registered artifact as an input.

Here is an example to use the Importer:

importer = Importer(
    source_uri="uri/to/schema",
    artifact_type=standard_artifacts.Schema,
    reimport=False,
).with_id("import_schema")
schema_gen = SchemaGen(
    fixed_schema=importer.outputs["result"],
    examples=...,
)

Init function for the Importer.

PARAMETER DESCRIPTION
source_uri

the URI of the resource that needs to be registered.

TYPE: str

artifact_type

the type of the artifact to import.

TYPE: Type[Artifact]

reimport

whether or not to re-import as a new artifact if the URI has been imported before.

TYPE: Optional[bool] DEFAULT: False

properties

Dictionary of properties for the imported Artifact. These properties should be ones declared for the given artifact_type (see the PROPERTIES attribute of the definition of the type for details).

TYPE: Optional[Dict[str, Union[str, int]]] DEFAULT: None

custom_properties

Dictionary of custom properties for the imported Artifact. These properties should be of type Text or int.

TYPE: Optional[Dict[str, Union[str, int]]] DEFAULT: None

output_key

The key to use for the imported artifact in the Importer's output dictionary. Defaults to 'result'.

TYPE: Optional[str] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add components that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Output Channel dict that contains imported artifacts.

TYPE: Dict[str, Any]

Source code in tfx/dsl/components/common/importer.py
def __init__(self,
             source_uri: str,
             artifact_type: Type[types.Artifact],
             reimport: Optional[bool] = False,
             properties: Optional[Dict[str, Union[str, int]]] = None,
             custom_properties: Optional[Dict[str, Union[str, int]]] = None,
             output_key: Optional[str] = None):
  """Init function for the Importer.

  Args:
    source_uri: the URI of the resource that needs to be registered.
    artifact_type: the type of the artifact to import.
    reimport: whether or not to re-import as a new artifact if the URI has
      been imported in before.
    properties: Dictionary of properties for the imported Artifact. These
      properties should be ones declared for the given artifact_type (see the
      PROPERTIES attribute of the definition of the type for details).
    custom_properties: Dictionary of custom properties for the imported
      Artifact. These properties should be of type Text or int.
    output_key: The key to use for the imported artifact in the Importer's
      output dictionary. Defaults to 'result'.
  """
  self._source_uri = source_uri
  self._reimport = reimport
  self._output_key = output_key or IMPORT_RESULT_KEY

  artifact = artifact_type()
  artifact.is_external = True
  _set_artifact_properties(artifact, properties, custom_properties)

  output_channel = types.OutputChannel(
      artifact_type,
      producer_component=self,
      output_key=self._output_key,
      additional_properties=properties,
      additional_custom_properties=custom_properties)

  # TODO(b/161490287): remove static artifacts.
  output_channel.set_artifacts([artifact])
  self._output_dict = {self._output_key: output_channel}

  super().__init__(driver_class=ImporterDriver)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the node class name.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, Any]

Output Channel dict that contains imported artifacts.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
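A hedged sketch of a task-based dependency; my_exporter and my_validator are hypothetical nodes with no artifact flowing between them.

# Force my_validator to run after my_exporter.
my_exporter.add_downstream_node(my_validator)
# Equivalent, declared from the other side:
# my_validator.add_upstream_node(my_exporter)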
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add components that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])

Pipeline

Pipeline(pipeline_name: str, pipeline_root: Optional[Union[str, Placeholder]] = '', metadata_connection_config: Optional[ConnectionConfigType] = None, components: Iterable[BaseNode] = (), enable_cache: bool = False, beam_pipeline_args: Optional[List[Union[str, Placeholder]]] = None, platform_config: Optional[Message] = None, execution_mode: ExecutionMode = SYNC, inputs: Optional[PipelineInputs] = None, outputs: Optional[Dict[str, OutputChannel]] = None, dsl_context_registry: Optional[DslContextRegistry] = None)

Bases: BaseNode

Logical TFX pipeline object.

Pipeline object represents the DAG of TFX components, which can be run using one of the pipeline orchestration systems that TFX supports. For details, please refer to the guide.
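A hedged sketch of assembling a pipeline; the pipeline root, metadata path, and the example_gen and trainer component variables are assumptions for illustration, and metadata.sqlite_metadata_connection_config is used here for a local SQLite MLMD store.

from tfx.v1.dsl import Pipeline
from tfx.v1.orchestration import metadata

pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_root="/tmp/tfx/my_pipeline",
    components=[example_gen, trainer],
    enable_cache=True,
    metadata_connection_config=metadata.sqlite_metadata_connection_config(
        "/tmp/tfx/metadata.db"
    ),
)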

ATTRIBUTE DESCRIPTION
components

A deterministic list of logical components of this pipeline, which are deduped and topologically sorted.

enable_cache

Whether or not cache is enabled for this run.

metadata_connection_config

The config to connect to ML metadata.

execution_mode

Execution mode of the pipeline. Currently, only the synchronous execution mode is supported.

beam_pipeline_args

Pipeline arguments for Beam powered Components. Use with_beam_pipeline_args to set component level Beam args.

platform_config

Pipeline level platform config, in proto form.

Initialize pipeline.

PARAMETER DESCRIPTION
pipeline_name

Name of the pipeline;

TYPE: str

pipeline_root

Path to root directory of the pipeline. This will most often be just a string. Some orchestrators may have limited support for constructing this from a Placeholder, e.g. a RuntimeInfoPlaceholder that refers to fields from the platform config. pipeline_root is optional only if the pipeline is composed within another parent pipeline, in which case it will inherit its parent pipeline's root.

TYPE: Optional[Union[str, Placeholder]] DEFAULT: ''

metadata_connection_config

The config to connect to ML metadata.

TYPE: Optional[ConnectionConfigType] DEFAULT: None

components

Optional list of components to construct the pipeline.

TYPE: Iterable[BaseNode] DEFAULT: ()

enable_cache

Whether or not cache is enabled for this run.

TYPE: bool DEFAULT: False

beam_pipeline_args

Pipeline arguments for Beam powered Components.

TYPE: Optional[List[Union[str, Placeholder]]] DEFAULT: None

platform_config

Pipeline level platform config, in proto form.

TYPE: Optional[Message] DEFAULT: None

execution_mode

The execution mode of the pipeline, can be SYNC or ASYNC.

TYPE: ExecutionMode DEFAULT: SYNC

inputs

Optional inputs of a pipeline.

TYPE: Optional[PipelineInputs] DEFAULT: None

outputs

Optional outputs of a pipeline.

TYPE: Optional[Dict[str, OutputChannel]] DEFAULT: None

dsl_context_registry

DslContextRegistry to use for this pipeline, if not provided then the current context (potentially a new DslContext) will be used.

TYPE: Optional[DslContextRegistry] DEFAULT: None

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add components that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

ATTRIBUTE DESCRIPTION
beam_pipeline_args

Beam pipeline args used for all components in the pipeline.

components

A deterministic list of logical components that are deduped and topologically sorted.

id

Node id, unique across all TFX nodes in a pipeline.

Source code in tfx/orchestration/pipeline.py
def __init__(
    self,
    pipeline_name: str,
    pipeline_root: Optional[Union[str, ph.Placeholder]] = '',
    metadata_connection_config: Optional[
        metadata.ConnectionConfigType
    ] = None,
    components: Iterable[base_node.BaseNode] = (),
    enable_cache: bool = False,
    beam_pipeline_args: Optional[List[Union[str, ph.Placeholder]]] = None,
    platform_config: Optional[message.Message] = None,
    execution_mode: ExecutionMode = ExecutionMode.SYNC,
    inputs: Optional[PipelineInputs] = None,
    outputs: Optional[Dict[str, channel.OutputChannel]] = None,
    dsl_context_registry: Optional[
        dsl_context_registry_lib.DslContextRegistry
    ] = None,
):
  """Initialize pipeline.

  Args:
    pipeline_name: Name of the pipeline;
    pipeline_root: Path to root directory of the pipeline. This will most
      often be just a string. Some orchestrators may have limited support for
      constructing this from a Placeholder, e.g. a RuntimeInfoPlaceholder that
      refers to fields from the platform config. pipeline_root is optional
      only if the pipeline is composed within another parent pipeline, in
      which case it will inherit its parent pipeline's root.
    metadata_connection_config: The config to connect to ML metadata.
    components: Optional list of components to construct the pipeline.
    enable_cache: Whether or not cache is enabled for this run.
    beam_pipeline_args: Pipeline arguments for Beam powered Components.
    platform_config: Pipeline level platform config, in proto form.
    execution_mode: The execution mode of the pipeline, can be SYNC or ASYNC.
    inputs: Optional inputs of a pipeline.
    outputs: Optional outputs of a pipeline.
    dsl_context_registry: DslContextRegistry to use for this pipeline, if not
      provided then the current context (potentially a new DslContext) will be
      used.
  """
  if len(pipeline_name) > _MAX_PIPELINE_NAME_LENGTH:
    raise ValueError(
        f'pipeline {pipeline_name} exceeds maximum allowed length: {_MAX_PIPELINE_NAME_LENGTH}.'
    )
  self.pipeline_name = pipeline_name

  # Registry extraction should come before super().__init__() which put self
  # to the active DslContextRegistry.
  self._dsl_context_registry = dsl_context_registry
  if self._dsl_context_registry is None:
    parent_reg = dsl_context_registry_lib.get()
    self._dsl_context_registry = parent_reg.extract_for_pipeline(components)

  # Initialize pipeline as a node.
  super().__init__()

  if inputs:
    inputs.pipeline = self
  self._inputs = inputs
  if outputs:
    self._outputs = {
        k: channel.PipelineOutputChannel(v, pipeline=self, output_key=k)
        for k, v in outputs.items()
    }
  else:
    self._outputs = {}
  self._id = pipeline_name

  # Once pipeline is finalized, this instance is regarded as immutable and
  # any detectable mutation will raise an error.
  self._finalized = False

  # TODO(b/183621450): deprecate PipelineInfo.
  self.pipeline_info = data_types.PipelineInfo(  # pylint: disable=g-missing-from-attributes
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root)
  self.enable_cache = enable_cache
  self.metadata_connection_config = metadata_connection_config
  self.execution_mode = execution_mode

  self._beam_pipeline_args = beam_pipeline_args or []

  self.platform_config = platform_config

  # TODO: b/324635891 - Remove all references and clean this up.
  self.additional_pipeline_args = {}

  # TODO(b/216581002): Use self._dsl_context_registry to obtain components.
  self._components = []
  if components:
    self._set_components(components)
Attributes
beam_pipeline_args property
beam_pipeline_args

Beam pipeline args used for all components in the pipeline.

components property writable
components

A deterministic list of logical components that are deduped and topologically sorted.

id property
id

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, return the node class name.

RETURNS DESCRIPTION
str

node id.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add components that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])

Resolver

Resolver(strategy_class: Optional[Type[ResolverStrategy]] = None, config: Optional[Dict[str, JsonableType]] = None, **channels: BaseChannel)

Bases: BaseNode

Definition for TFX Resolver.

Resolver is a special TFX node that handles artifact resolution logic, the results of which are used as inputs for downstream nodes.

To use Resolver, pass the following to the Resolver constructor:

  • Name of the Resolver instance
  • A subclass of ResolverStrategy
  • Configs that will be used to construct an instance of ResolverStrategy
  • Channels to resolve with their tag, in the form of kwargs

Here is an example:

example_gen = ImportExampleGen(...)
examples_resolver = Resolver(
    strategy_class=tfx.dsl.experimental.SpanRangeStrategy,
    config={"range_config": range_config},
    examples=Channel(
        type=Examples,
        producer_component_id=example_gen.id,
    ),
).with_id("Resolver.span_resolver")
trainer = Trainer(
    examples=examples_resolver.outputs["examples"],
    ...,
)

You can find experimental ResolverStrategy classes under tfx.v1.dsl.experimental module, including LatestArtifactStrategy, LatestBlessedModelStrategy, SpanRangeStrategy, etc.

Init function for Resolver.

PARAMETER DESCRIPTION
strategy_class

Optional ResolverStrategy which contains the artifact resolution logic.

TYPE: Optional[Type[ResolverStrategy]] DEFAULT: None

config

Optional dict of key to Jsonable type for constructing resolver_strategy.

TYPE: Optional[Dict[str, JsonableType]] DEFAULT: None

**channels

Input channels to the Resolver node as keyword arguments.

TYPE: BaseChannel DEFAULT: {}

METHOD DESCRIPTION
add_downstream_node

Experimental: Add another component that must run after this one.

add_downstream_nodes

Experimental: Add components that must run after this one.

add_upstream_node

Experimental: Add another component that must run before this one.

add_upstream_nodes

Experimental: Add components that must run before this one.

from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

ATTRIBUTE DESCRIPTION
id

Node id, unique across all TFX nodes in a pipeline.

TYPE: str

outputs

Output Channel dict that contains resolved artifacts.

TYPE: Dict[str, OutputChannel]

Source code in tfx/dsl/components/common/resolver.py
def __init__(self,
             strategy_class: Optional[Type[ResolverStrategy]] = None,
             config: Optional[Dict[str, json_utils.JsonableType]] = None,
             **channels: types.BaseChannel):
  """Init function for Resolver.

  Args:
    strategy_class: Optional `ResolverStrategy` which contains the artifact
        resolution logic.
    config: Optional dict of key to Jsonable type for constructing
        resolver_strategy.
    **channels: Input channels to the Resolver node as keyword arguments.
  """
  if (strategy_class is not None and
      not issubclass(strategy_class, ResolverStrategy)):
    raise TypeError('strategy_class should be ResolverStrategy, but got '
                    f'{strategy_class} instead.')
  if strategy_class is None and config is not None:
    raise ValueError('Cannot use config parameter without strategy_class.')
  for input_key, channel in channels.items():
    if not isinstance(channel, channel_types.BaseChannel):
      raise ValueError(f'Resolver got non-BaseChannel argument {input_key}.')
  self._strategy_class = strategy_class
  self._config = config or {}
  # Observed inputs from the DSL, as if the Resolver node takes inputs.
  # TODO(b/246907396): Remove raw_inputs usage.
  self._raw_inputs = dict(channels)
  if strategy_class is not None:
    output_node = resolver_op.OpNode(
        op_type=strategy_class,
        output_data_type=resolver_op.DataType.ARTIFACT_MULTIMAP,
        args=[
            resolver_op.DictNode({
                input_key: resolver_op.InputNode(channel)
                for input_key, channel in channels.items()
            })
        ],
        kwargs=self._config)
    self._input_dict = {
        k: resolved_channel.ResolvedChannel(c.type, output_node, k)
        for k, c in channels.items()
    }
  else:
    self._input_dict = channels
  self._output_dict = {
      input_key: types.OutputChannel(channel.type, self, input_key)
      for input_key, channel in channels.items()
  }
  super().__init__(driver_class=_ResolverDriver)
Attributes
id property writable
id: str

Node id, unique across all TFX nodes in a pipeline.

If id is set by the user, return it directly. Otherwise, a default id based on the node's class name is returned.

RETURNS DESCRIPTION
str

node id.

outputs property
outputs: Dict[str, OutputChannel]

Output Channel dict that contains resolved artifacts.

Functions
add_downstream_node
add_downstream_node(downstream_node)

Experimental: Add another component that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_node.

PARAMETER DESCRIPTION
downstream_node

a component that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_node(self, downstream_node):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_node`.

  Args:
    downstream_node: a component that must run after this node.
  """
  self._downstream_nodes.add(downstream_node)
  if self not in downstream_node.upstream_nodes:
    downstream_node.add_upstream_node(self)
add_downstream_nodes
add_downstream_nodes(downstream_nodes)

Experimental: Add components that must run after this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_upstream_nodes.

PARAMETER DESCRIPTION
downstream_nodes

a list of components that must run after this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_downstream_nodes(self, downstream_nodes):
  """Experimental: Add another component that must run after this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_upstream_nodes`.

  Args:
    downstream_nodes: a list of components that must run after this node.
  """
  self._downstream_nodes.update(downstream_nodes)
  for downstream_node in downstream_nodes:
    if self not in downstream_node.upstream_nodes:
      downstream_node.add_upstream_node(self)
add_upstream_node
add_upstream_node(upstream_node)

Experimental: Add another component that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

It is symmetric with add_downstream_node.

PARAMETER DESCRIPTION
upstream_node

a component that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_node(self, upstream_node):
  """Experimental: Add another component that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.

  It is symmetric with `add_downstream_node`.

  Args:
    upstream_node: a component that must run before this node.
  """
  self._upstream_nodes.add(upstream_node)
  if self not in upstream_node.downstream_nodes:
    upstream_node.add_downstream_node(self)
add_upstream_nodes
add_upstream_nodes(upstream_nodes)

Experimental: Add components that must run before this one.

This method enables task-based dependencies by enforcing execution order for synchronous pipelines on supported platforms. Currently, the supported platforms are Airflow, Beam, and Kubeflow Pipelines.

Note that this API call should be considered experimental, and may not work with asynchronous pipelines, sub-pipelines and pipelines with conditional nodes. We also recommend relying on data for capturing dependencies where possible to ensure data lineage is fully captured within MLMD.

PARAMETER DESCRIPTION
upstream_nodes

a list of components that must run before this node.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def add_upstream_nodes(self, upstream_nodes):
  """Experimental: Add components that must run before this one.

  This method enables task-based dependencies by enforcing execution order for
  synchronous pipelines on supported platforms. Currently, the supported
  platforms are Airflow, Beam, and Kubeflow Pipelines.

  Note that this API call should be considered experimental, and may not work
  with asynchronous pipelines, sub-pipelines and pipelines with conditional
  nodes. We also recommend relying on data for capturing dependencies where
  possible to ensure data lineage is fully captured within MLMD.


  Args:
    upstream_nodes: a list of components that must run before this node.
  """
  self._upstream_nodes.update(upstream_nodes)
  for upstream_node in upstream_nodes:
    if self not in upstream_node.downstream_nodes:
      upstream_node.add_downstream_node(self)
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/dsl/components/base/base_node.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return dict((k, v)
              for k, v in self.__dict__.items()
              if k not in ['_upstream_nodes', '_downstream_nodes'])

Modules

components

TFX DSL components module.

CLASS DESCRIPTION
AsyncOutputArtifact

Intermediate artifact object type annotation.

BeamComponentParameter

Component parameter type annotation.

InputArtifact

Input artifact object type annotation.

OutputArtifact

Output artifact object type annotation.

OutputDict

Decorator declaring component executor function outputs.

Parameter

Component parameter type annotation.

FUNCTION DESCRIPTION
component

Decorator: creates a component from a typehint-annotated Python function.

Classes
AsyncOutputArtifact

Bases: Generic[T]

Intermediate artifact object type annotation.

BeamComponentParameter
BeamComponentParameter(artifact_type: Type[_BeamPipeline], _init_via_getitem=False)

Bases: _PipelineTypeGeneric

Component parameter type annotation.

Source code in tfx/dsl/component/experimental/annotations.py
def __init__(  # pylint: disable=invalid-name
    self, artifact_type: Type[_BeamPipeline], _init_via_getitem=False
):
  if not _init_via_getitem:
    class_name = self.__class__.__name__
    raise ValueError(
        (
            '%s should be instantiated via the syntax `%s[T]`, where T is '
            '`beam.Pipeline`.'
        )
        % (class_name, class_name)
    )
  self._type = artifact_type
InputArtifact
InputArtifact(artifact_type: Union[Type[Artifact], Type[List[Artifact]]], _init_via_getitem=False)

Bases: _ArtifactGeneric

Input artifact object type annotation.

Source code in tfx/dsl/component/experimental/annotations.py
def __init__(  # pylint: disable=invalid-name
    self,
    artifact_type: Union[
        Type[artifact.Artifact], Type[List[artifact.Artifact]]
    ],
    _init_via_getitem=False,
):
  if not _init_via_getitem:
    class_name = self.__class__.__name__
    raise ValueError(
        '%s should be instantiated via the syntax `%s[T]`, where T is a '
        'subclass of tfx.types.Artifact.' % (class_name, class_name)
    )
  self.type = artifact_type
OutputArtifact
OutputArtifact(artifact_type: Union[Type[Artifact], Type[List[Artifact]]], _init_via_getitem=False)

Bases: _ArtifactGeneric

Output artifact object type annotation.

Source code in tfx/dsl/component/experimental/annotations.py
def __init__(  # pylint: disable=invalid-name
    self,
    artifact_type: Union[
        Type[artifact.Artifact], Type[List[artifact.Artifact]]
    ],
    _init_via_getitem=False,
):
  if not _init_via_getitem:
    class_name = self.__class__.__name__
    raise ValueError(
        '%s should be instantiated via the syntax `%s[T]`, where T is a '
        'subclass of tfx.types.Artifact.' % (class_name, class_name)
    )
  self.type = artifact_type
OutputDict
OutputDict(**kwargs)

Decorator declaring component executor function outputs.

The @component decorator now also understands a TypedDict return type annotation, so please use a TypedDict instead of OutputDict.

Source code in tfx/dsl/component/experimental/annotations.py
@deprecation_utils.deprecated('2023-08-25', 'Please use TypedDict instead.')
def __init__(self, **kwargs):
  self.kwargs = kwargs
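
A minimal sketch of the preferred style (names are illustrative): declare the outputs with a TypedDict and use it as the return type annotation of the component function, as in the @component examples later on this page.

from typing import TypedDict


class MyOutputs(TypedDict):
    loss: float
    accuracy: float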
Parameter
Parameter(artifact_type: Type[Union[int, float, str, bool, Message]], _init_via_getitem=False)

Bases: _PrimitiveAndProtoTypeGeneric

Component parameter type annotation.

Source code in tfx/dsl/component/experimental/annotations.py
def __init__(  # pylint: disable=invalid-name
    self,
    artifact_type: Type[Union[int, float, str, bool, message.Message]],
    _init_via_getitem=False,
):
  if not _init_via_getitem:
    class_name = self.__class__.__name__
    raise ValueError(
        (
            '%s should be instantiated via the syntax `%s[T]`, where T is '
            '`int`, `float`, `str`, `bool` or proto type.'
        )
        % (class_name, class_name)
    )
  self._type = artifact_type
Functions
component
component(*, component_annotation: Optional[type[SystemExecution]] = None, use_beam: bool = False) -> Callable[[FunctionType], BaseFunctionalComponentFactory]
component(func: Optional[FunctionType] = None, /, *, component_annotation: Optional[Type[SystemExecution]] = None, use_beam: bool = False) -> Union[BaseFunctionalComponentFactory, Callable[[FunctionType], BaseFunctionalComponentFactory]]

Decorator: creates a component from a typehint-annotated Python function.

This decorator creates a component based on the typehint annotations specified for the arguments and return value of a Python function. The decorator can be supplied with a parameter component_annotation to specify the annotation for the component; this annotation hints which system execution type this Python function-based component belongs to. Specifically, function arguments can be annotated with the following types and associated semantics:

  • Parameter[T] where T is int, float, str, or bool: indicates that a primitive type execution parameter, whose value is known at pipeline construction time, will be passed for this argument. These parameters will be recorded in ML Metadata as part of the component's execution record. Can be an optional argument.
  • int, float, str, bytes, bool, Dict, List: indicates that a primitive type value will be passed for this argument. This value is tracked as an Integer, Float, String, Bytes, Boolean or JsonValue artifact (see tfx.types.standard_artifacts) whose value is read and passed into the given Python component function. Can be an optional argument.
  • InputArtifact[ArtifactType]: indicates that an input artifact object of type ArtifactType (deriving from tfx.types.Artifact) will be passed for this argument. This artifact is intended to be consumed as an input by this component (possibly reading from the path specified by its .uri). Can be an optional argument by specifying a default value of None.
  • OutputArtifact[ArtifactType]: indicates that an output artifact object of type ArtifactType (deriving from tfx.types.Artifact) will be passed for this argument. This artifact is intended to be emitted as an output by this component (and written to the path specified by its .uri). Cannot be an optional argument.

The return value typehint should be either empty or None, in the case of a component function that has no return values, or a TypedDict of primitive value types (int, float, str, bytes, bool, dict or list; or Optional[T], where T is a primitive type value, in which case None can be returned), to indicate that the return value is a dictionary with specified keys and value types.

Note that output artifacts should not be included in the return value typehint; they should be included as OutputArtifact annotations in the function inputs, as described above.

The function to which this decorator is applied must be at the top level of its Python module (it may not be defined within nested classes or function closures).

This is example usage of component definition using this decorator:

from tfx import v1 as tfx

InputArtifact = tfx.dsl.components.InputArtifact
OutputArtifact = tfx.dsl.components.OutputArtifact
Parameter = tfx.dsl.components.Parameter
Examples = tfx.types.standard_artifacts.Examples
Model = tfx.types.standard_artifacts.Model


class MyOutput(TypedDict):
    loss: float
    accuracy: float


@component(component_annotation=tfx.dsl.standard_annotations.Train)
def MyTrainerComponent(
    training_data: InputArtifact[Examples],
    model: OutputArtifact[Model],
    dropout_hyperparameter: float,
    num_iterations: Parameter[int] = 10,
) -> MyOutput:
    """My simple trainer component."""

    records = read_examples(training_data.uri)
    model_obj = train_model(records, num_iterations, dropout_hyperparameter)
    model_obj.write_to(model.uri)

    return {"loss": model_obj.loss, "accuracy": model_obj.accuracy}


# Example usage in a pipeline graph definition:
# ...
trainer = MyTrainerComponent(
    training_data=example_gen.outputs["examples"],
    dropout_hyperparameter=other_component.outputs["dropout"],
    num_iterations=1000,
)
pusher = Pusher(model=trainer.outputs["model"])
# ...

When the parameter component_annotation is not supplied, the default value is None. This is another example usage with component_annotation = None:

@component
def MyTrainerComponent(
    training_data: InputArtifact[standard_artifacts.Examples],
    model: OutputArtifact[standard_artifacts.Model],
    dropout_hyperparameter: float,
    num_iterations: Parameter[int] = 10,
) -> Output:
    """My simple trainer component."""

    records = read_examples(training_data.uri)
    model_obj = train_model(records, num_iterations, dropout_hyperparameter)
    model_obj.write_to(model.uri)

    return {"loss": model_obj.loss, "accuracy": model_obj.accuracy}

When the parameter use_beam is True, exactly one parameter of the decorated function must be type-annotated as BeamComponentParameter[beam.Pipeline] and its default value must be None. At execution time it is replaced by a Beam Pipeline constructed with the TFX pipeline's beam_pipeline_args, which are shared with other Beam-based components:

@component(use_beam=True)
def DataProcessingComponent(
    input_examples: InputArtifact[standard_artifacts.Examples],
    output_examples: OutputArtifact[standard_artifacts.Examples],
    beam_pipeline: BeamComponentParameter[beam.Pipeline] = None,
) -> None:
    """My simple trainer component."""

    records = read_examples(training_data.uri)
    with beam_pipeline as p:
        ...

Experimental: no backwards compatibility guarantees.

PARAMETER DESCRIPTION
func

Typehint-annotated component executor function.

TYPE: Optional[FunctionType] DEFAULT: None

component_annotation

used to annotate the python function-based component. It is a subclass of SystemExecution from third_party/py/tfx/types/system_executions.py; it can be None.

TYPE: Optional[Type[SystemExecution]] DEFAULT: None

use_beam

Whether to create a component that is a subclass of BaseBeamComponent. This allows a beam.Pipeline to be made with tfx-pipeline-wise beam_pipeline_args.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
Union[BaseFunctionalComponentFactory, Callable[[FunctionType], BaseFunctionalComponentFactory]]

An object that:

  1. you can call like the initializer of a subclass of base_component.BaseComponent (or base_component.BaseBeamComponent).
  2. has a test_call() member function for unit testing the inner implementation of the component.

Today, the returned object is literally a subclass of BaseComponent, so it can be used as a Type e.g. in isinstance() checks. But you must not rely on this, as we reserve the right to return a different kind of object in the future, which only satisfies the two criteria (1.) and (2.) above without being a Type itself.

RAISES DESCRIPTION
EnvironmentError

if the current Python interpreter is not Python 3.

Source code in tfx/dsl/component/experimental/decorators.py
def component(
    func: Optional[types.FunctionType] = None,
    /,
    *,
    component_annotation: Optional[
        Type[system_executions.SystemExecution]
    ] = None,
    use_beam: bool = False,
) -> Union[
    BaseFunctionalComponentFactory,
    Callable[[types.FunctionType], BaseFunctionalComponentFactory],
]:
  '''Decorator: creates a component from a typehint-annotated Python function.

  This decorator creates a component based on typehint annotations specified for
  the arguments and return value for a Python function. The decorator can be
  supplied with a parameter `component_annotation` to specify the annotation for
  this component decorator. This annotation hints which system execution type
  this python function-based component belongs to.
  Specifically, function arguments can be annotated with the following types and
  associated semantics:

  * `Parameter[T]` where `T` is `int`, `float`, `str`, or `bool`:
    indicates that a primitive type execution parameter, whose value is known at
    pipeline construction time, will be passed for this argument. These
    parameters will be recorded in ML Metadata as part of the component's
    execution record. Can be an optional argument.
  * `int`, `float`, `str`, `bytes`, `bool`, `Dict`, `List`: indicates that a
    primitive type value will be passed for this argument. This value is tracked
    as an `Integer`, `Float`, `String`, `Bytes`, `Boolean` or `JsonValue`
    artifact (see `tfx.types.standard_artifacts`) whose value is read and passed
    into the given Python component function. Can be an optional argument.
  * `InputArtifact[ArtifactType]`: indicates that an input artifact object of
    type `ArtifactType` (deriving from `tfx.types.Artifact`) will be passed for
    this argument. This artifact is intended to be consumed as an input by this
    component (possibly reading from the path specified by its `.uri`). Can be
    an optional argument by specifying a default value of `None`.
  * `OutputArtifact[ArtifactType]`: indicates that an output artifact object of
    type `ArtifactType` (deriving from `tfx.types.Artifact`) will be passed for
    this argument. This artifact is intended to be emitted as an output by this
    component (and written to the path specified by its `.uri`). Cannot be an
    optional argument.

  The return value typehint should be either empty or `None`, in the case of a
  component function that has no return values, or a `TypedDict` of primitive
  value types (`int`, `float`, `str`, `bytes`, `bool`, `dict` or `list`; or
  `Optional[T]`, where T is a primitive type value, in which case `None` can be
  returned), to indicate that the return value is a dictionary with specified
  keys and value types.

  Note that output artifacts should not be included in the return value
  typehint; they should be included as `OutputArtifact` annotations in the
  function inputs, as described above.

  The function to which this decorator is applied must be at the top level of
  its Python module (it may not be defined within nested classes or function
  closures).

  This is example usage of component definition using this decorator:

  ``` python
  from tfx import v1 as tfx

  InputArtifact = tfx.dsl.components.InputArtifact
  OutputArtifact = tfx.dsl.components.OutputArtifact
  Parameter = tfx.dsl.components.Parameter
  Examples = tfx.types.standard_artifacts.Examples
  Model = tfx.types.standard_artifacts.Model


  class MyOutput(TypedDict):
      loss: float
      accuracy: float


  @component(component_annotation=tfx.dsl.standard_annotations.Train)
  def MyTrainerComponent(
      training_data: InputArtifact[Examples],
      model: OutputArtifact[Model],
      dropout_hyperparameter: float,
      num_iterations: Parameter[int] = 10,
  ) -> MyOutput:
      """My simple trainer component."""

      records = read_examples(training_data.uri)
      model_obj = train_model(records, num_iterations, dropout_hyperparameter)
      model_obj.write_to(model.uri)

      return {"loss": model_obj.loss, "accuracy": model_obj.accuracy}


  # Example usage in a pipeline graph definition:
  # ...
  trainer = MyTrainerComponent(
      training_data=example_gen.outputs["examples"],
      dropout_hyperparameter=other_component.outputs["dropout"],
      num_iterations=1000,
  )
  pusher = Pusher(model=trainer.outputs["model"])
  # ...
  ```

  When the parameter `component_annotation` is not supplied, the default value
  is None. This is another example usage with `component_annotation` = None:

  ``` python
  @component
  def MyTrainerComponent(
      training_data: InputArtifact[standard_artifacts.Examples],
      model: OutputArtifact[standard_artifacts.Model],
      dropout_hyperparameter: float,
      num_iterations: Parameter[int] = 10,
  ) -> Output:
      """My simple trainer component."""

      records = read_examples(training_data.uri)
      model_obj = train_model(records, num_iterations, dropout_hyperparameter)
      model_obj.write_to(model.uri)

      return {"loss": model_obj.loss, "accuracy": model_obj.accuracy}
  ```

  When the parameter `use_beam` is True, one of the parameters of the decorated
  function type-annotated by BeamComponentParameter[beam.Pipeline] and the
  default value can only be None. It will be replaced by a beam Pipeline made
  with the tfx pipeline's beam_pipeline_args that's shared with other beam-based
  components:

  ``` python
  @component(use_beam=True)
  def DataProcessingComponent(
      input_examples: InputArtifact[standard_artifacts.Examples],
      output_examples: OutputArtifact[standard_artifacts.Examples],
      beam_pipeline: BeamComponentParameter[beam.Pipeline] = None,
  ) -> None:
      """My simple trainer component."""

      records = read_examples(training_data.uri)
      with beam_pipeline as p:
          ...
  ```

  Experimental: no backwards compatibility guarantees.

  Args:
    func: Typehint-annotated component executor function.
    component_annotation: used to annotate the python function-based component.
      It is a subclass of SystemExecution from
      third_party/py/tfx/types/system_executions.py; it can be None.
    use_beam: Whether to create a component that is a subclass of
      BaseBeamComponent. This allows a beam.Pipeline to be made with
      tfx-pipeline-wise beam_pipeline_args.

  Returns:
    An object that:

      1. you can call like the initializer of a subclass of [`base_component.BaseComponent`][tfx.v1.types.BaseChannel] (or [`base_component.BaseBeamComponent`][tfx.v1.types.BaseBeamComponent]).
      2. has a test_call() member function for unit testing the inner implementation of the component.

      Today, the returned object is literally a subclass of [BaseComponent][tfx.v1.types.BaseChannel], so it can be used as a `Type` e.g. in isinstance() checks. But you must not rely on this, as we reserve the right to return a different kind of object in the future, which _only_ satisfies the two criteria (1.) and (2.) above without being a `Type` itself.

  Raises:
    EnvironmentError: if the current Python interpreter is not Python 3.
  '''
  if func is None:
    # Python decorators with arguments in parentheses result in two function
    # calls. The first function call supplies the kwargs and the second supplies
    # the decorated function. Here we forward the kwargs to the second call.
    return functools.partial(
        component,
        component_annotation=component_annotation,
        use_beam=use_beam,
    )

  utils.assert_is_top_level_func(func)

  (inputs, outputs, parameters, arg_formats, arg_defaults, returned_values,
   json_typehints, return_json_typehints) = (
       function_parser.parse_typehint_component_function(func))
  if use_beam and list(parameters.values()).count(_BeamPipeline) != 1:
    raise ValueError('The decorated function must have one and only one '
                     'optional parameter of type '
                     'BeamComponentParameter[beam.Pipeline] with '
                     'default value None when use_beam=True.')

  component_class = utils.create_component_class(
      func=func,
      arg_defaults=arg_defaults,
      arg_formats=arg_formats,
      base_executor_class=(
          _FunctionBeamExecutor if use_beam else _FunctionExecutor
      ),
      executor_spec_class=(
          executor_spec.BeamExecutorSpec
          if use_beam
          else executor_spec.ExecutorClassSpec
      ),
      base_component_class=(
          _SimpleBeamComponent if use_beam else _SimpleComponent
      ),
      inputs=inputs,
      outputs=outputs,
      parameters=parameters,
      type_annotation=component_annotation,
      json_compatible_inputs=json_typehints,
      json_compatible_outputs=return_json_typehints,
      return_values_optionality=returned_values,
  )
  return typing.cast(BaseFunctionalComponentFactory, component_class)

experimental

TFX dsl.experimental module.

CLASS DESCRIPTION
LatestArtifactStrategy

Strategy that resolves the latest n (default 1) artifacts for each channel.

LatestBlessedModelStrategy

LatestBlessedModelStrategy resolves the latest blessed Model artifact.

ResolverStrategy

Base class for ResolverStrategy.

RuntimeParameter

Runtime parameter.

SpanRangeStrategy

SpanRangeStrategy resolves artifacts based on the "span" property.

FUNCTION DESCRIPTION
create_container_component

Creates a container-based component.

Classes
LatestArtifactStrategy
LatestArtifactStrategy(desired_num_of_artifacts: Optional[int] = 1)

Bases: ResolverStrategy

Strategy that resolves the latest n (default 1) artifacts for each channel.

Note that this ResolverStrategy is experimental and is subject to change in terms of both interface and implementation.

Don't construct LatestArtifactStrategy directly; example usage:

model_resolver = Resolver(
    strategy_class=LatestArtifactStrategy,
    model=Channel(type=Model),
).with_id("latest_model_resolver")
model_resolver.outputs["model"]
METHOD DESCRIPTION
resolve_artifacts

Resolves artifacts from channels by querying MLMD.

Source code in tfx/dsl/input_resolution/strategies/latest_artifact_strategy.py
def __init__(self, desired_num_of_artifacts: Optional[int] = 1):
  self._desired_num_of_artifact = desired_num_of_artifacts
Functions
resolve_artifacts
resolve_artifacts(store: MetadataStore, input_dict: Dict[str, List[Artifact]]) -> Optional[Dict[str, List[Artifact]]]

Resolves artifacts from channels by querying MLMD.

PARAMETER DESCRIPTION
store

An MLMD MetadataStore object.

TYPE: MetadataStore

input_dict

The input_dict to resolve from.

TYPE: Dict[str, List[Artifact]]

RETURNS DESCRIPTION
Optional[Dict[str, List[Artifact]]]

If min_count for every input is met, returns a Dict[str, List[Artifact]]. Otherwise, return None.

Source code in tfx/dsl/input_resolution/strategies/latest_artifact_strategy.py
@doc_controls.do_not_generate_docs
def resolve_artifacts(
    self, store: mlmd.MetadataStore,
    input_dict: Dict[str, List[types.Artifact]]
) -> Optional[Dict[str, List[types.Artifact]]]:
  """Resolves artifacts from channels by querying MLMD.

  Args:
    store: An MLMD MetadataStore object.
    input_dict: The input_dict to resolve from.

  Returns:
    If `min_count` for every input is met, returns a
      Dict[str, List[Artifact]]. Otherwise, return None.
  """
  resolved_dict = self._resolve(input_dict)
  all_min_count_met = all(
      len(artifact_list) >= self._desired_num_of_artifact
      for artifact_list in resolved_dict.values())
  return resolved_dict if all_min_count_met else None
LatestBlessedModelStrategy

Bases: ResolverStrategy

LatestBlessedModelStrategy resolves the latest blessed Model artifact.

Note that this ResolverStrategy is experimental and is subject to change in terms of both interface and implementation.

Don't construct LatestBlessedModelStrategy directly; example usage:

model_resolver = Resolver(
    strategy_class=LatestBlessedModelStrategy,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing),
).with_id("latest_blessed_model_resolver")
model_resolver.outputs["model"]
METHOD DESCRIPTION
resolve_artifacts

Resolves artifacts from channels by querying MLMD.

Functions
resolve_artifacts
resolve_artifacts(store: MetadataStore, input_dict: Dict[str, List[Artifact]]) -> Optional[Dict[str, List[Artifact]]]

Resolves artifacts from channels by querying MLMD.

PARAMETER DESCRIPTION
store

An MLMD MetadataStore object.

TYPE: MetadataStore

input_dict

The input_dict to resolve from.

TYPE: Dict[str, List[Artifact]]

RETURNS DESCRIPTION
Optional[Dict[str, List[Artifact]]]

The latest blessed Model and its corresponding ModelBlessing, each returned in the same input channel in which it was contained.

RAISES DESCRIPTION
RuntimeError

if input_dict contains unsupported artifact types.

Source code in tfx/dsl/input_resolution/strategies/latest_blessed_model_strategy.py
@doc_controls.do_not_generate_docs
def resolve_artifacts(
    self, store: mlmd.MetadataStore,
    input_dict: Dict[str, List[types.Artifact]]
) -> Optional[Dict[str, List[types.Artifact]]]:
  """Resolves artifacts from channels by querying MLMD.

  Args:
    store: An MLMD MetadataStore object.
    input_dict: The input_dict to resolve from.

  Returns:
    The latest blessed Model and its corresponding [ModelBlessing][tfx.v1.types.standard_artifacts.ModelBlessing], respectively
      in the same input channel they were contained to.

  Raises:
    RuntimeError: if input_dict contains unsupported artifact types.
  """
  model_channel_key = None
  model_blessing_channel_key = None
  assert len(input_dict) == 2, 'Expecting 2 input Channels'
  for k, artifact_list in input_dict.items():
    if not artifact_list:
      # If model or model blessing channel has no artifacts, the min_count
      # can not be met, short cut to return empty dict here.
      return {key: [] for key in input_dict}
    artifact = artifact_list[0]
    if issubclass(type(artifact), standard_artifacts.Model):
      model_channel_key = k
    elif issubclass(type(artifact), standard_artifacts.ModelBlessing):
      model_blessing_channel_key = k
    else:
      raise RuntimeError('Only expecting Model or ModelBlessing, got %s' %
                         artifact.TYPE_NAME)
  assert model_channel_key is not None, 'Expecting Model as input'
  assert model_blessing_channel_key is not None, ('Expecting ModelBlessing as'
                                                  ' input')

  result = self._resolve(input_dict, model_channel_key,
                         model_blessing_channel_key)
  return result
ResolverStrategy

Bases: ABC

Base class for ResolverStrategy.

ResolverStrategy is used with tfx.dsl.Resolver to express the input resolution logic. The built-in strategies documented under tfx.v1.dsl.experimental include LatestArtifactStrategy, LatestBlessedModelStrategy, and SpanRangeStrategy.

A resolver strategy defines how artifacts are selected as inputs. A ResolverStrategy subclass must override the resolve_artifacts() function, which takes a Dict[str, List[Artifact]] as input and returns a resolved dict of the same type.

METHOD DESCRIPTION
resolve_artifacts

Resolves artifacts from channels, optionally querying MLMD if needed.

Functions
resolve_artifacts abstractmethod
resolve_artifacts(store: MetadataStore, input_dict: Dict[str, List[Artifact]]) -> Optional[Dict[str, List[Artifact]]]

Resolves artifacts from channels, optionally querying MLMD if needed.

In asynchronous execution mode, resolver classes may be composed in sequence, where the resolve_artifacts() result from the previous resolver instance is passed to the next resolver instance's resolve_artifacts() as input.

If resolve_artifacts() returns None, it is considered as "no inputs available", and the remaining resolvers will not be executed.

Also, if resolve_artifacts() omits any key from the input_dict, that key will not be available to downstream resolver instances. The general recommendation is to preserve all keys in the input_dict unless you have a specific reason not to.

PARAMETER DESCRIPTION
store

An MLMD MetadataStore.

TYPE: MetadataStore

input_dict

The input_dict to resolve from.

TYPE: Dict[str, List[Artifact]]

RETURNS DESCRIPTION
Optional[Dict[str, List[Artifact]]]

If all entries have enough data after resolving, returns the resolved input_dict. Otherwise, returns None.

Source code in tfx/dsl/components/common/resolver.py
@abc.abstractmethod
def resolve_artifacts(
    self, store: mlmd.MetadataStore,
    input_dict: Dict[str, List[types.Artifact]]
) -> Optional[Dict[str, List[types.Artifact]]]:
  """Resolves artifacts from channels, optionally querying MLMD if needed.

  In asynchronous execution mode, resolver classes may be composed in sequence
  where the resolve_artifacts() result from the previous resolver instance
  would be passed to the next resolver instance's resolve_artifacts() inputs.

  If resolve_artifacts() returns None, it is considered as "no inputs
  available", and the remaining resolvers will not be executed.

  Also if resolve_artifacts() omits any key from the input_dict it will not
  be available from the downstream resolver instances. General recommendation
  is to preserve all keys in the input_dict unless you have specific reason.

  Args:
    store: An MLMD MetadataStore.
    input_dict: The input_dict to resolve from.

  Returns:
    If all entries have enough data after the resolving, returns the resolved
      input_dict. Otherwise, return None.
  """
RuntimeParameter
RuntimeParameter(name: str, ptype: Optional[Type] = None, default: Optional[Union[int, float, str]] = None, description: Optional[str] = None)

Bases: Jsonable

Runtime parameter.

Currently only supported on KubeflowV2DagRunner.

For protos, use a text-type RuntimeParameter that holds the proto's JSON string, e.g., '{"num_steps": 5}' for a TrainArgs proto.

ATTRIBUTE DESCRIPTION
name

The name of the runtime parameter.

default

Default value for the runtime parameter when it is not explicitly specified.

ptype

The type of the runtime parameter.

description

Description of the usage of the parameter.

METHOD DESCRIPTION
from_json_dict

Convert from dictionary data to an object.

to_json_dict

Convert from an object to a JSON serializable dictionary.

Source code in tfx/orchestration/data_types.py
def __init__(
    self,
    name: str,
    ptype: Optional[Type] = None,  # pylint: disable=g-bare-generic
    default: Optional[Union[int, float, str]] = None,
    description: Optional[str] = None):
  if ptype and ptype not in [int, float, str]:
    raise RuntimeError('Only str and scalar runtime parameters are supported')
  if (default and ptype) and not isinstance(default, ptype):
    raise TypeError('Default value must be consistent with specified ptype')
  self.name = name
  self.default = default
  self.ptype = ptype
  self.description = description
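
As a minimal sketch (the name and values are illustrative), a string-typed RuntimeParameter carrying a TrainArgs proto as a JSON string could be declared as:

train_args = RuntimeParameter(
    name="train_args",
    ptype=str,
    default='{"num_steps": 5}',
    description="JSON-serialized TrainArgs proto.",
)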
Functions
from_json_dict classmethod
from_json_dict(dict_data: Dict[str, Any]) -> Any

Convert from dictionary data to an object.

Source code in tfx/utils/json_utils.py
@classmethod
@doc_controls.do_not_doc_in_subclasses
def from_json_dict(cls, dict_data: Dict[str, Any]) -> Any:
  """Convert from dictionary data to an object."""
  instance = cls.__new__(cls)
  instance.__dict__ = dict_data
  return instance
to_json_dict
to_json_dict() -> Dict[str, Any]

Convert from an object to a JSON serializable dictionary.

Source code in tfx/utils/json_utils.py
@doc_controls.do_not_doc_in_subclasses
def to_json_dict(self) -> Dict[str, Any]:
  """Convert from an object to a JSON serializable dictionary."""
  return self.__dict__
SpanRangeStrategy
SpanRangeStrategy(range_config: Any)

Bases: ResolverStrategy

SpanRangeStrategy resolves artifacts based on the "span" property.

Note that this ResolverStrategy is experimental and is subject to change in terms of both interface and implementation.

Don't construct SpanRangeStrategy directly; example usage:

examples_resolver = Resolver(
    strategy_class=SpanRangeStrategy,
    config={"range_config": range_config},
    examples=Channel(type=Examples, producer_component_id=example_gen.id),
).with_id("span_resolver")
examples_resolver.outputs["examples"]
METHOD DESCRIPTION
resolve_artifacts

Resolves artifacts from channels by querying MLMD.

Source code in tfx/dsl/input_resolution/strategies/span_range_strategy.py
def __init__(self, range_config: Any):
  # Import range_config locally, as SpanRangeStrategy is included in
  # ml-pipelines-sdk package while tfx.proto is not.
  from tfx.proto import range_config_pb2  # pylint: disable=g-import-not-at-top
  self._range_config: range_config_pb2.RangeConfig = range_config
Functions
resolve_artifacts
resolve_artifacts(store: MetadataStore, input_dict: Dict[str, List[Artifact]]) -> Optional[Dict[str, List[Artifact]]]

Resolves artifacts from channels by querying MLMD.

PARAMETER DESCRIPTION
store

An MLMD MetadataStore object.

TYPE: MetadataStore

input_dict

The input_dict to resolve from.

TYPE: Dict[str, List[Artifact]]

RETURNS DESCRIPTION
Optional[Dict[str, List[Artifact]]]

If min_count for every input is met, returns a Dict[Text, List[Artifact]]. Otherwise, return None.

RAISES DESCRIPTION
RuntimeError

if input_dict contains artifact without span property.

Source code in tfx/dsl/input_resolution/strategies/span_range_strategy.py
@doc_controls.do_not_generate_docs
def resolve_artifacts(
    self, store: mlmd.MetadataStore,
    input_dict: Dict[str, List[types.Artifact]]
) -> Optional[Dict[str, List[types.Artifact]]]:
  """Resolves artifacts from channels by querying MLMD.

  Args:
    store: An MLMD MetadataStore object.
    input_dict: The input_dict to resolve from.

  Returns:
    If `min_count` for every input is met, returns a
    Dict[Text, List[Artifact]]. Otherwise, return None.

  Raises:
    RuntimeError: if input_dict contains artifact without span property.
  """
  resolved_dict = self._resolve(input_dict)
  all_min_count_met = all(
      bool(artifact_list) for artifact_list in resolved_dict.values())
  return resolved_dict if all_min_count_met else None
Functions
create_container_component
create_container_component(name: str, image: str, command: List[CommandlineArgumentType], inputs: Optional[Dict[str, Any]] = None, outputs: Optional[Dict[str, Any]] = None, parameters: Optional[Dict[str, Any]] = None) -> Callable[..., BaseComponent]

Creates a container-based component.

PARAMETER DESCRIPTION
name

The name of the component

TYPE: str

image

Container image name.

TYPE: str

command

Container entrypoint command-line. Not executed within a shell. The command-line can use placeholder objects that will be replaced at the compilation time. The placeholder objects can be imported from tfx.dsl.component.experimental.placeholders. Note that Jinja templates are not supported.

TYPE: List[CommandlineArgumentType]

inputs

The list of component inputs

TYPE: Optional[Dict[str, Any]] DEFAULT: None

outputs

The list of component outputs

TYPE: Optional[Dict[str, Any]] DEFAULT: None

parameters

The list of component parameters

TYPE: Optional[Dict[str, Any]] DEFAULT: None

RETURNS DESCRIPTION
Callable[..., BaseComponent]

Component that can be instantiated and used inside a pipeline.

Example

component = create_container_component(
    name="TrainModel",
    inputs={
        "training_data": Dataset,
    },
    outputs={
        "model": Model,
    },
    parameters={
        "num_training_steps": int,
    },
    image="gcr.io/my-project/my-trainer",
    command=[
        "python3", "my_trainer",
        "--training_data_uri", InputUriPlaceholder("training_data"),
        "--model_uri", OutputUriPlaceholder("model"),
        "--num_training-steps", InputValuePlaceholder("num_training_steps"),
    ],
)
Source code in tfx/dsl/component/experimental/container_component.py
def create_container_component(
    name: str,
    image: str,
    command: List[placeholders.CommandlineArgumentType],
    inputs: Optional[Dict[str, Any]] = None,
    outputs: Optional[Dict[str, Any]] = None,
    parameters: Optional[Dict[str, Any]] = None,
) -> Callable[..., base_component.BaseComponent]:
  """Creates a container-based component.

  Args:
    name: The name of the component
    image: Container image name.
    command: Container entrypoint command-line. Not executed within a shell. The
      command-line can use placeholder objects that will be replaced at the
      compilation time. The placeholder objects can be imported from
      tfx.dsl.component.experimental.placeholders. Note that Jinja templates are
      not supported.
    inputs: The list of component inputs
    outputs: The list of component outputs
    parameters: The list of component parameters

  Returns:
    Component that can be instantiated and used inside a pipeline.

  !!! Example
      ``` python
      component = create_container_component(
          name="TrainModel",
          inputs={
              "training_data": Dataset,
          },
          outputs={
              "model": Model,
          },
          parameters={
              "num_training_steps": int,
          },
          image="gcr.io/my-project/my-trainer",
          command=[
              "python3", "my_trainer",
              "--training_data_uri", InputUriPlaceholder("training_data"),
              "--model_uri", OutputUriPlaceholder("model"),
              "--num_training-steps", InputValuePlaceholder("num_training_steps"),
          ],
      )
      ```
  """
  if not name:
    raise ValueError('Component name cannot be empty.')

  if inputs is None:
    inputs = {}
  if outputs is None:
    outputs = {}
  if parameters is None:
    parameters = {}

  input_channel_parameters = {}
  output_channel_parameters = {}
  output_channels = {}
  execution_parameters = {}

  for input_name, channel_type in inputs.items():
    # TODO(b/155804245) Sanitize the names so that they're valid python names
    input_channel_parameters[input_name] = (
        component_spec.ChannelParameter(type=channel_type,))

  for output_name, channel_type in outputs.items():
    # TODO(b/155804245) Sanitize the names so that they're valid python names
    output_channel_parameters[output_name] = (
        component_spec.ChannelParameter(type=channel_type))
    artifact = channel_type()
    channel = channel_utils.as_channel([artifact])
    output_channels[output_name] = channel

  for param_name, parameter_type in parameters.items():
    # TODO(b/155804245) Sanitize the names so that they're valid python names

    execution_parameters[param_name] = (
        component_spec.ExecutionParameter(type=parameter_type))

  default_init_args = {**output_channels}

  return component_utils.create_tfx_component_class(
      name=name,
      tfx_executor_spec=executor_specs.TemplatedExecutorContainerSpec(
          image=image,
          command=command,
      ),
      input_channel_parameters=input_channel_parameters,
      output_channel_parameters=output_channel_parameters,
      execution_parameters=execution_parameters,
      default_init_args=default_init_args)
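
Assuming the example above, the returned factory could then be instantiated inside a pipeline roughly as follows (the upstream dataset_task producing a Dataset artifact is hypothetical):

train_task = component(
    training_data=dataset_task.outputs["dataset"],
    num_training_steps=100,
)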

io

TFX DSL I/O module.

MODULE DESCRIPTION
fileio

TFX DSL file I/O module.

Modules
fileio

TFX DSL file I/O module.

FUNCTION DESCRIPTION
copy

Copy a file from the source to the destination.

exists

Return whether a path exists.

glob

Return the paths that match a glob pattern.

isdir

Return whether a path is a directory.

listdir

Return the list of files in a directory.

makedirs

Make a directory at the given path, recursively creating parents.

mkdir

Make a directory at the given path; parent directory must exist.

open

Open a file at the given path.

remove

Remove the file at the given path.

rename

Rename a source file to a destination path.

rmtree

Remove the given directory and its recursive contents.

stat

Return the stat descriptor for a given file path.

walk

Return an iterator walking a directory tree.

Functions
copy
copy(src: PathType, dst: PathType, overwrite: bool = False) -> None

Copy a file from the source to the destination.

Source code in tfx/dsl/io/fileio.py
def copy(src: PathType, dst: PathType, overwrite: bool = False) -> None:
  """Copy a file from the source to the destination."""
  src_fs = _get_filesystem(src)
  dst_fs = _get_filesystem(dst)
  if src_fs is dst_fs:
    src_fs.copy(src, dst, overwrite=overwrite)
  else:
    if not overwrite and exists(dst):
      raise OSError(
          ('Destination file %r already exists and argument `overwrite` is '
           'false.') % dst)
    with open(src, mode='rb') as f_src:
      contents = f_src.read()
    with open(dst, mode='wb') as f_dst:
      f_dst.write(contents)
exists
exists(path: PathType) -> bool

Return whether a path exists.

Source code in tfx/dsl/io/fileio.py
def exists(path: PathType) -> bool:
  """Return whether a path exists."""
  return _get_filesystem(path).exists(path)
glob
glob(pattern: PathType) -> List[PathType]

Return the paths that match a glob pattern.

Source code in tfx/dsl/io/fileio.py
def glob(pattern: PathType) -> List[PathType]:
  """Return the paths that match a glob pattern."""
  return _get_filesystem(pattern).glob(pattern)
isdir
isdir(path: PathType) -> bool

Return whether a path is a directory.

Source code in tfx/dsl/io/fileio.py
def isdir(path: PathType) -> bool:
  """Return whether a path is a directory."""
  return _get_filesystem(path).isdir(path)
listdir
listdir(path: PathType) -> List[PathType]

Return the list of files in a directory.

Source code in tfx/dsl/io/fileio.py
def listdir(path: PathType) -> List[PathType]:
  """Return the list of files in a directory."""
  return _get_filesystem(path).listdir(path)
makedirs
makedirs(path: PathType) -> None

Make a directory at the given path, recursively creating parents.

Source code in tfx/dsl/io/fileio.py
def makedirs(path: PathType) -> None:
  """Make a directory at the given path, recursively creating parents."""
  _get_filesystem(path).makedirs(path)
mkdir
mkdir(path: PathType) -> None

Make a directory at the given path; parent directory must exist.

Source code in tfx/dsl/io/fileio.py
def mkdir(path: PathType) -> None:
  """Make a directory at the given path; parent directory must exist."""
  _get_filesystem(path).mkdir(path)
open
open(path: PathType, mode: str = 'r')

Open a file at the given path.

Source code in tfx/dsl/io/fileio.py
def open(path: PathType, mode: str = 'r'):  # pylint: disable=redefined-builtin
  """Open a file at the given path."""
  return _get_filesystem(path).open(path, mode=mode)
remove
remove(path: PathType) -> None

Remove the file at the given path.

Source code in tfx/dsl/io/fileio.py
def remove(path: PathType) -> None:
  """Remove the file at the given path."""
  _get_filesystem(path).remove(path)
rename
rename(src: PathType, dst: PathType, overwrite: bool = False) -> None

Rename a source file to a destination path.

Source code in tfx/dsl/io/fileio.py
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
  """Rename a source file to a destination path."""
  src_fs = _get_filesystem(src)
  dst_fs = _get_filesystem(dst)
  if src_fs is dst_fs:
    src_fs.rename(src, dst, overwrite=overwrite)
  else:
    raise NotImplementedError(
        ('Rename from %r to %r using different filesystems plugins is '
         'currently not supported.') % (src, dst))
rmtree
rmtree(path: PathType) -> None

Remove the given directory and its recursive contents.

Source code in tfx/dsl/io/fileio.py
def rmtree(path: PathType) -> None:
  """Remove the given directory and its recursive contents."""
  _get_filesystem(path).rmtree(path)
stat
stat(path: PathType) -> Any

Return the stat descriptor for a given file path.

Source code in tfx/dsl/io/fileio.py
def stat(path: PathType) -> Any:
  """Return the stat descriptor for a given file path."""
  return _get_filesystem(path).stat(path)
walk
walk(top: PathType, topdown: bool = True, onerror: Optional[Callable[..., None]] = None) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]

Return an iterator walking a directory tree.

Source code in tfx/dsl/io/fileio.py
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
  """Return an iterator walking a directory tree."""
  return _get_filesystem(top).walk(top, topdown=topdown, onerror=onerror)
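A small sketch that collects full file paths under an illustrative root; walk yields (dirpath, dirnames, filenames) tuples, mirroring os.walk:

import os

from tfx.dsl.io import fileio

all_files = []
for dirpath, _, filenames in fileio.walk('/tmp/pipeline_root'):
  for filename in filenames:
    all_files.append(os.path.join(dirpath, filename))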

placeholders

TFX placeholders module.

FUNCTION DESCRIPTION
exec_property

Returns a Placeholder that represents an execution property.

execution_invocation

Returns a Placeholder representing ExecutionInvocation proto.

input

Returns a Placeholder that represents an input artifact.

output

Returns a Placeholder that represents an output artifact.

Functions
exec_property
exec_property(key: str) -> ExecPropertyPlaceholder

Returns a Placeholder that represents an execution property.

PARAMETER DESCRIPTION
key

The key of the execution property.

TYPE: str

RETURNS DESCRIPTION
ExecPropertyPlaceholder

A Placeholder that supports

  1. Rendering the value of an execution property at a given key. Example: exec_property('version')
  2. Rendering the whole proto or a proto field of an execution property, if the value is a proto type. The (possibly nested) proto field in a placeholder can be accessed as if accessing a proto field in Python. Example: exec_property('model_config').num_layers
  3. Concatenating with other placeholders or strings. Example: output('model').uri + '/model/' + exec_property('version')
Source code in tfx/dsl/placeholder/runtime_placeholders.py
def exec_property(key: str) -> ExecPropertyPlaceholder:
  """Returns a Placeholder that represents an execution property.

  Args:
    key: The key of the execution property.

  Returns:
    A Placeholder that supports

      1. Rendering the value of an execution property at a given key.
         Example: `#!python exec_property('version')`
      2. Rendering the whole proto or a proto field of an execution property,
         if the value is a proto type.
         The (possibly nested) proto field in a placeholder can be accessed as
         if accessing a proto field in Python.
         Example: `#!python exec_property('model_config').num_layers`
      3. Concatenating with other placeholders or strings.
         Example: `#!python output('model').uri + '/model/' + exec_property('version')`
  """
  return ExecPropertyPlaceholder(key)
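As a sketch, such placeholders are typically passed into component definitions and resolved only at run time; 'model_config' and num_layers below are hypothetical names:

from tfx import v1 as tfx

ph = tfx.dsl.placeholders

# Resolves to the value of the 'version' execution property at run time.
version_arg = ph.exec_property('version')

# If the execution property holds a proto, nested fields can be accessed
# as if on a Python proto object (field name is hypothetical).
num_layers_arg = ph.exec_property('model_config').num_layers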
execution_invocation
execution_invocation() -> ExecInvocationPlaceholder

Returns a Placeholder representing ExecutionInvocation proto.

RETURNS DESCRIPTION
ExecInvocationPlaceholder

A Placeholder that will render to the ExecutionInvocation proto. Accessing a proto field works the same as accessing a field on a proto object in Python.

Prefer the input(key)/output(key)/exec_property(key) functions over the input_dict/output_dict/execution_properties fields of the ExecutionInvocation proto.

Source code in tfx/dsl/placeholder/runtime_placeholders.py
def execution_invocation() -> ExecInvocationPlaceholder:
  """Returns a Placeholder representing ExecutionInvocation proto.

  Returns:
    A Placeholder that will render to the ExecutionInvocation proto.
      Accessing a proto field is the same as if accessing a proto field in Python.

      Prefer to use input(key)/output(key)/exec_property(key) functions instead of
      input_dict/output_dict/execution_properties field from ExecutionInvocation
      proto.
  """
  return ExecInvocationPlaceholder()
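A minimal sketch; the pipeline_run_id field is assumed here only to illustrate field access on the rendered proto:

from tfx import v1 as tfx

ph = tfx.dsl.placeholders

# Renders the full ExecutionInvocation proto; nested fields are accessed
# like attributes on a Python proto object (field name assumed).
run_id_arg = ph.execution_invocation().pipeline_run_id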
input
input(key: str) -> ArtifactPlaceholder

Returns a Placeholder that represents an input artifact.

PARAMETER DESCRIPTION
key

The key of the input artifact.

TYPE: str

RETURNS DESCRIPTION
ArtifactPlaceholder

A Placeholder that supports

  1. Rendering the whole MLMD artifact proto as text_format. Example: input('model')
  2. Accessing a specific index using [index] when multiple artifacts are associated with the given key; if no index is given, it defaults to the first artifact. Example: input('model')[0]
  3. Getting the URI of an artifact through .uri property. Example: input('model').uri or input('model')[0].uri
  4. Getting the URI of a specific split of an artifact using .split_uri(split_name) method. Example: input('examples')[0].split_uri('train')
  5. Getting the value of a primitive artifact through .value property. Example: input('primitive').value
  6. Concatenating with other placeholders or strings. Example: input('model').uri + '/model/' + exec_property('version')
Source code in tfx/dsl/placeholder/artifact_placeholder.py
def input(key: str) -> ArtifactPlaceholder:  # pylint: disable=redefined-builtin
  """Returns a Placeholder that represents an input artifact.

  Args:
    key: The key of the input artifact.

  Returns:
    A Placeholder that supports

      1. Rendering the whole MLMD artifact proto as text_format.
         Example: `#!python input('model')`
      2. Accessing a specific index using `#!python [index]`, if multiple artifacts are
         associated with the given key. If not specified, default to the first
         artifact.
         Example: `#!python input('model')[0]`
      3. Getting the URI of an artifact through .uri property.
         Example: `#!python input('model').uri or input('model')[0].uri`
      4. Getting the URI of a specific split of an artifact using
         `#!python .split_uri(split_name)` method.
         Example: `#!python input('examples')[0].split_uri('train')`
      5. Getting the value of a primitive artifact through .value property.
         Example: `#!python input('primitive').value`
      6. Concatenating with other placeholders or strings.
         Example: `#!python input('model').uri + '/model/' + exec_property('version')`
  """
  return ArtifactPlaceholder(key, is_input=True)
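A short sketch using hypothetical input keys ('examples', 'threshold'):

from tfx import v1 as tfx

ph = tfx.dsl.placeholders

# URI of the first artifact under the 'examples' key, narrowed to its
# 'train' split.
train_uri_arg = ph.input('examples')[0].split_uri('train')

# Value of a primitive-typed input artifact.
threshold_arg = ph.input('threshold').value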
output
output(key: str) -> ArtifactPlaceholder

Returns a Placeholder that represents an output artifact.

It is the same as the input(...) function, except it is for output artifacts.

PARAMETER DESCRIPTION
key

The key of the output artifact.

TYPE: str

RETURNS DESCRIPTION
ArtifactPlaceholder

A Placeholder that supports

  1. Rendering the whole artifact as text_format. Example: output('model')
  2. Accessing a specific index using [index] when multiple artifacts are associated with the given key; if no index is given, it defaults to the first artifact. Example: output('model')[0]
  3. Getting the URI of an artifact through .uri property. Example: output('model').uri or output('model')[0].uri
  4. Getting the URI of a specific split of an artifact using .split_uri(split_name) method. Example: output('examples')[0].split_uri('train')
  5. Getting the value of a primitive artifact through .value property. Example: output('primitive').value
  6. Concatenating with other placeholders or strings. Example: output('model').uri + '/model/' + exec_property('version')
Source code in tfx/dsl/placeholder/artifact_placeholder.py
def output(key: str) -> ArtifactPlaceholder:
  """Returns a Placeholder that represents an output artifact.

  It is the same as input(...) function, except it is for output artifacts.

  Args:
    key: The key of the output artifact.

  Returns:
    A Placeholder that supports

      1. Rendering the whole artifact as text_format.
         Example: `#!python output('model')`
      2. Accessing a specific index using [index], if multiple artifacts are
         associated with the given key. If not specified, default to the first
         artifact.
         Example: `#!python output('model')[0]`
      3. Getting the URI of an artifact through .uri property.
         Example: `#!python output('model').uri or output('model')[0].uri`
      4. Getting the URI of a specific split of an artifact using
         `#!python .split_uri(split_name)` method.
         Example: `#!python output('examples')[0].split_uri('train')`
      5. Getting the value of a primitive artifact through .value property.
         Example: `#!python output('primitive').value`
      6. Concatenating with other placeholders or strings.
         Example: `#!python output('model').uri + '/model/' + exec_property('version')`
  """
  return ArtifactPlaceholder(key, is_input=False)
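A sketch of the concatenation pattern from item 6 above; the keys are hypothetical, and the expression resolves to a versioned path under the output model's URI at run time:

from tfx import v1 as tfx

ph = tfx.dsl.placeholders

# Placeholders concatenate with plain strings and with other placeholders.
model_push_path = (
    ph.output('model').uri + '/model/' + ph.exec_property('version')
)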

standard_annotations

Public API for base type annotations.

CLASS DESCRIPTION
Dataset

Dataset is a TFX pre-defined system artifact.

Deploy

Deploy is a TFX pre-defined system execution.

Evaluate

Evaluate is a TFX pre-defined system execution.

Metrics

Metrics is a TFX pre-defined system artifact.

Model

Model is a TFX pre-defined system artifact.

Process

Process is a TFX pre-defined system execution.

Statistics

Statistics is a TFX pre-defined system artifact.

Train

Train is a TFX pre-defined system execution.

Transform

Transform is a TFX pre-defined system execution.

Classes
Dataset

Bases: SystemArtifact

Dataset is a TFX pre-defined system artifact.

Deploy

Bases: SystemExecution

Deploy is a TFX pre-defined system execution.

This execution performs model deployment. For example, the Pusher component can be annotated as a Deploy execution: it checks whether the model passed the validation steps and pushes fully validated models to Servomatic, CNS/Placer, TF-Hub, and other destinations.

Evaluate

Bases: SystemExecution

Evaluate is a TFX pre-defined system execution.

It computes a model’s evaluation statistics over (slices of) features.

Metrics

Bases: SystemArtifact

Metrics is a TFX pre-defined system artifact.

Model

Bases: SystemArtifact

Model is a TFX pre-defined system artifact.

Process

Bases: SystemExecution

Process is a TFX pre-defined system execution.

It covers executions such as ExampleGen, SchemaGen, and SkewDetection, which perform data, model, or statistics processing.

Statistics

Bases: SystemArtifact

Statistics is a TFX pre-defined system artifact.

Train

Bases: SystemExecution

Train is a TFX pre-defined system execution.

Train is one of the key executions: it performs the actual model training.

Transform

Bases: SystemExecution

Transform is a TFX pre-defined system execution.

It performs transformations and feature engineering in training and serving.
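As a hedged sketch, these annotations are typically attached to custom artifact types; the TYPE_ANNOTATION attribute below is assumed to be the hook TFX artifact classes use for system-type annotations:

from tfx import v1 as tfx


class MyDataset(tfx.dsl.Artifact):
  """A custom artifact annotated as a system Dataset."""

  TYPE_NAME = 'MyDataset'
  # Links this custom type to the pre-defined Dataset system artifact.
  TYPE_ANNOTATION = tfx.dsl.standard_annotations.Dataset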