Building Fully Custom Components
This guide describes how to use the TFX API to build a fully custom component. Fully custom components let you build components by defining the component specification, executor, and component interface classes. This approach lets you reuse and extend a standard component to fit your needs.
If you are new to TFX pipelines, learn more about the core concepts of TFX pipelines.
Custom executor or custom component
A custom executor is sufficient when only the processing logic needs to change and the inputs, outputs, and execution properties of the component are the same as those of an existing component. A fully custom component is needed when any of the inputs, outputs, or execution properties differ from those of every existing TFX component.
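The decision rule above can be sketched in plain Python. This is only an illustration: the dictionaries and the `needs_fully_custom_component` helper are hypothetical and not part of the TFX API. They model a component's interface as three mappings from names to type names.

```python
def needs_fully_custom_component(existing_interface, desired_interface):
    """Return True when the desired interface differs from the existing one.

    Both arguments are plain dicts with the keys 'inputs', 'outputs', and
    'parameters', each mapping a name to a type name. This is a hypothetical
    model of a component interface, not a TFX data structure.
    """
    return any(
        existing_interface[part] != desired_interface[part]
        for part in ("inputs", "outputs", "parameters")
    )


# Hypothetical interface of an existing component.
existing = {
    "inputs": {"input_data": "Examples"},
    "outputs": {"output_data": "Examples"},
    "parameters": {},
}

# Same interface, different processing logic: a custom executor is enough.
same_interface = dict(existing)

# A new execution parameter changes the interface, so a fully custom
# component is needed.
new_interface = {**existing, "parameters": {"name": "Text"}}

print(needs_fully_custom_component(existing, same_interface))  # False
print(needs_fully_custom_component(existing, new_interface))   # True
```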
How to create a custom component?
Developing a fully custom component requires:
- A defined set of input and output artifact specifications for the new component. Specifically, the types of the input artifacts should be consistent with the output artifact types of the components that produce them, and the types of the output artifacts should be consistent with the input artifact types of the components that consume them, if any.
- The non-artifact execution parameters that are needed for the new component.
ComponentSpec
The ComponentSpec class defines the component contract by defining the input and output artifacts of a component as well as the parameters that are used for the component execution. It has three parts:
- INPUTS: A dictionary of typed parameters for the input artifacts that are passed into the component executor. Normally input artifacts are the outputs from upstream components and thus share the same type.
- OUTPUTS: A dictionary of typed parameters for the output artifacts which the component produces.
- PARAMETERS: A dictionary of additional ExecutionParameter items that will be passed into the component executor. These are non-artifact parameters that we want to define flexibly in the pipeline DSL and pass into execution.
Here is an example of the ComponentSpec:
from typing import Text

from tfx import types
from tfx.types import standard_artifacts
from tfx.types.component_spec import ChannelParameter
from tfx.types.component_spec import ExecutionParameter


class HelloComponentSpec(types.ComponentSpec):
  """ComponentSpec for Custom TFX Hello World Component."""

  PARAMETERS = {
      # These are parameters that will be passed in the call to
      # create an instance of this component.
      'name': ExecutionParameter(type=Text),
  }
  INPUTS = {
      # This will be a dictionary with input artifacts, including URIs
      'input_data': ChannelParameter(type=standard_artifacts.Examples),
  }
  OUTPUTS = {
      # This will be a dictionary which this component will populate
      'output_data': ChannelParameter(type=standard_artifacts.Examples),
  }
Executor
Next, write the executor code for the new component. Create a new subclass of base_executor.BaseExecutor and override its Do function. In the Do function, the arguments input_dict, output_dict, and exec_properties map to the INPUTS, OUTPUTS, and PARAMETERS defined in the ComponentSpec, respectively. For exec_properties, a value can be fetched directly through a dictionary lookup. For artifacts in input_dict and output_dict, the artifact_utils module provides convenience functions for fetching an artifact instance or an artifact URI.
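import json
import os
from typing import Any, Dict, List, Text

import tensorflow as tf
# Module paths follow recent TFX releases and may differ in older ones.
from tfx import types
from tfx.dsl.components.base import base_executor
from tfx.types import artifact_utils
from tfx.utils import io_utils


class Executor(base_executor.BaseExecutor):
  """Executor for HelloComponent."""

  def Do(self, input_dict: Dict[Text, List[types.Artifact]],
         output_dict: Dict[Text, List[types.Artifact]],
         exec_properties: Dict[Text, Any]) -> None:
    ...
    # Map each split name to the URI of that split in the input artifact.
    split_to_instance = {}
    for artifact in input_dict['input_data']:
      for split in json.loads(artifact.split_names):
        uri = artifact_utils.get_split_uri([artifact], split)
        split_to_instance[split] = uri

    # Copy every file of each split to the matching output split directory.
    for split, instance in split_to_instance.items():
      input_dir = instance
      output_dir = artifact_utils.get_split_uri(
          output_dict['output_data'], split)
      for filename in tf.io.gfile.listdir(input_dir):
        input_uri = os.path.join(input_dir, filename)
        output_uri = os.path.join(output_dir, filename)
        io_utils.copy_file(src=input_uri, dst=output_uri, overwrite=True)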
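To experiment with the copy logic without a TFX installation, the same split-by-split loop can be approximated with the standard library alone. This is a rough sketch under stated assumptions: each "artifact" is a plain dict rather than a TFX artifact, `copy_splits` is a hypothetical helper, and the `Split-<name>` directory layout is assumed for illustration.

```python
import json
import os
import shutil
import tempfile


def copy_splits(input_artifact, output_artifact):
    """Copy every file of every split from the input to the output location.

    Each "artifact" is a plain dict with a 'uri' and a JSON-encoded
    'split_names' list. The Split-<name> layout is an assumption of
    this sketch.
    """
    for split in json.loads(input_artifact["split_names"]):
        input_dir = os.path.join(input_artifact["uri"], f"Split-{split}")
        output_dir = os.path.join(output_artifact["uri"], f"Split-{split}")
        os.makedirs(output_dir, exist_ok=True)
        for filename in os.listdir(input_dir):
            shutil.copy(os.path.join(input_dir, filename),
                        os.path.join(output_dir, filename))


# Build a tiny input artifact on disk with two splits.
root = tempfile.mkdtemp()
input_artifact = {"uri": os.path.join(root, "in"),
                  "split_names": json.dumps(["train", "eval"])}
output_artifact = {"uri": os.path.join(root, "out"),
                   "split_names": input_artifact["split_names"]}
for split in ("train", "eval"):
    split_dir = os.path.join(input_artifact["uri"], f"Split-{split}")
    os.makedirs(split_dir)
    with open(os.path.join(split_dir, "data.txt"), "w") as f:
        f.write(f"{split} records")

copy_splits(input_artifact, output_artifact)
print(sorted(os.listdir(output_artifact["uri"])))  # ['Split-eval', 'Split-train']
```

In the real executor, artifact_utils.get_split_uri resolves the split directories and tf.io.gfile handles non-local filesystems; the sketch only mirrors the control flow.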
Unit testing a custom executor
Unit tests for the custom executor can be created similar to this one.
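The general shape of such a test is: prepare inputs in a temporary directory, call Do, and assert on what was written to the output location. The sketch below shows that shape with the standard library only. `FakeExecutor` is a hypothetical stand-in, not a TFX class, and its behavior (writing a greeting file) is invented for illustration; a real test would build TFX artifacts and instantiate the actual executor.

```python
import os
import tempfile
import unittest


class FakeExecutor:
    """Hypothetical stand-in for a custom executor; not a TFX class."""

    def Do(self, input_dict, output_dict, exec_properties):
        # Write a greeting built from an execution property to the output URI.
        output_uri = output_dict["output_data"][0]
        os.makedirs(output_uri, exist_ok=True)
        with open(os.path.join(output_uri, "greeting.txt"), "w") as f:
            f.write(f"Hello, {exec_properties['name']}!")


class FakeExecutorTest(unittest.TestCase):

    def test_do_writes_greeting(self):
        with tempfile.TemporaryDirectory() as tmp_dir:
            output_uri = os.path.join(tmp_dir, "output")
            FakeExecutor().Do(
                input_dict={},
                output_dict={"output_data": [output_uri]},
                exec_properties={"name": "HelloWorld"},
            )
            with open(os.path.join(output_uri, "greeting.txt")) as f:
                self.assertEqual(f.read(), "Hello, HelloWorld!")


def run_tests():
    """Run the test case and return the unittest result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(FakeExecutorTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)


result = run_tests()
```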
Component interface
Now that the most complex part is complete, the next step is to assemble these pieces into a component interface, to enable the component to be used in a pipeline. There are several steps:
- Make the component interface a subclass of base_component.BaseComponent
- Assign a class variable SPEC_CLASS with the ComponentSpec class that was defined earlier
- Assign a class variable EXECUTOR_SPEC with the Executor class that was defined earlier
- Define the __init__() constructor function by using the arguments to the function to construct an instance of the ComponentSpec class and invoke the super function with that value, along with an optional name
When an instance of the component is created, type checking logic in the base_component.BaseComponent class will be invoked to ensure that the arguments which were passed in are compatible with the type info defined in the ComponentSpec class.
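from typing import Optional, Text

# Module paths follow recent TFX releases and may differ in older ones.
from tfx import types
from tfx.dsl.components.base import base_component
from tfx.dsl.components.base import executor_spec
from tfx.types import channel_utils
from tfx.types import standard_artifacts

from hello_component import executor


class HelloComponent(base_component.BaseComponent):
  """Custom TFX Hello World Component."""

  SPEC_CLASS = HelloComponentSpec
  EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(executor.Executor)

  def __init__(self,
               input_data: Optional[types.Channel] = None,
               output_data: Optional[types.Channel] = None,
               name: Optional[Text] = None):
    if not output_data:
      # Default output: an Examples artifact with the same splits as the input.
      examples_artifact = standard_artifacts.Examples()
      examples_artifact.split_names = input_data.get()[0].split_names
      output_data = channel_utils.as_channel([examples_artifact])

    spec = HelloComponentSpec(input_data=input_data,
                              output_data=output_data, name=name)
    super(HelloComponent, self).__init__(spec=spec)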
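The type checking that happens when a component is instantiated can be pictured with a toy model. The sketch below is not how TFX implements it: `Channel` and `MiniSpec` are hypothetical stand-ins, and artifact types are reduced to strings. It only illustrates the idea that a spec declares expected types and rejects mismatched arguments at construction time.

```python
class Channel:
    """Hypothetical stand-in for a typed channel; not the TFX Channel class."""

    def __init__(self, artifact_type):
        self.artifact_type = artifact_type


class MiniSpec:
    """Toy spec that validates constructor arguments against declared types."""

    PARAMETERS = {"name": str}
    INPUTS = {"input_data": "Examples"}
    OUTPUTS = {"output_data": "Examples"}

    def __init__(self, **kwargs):
        # Execution parameters must be instances of the declared Python type.
        for key, expected in self.PARAMETERS.items():
            if not isinstance(kwargs.get(key), expected):
                raise TypeError(f"parameter {key!r} must be {expected.__name__}")
        # Channels must carry the declared artifact type.
        for key, expected in {**self.INPUTS, **self.OUTPUTS}.items():
            channel = kwargs.get(key)
            if not isinstance(channel, Channel) or channel.artifact_type != expected:
                raise TypeError(f"channel {key!r} must carry {expected} artifacts")
        self.args = kwargs


# Arguments that match the declared types are accepted.
spec = MiniSpec(name="HelloWorld",
                input_data=Channel("Examples"),
                output_data=Channel("Examples"))

# A channel of the wrong artifact type is rejected at construction time.
try:
    MiniSpec(name="HelloWorld",
             input_data=Channel("Schema"),
             output_data=Channel("Examples"))
    error = None
except TypeError as exc:
    error = str(exc)

print(error)  # channel 'input_data' must carry Examples artifacts
```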
Assemble into a TFX pipeline
The last step is to plug the new custom component into a TFX pipeline. Besides adding an instance of the new component, the following are also needed:
- Properly wire the upstream and downstream components of the new component to it. This is done by referencing the outputs of the upstream component in the new component and referencing the outputs of the new component in downstream components.
- Add the new component instance to the components list when constructing the pipeline.
The example below highlights the aforementioned changes. A full example can be found in the TFX GitHub repo.
def _create_pipeline():
  ...
  example_gen = CsvExampleGen(input_base=examples)
  hello = component.HelloComponent(
      input_data=example_gen.outputs['examples'], name='HelloWorld')
  statistics_gen = StatisticsGen(examples=hello.outputs['output_data'])
  ...
  return pipeline.Pipeline(
      ...
      components=[example_gen, hello, statistics_gen, ...],
      ...
  )
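One way to picture the wiring is as a dependency graph: because each component references the outputs of its upstream component, the references imply an order in which the components can run. The sketch below models this with the standard library's graphlib (Python 3.9+). The `consumes_outputs_of` table is a hypothetical illustration that mirrors the example above; it is not a TFX data structure and does not describe how any TFX orchestrator is implemented.

```python
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical wiring table: each component maps to the set of components
# whose outputs it consumes.
consumes_outputs_of = {
    "example_gen": set(),
    "hello": {"example_gen"},
    "statistics_gen": {"hello"},
}

# TopologicalSorter takes a mapping of node -> predecessors and yields
# nodes so that every predecessor comes before its consumers.
execution_order = list(TopologicalSorter(consumes_outputs_of).static_order())
print(execution_order)  # ['example_gen', 'hello', 'statistics_gen']
```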
Deploy a fully custom component
Besides the code changes, all the newly added parts (ComponentSpec, Executor, component interface) need to be accessible in the pipeline's running environment in order to run the pipeline properly.
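A quick way to check accessibility before launching a pipeline is to ask Python whether the relevant modules can be located in the current environment. The sketch below is a generic check, not a TFX feature: `find_missing_modules` is a hypothetical helper, and the module names passed to it are placeholders for wherever the ComponentSpec, executor, and component interface actually live.

```python
import importlib.util


def find_missing_modules(module_names):
    """Return the top-level module names that cannot be located for import."""
    missing = []
    for name in module_names:
        # find_spec returns None when a top-level module cannot be found.
        if importlib.util.find_spec(name) is None:
            missing.append(name)
    return missing


# 'json' ships with Python; the second name is a made-up module that is
# assumed not to be installed.
print(find_missing_modules(["json", "hypothetical_hello_component_pkg"]))
```

Running the same check inside the environment that executes the pipeline (for example, a container image or a remote worker) is what matters, since that environment may differ from the one where the pipeline is authored.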