
TFX-BSL Public TFXIO

tfx_bsl.public.tfxio

Module level imports for tfx_bsl.public.tfxio.

TFXIO defines a common in-memory data representation shared by all TFX libraries and components, as well as an I/O abstraction layer to produce such representations. See the RFC for details: https://github.com/tensorflow/community/blob/master/rfcs/20191017-tfx-standardized-inputs.md
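For orientation, here is a minimal sketch of the pattern, assuming a TFRecord file of serialized tf.Examples at a placeholder path:

```python
import apache_beam as beam

from tfx_bsl.public import tfxio

# One TFXIO object describes both how to read the data and how to
# interpret the resulting RecordBatches downstream.
examples = tfxio.TFExampleRecord(
    file_pattern="path/to/examples*.tfrecord",  # placeholder path
    telemetry_descriptors=["my_component"],     # illustrative descriptor
)
with beam.Pipeline() as p:
    # PCollection[pa.RecordBatch]
    record_batches = p | examples.BeamSource(batch_size=1000)
```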

Attributes

TensorRepresentations module-attribute

TensorRepresentations = Dict[str, TensorRepresentation]

Classes

BeamRecordCsvTFXIO

BeamRecordCsvTFXIO(
    physical_format: str,
    column_names: List[str],
    delimiter: Optional[str] = ",",
    skip_blank_lines: bool = True,
    multivalent_columns: Optional[str] = None,
    secondary_delimiter: Optional[str] = None,
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[str] = None,
    telemetry_descriptors: Optional[List[str]] = None,
)

Bases: _CsvTFXIOBase

TFXIO implementation for CSV records in pcoll[bytes].

This is a special TFXIO that does not actually do I/O -- it relies on the caller to prepare a PCollection of bytes.
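A minimal sketch of that usage, assuming in-memory CSV lines (the column names and the "inmem" physical format label are illustrative):

```python
import apache_beam as beam

from tfx_bsl.public import tfxio

csv_tfxio = tfxio.BeamRecordCsvTFXIO(
    physical_format="inmem",  # telemetry label only; illustrative
    column_names=["f1", "f2"],
    telemetry_descriptors=["my_component"],
)
with beam.Pipeline() as p:
    record_batches = (
        p
        | beam.Create([b"1,a", b"2,b"])  # caller-prepared pcoll[bytes]
        | csv_tfxio.BeamSource()         # -> PCollection[pa.RecordBatch]
    )
```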

Source code in tfx_bsl/tfxio/csv_tfxio.py
def __init__(
    self,
    physical_format: str,
    column_names: List[str],
    delimiter: Optional[str] = ",",
    skip_blank_lines: bool = True,
    multivalent_columns: Optional[str] = None,
    secondary_delimiter: Optional[str] = None,
    schema: Optional[schema_pb2.Schema] = None,
    raw_record_column_name: Optional[str] = None,
    telemetry_descriptors: Optional[List[str]] = None,
):
    super().__init__(
        telemetry_descriptors=telemetry_descriptors,
        raw_record_column_name=raw_record_column_name,
        logical_format="csv",
        physical_format=physical_format,
    )
    self._schema = schema
    self._column_names = column_names
    self._delimiter = delimiter
    self._skip_blank_lines = skip_blank_lines
    self._multivalent_columns = multivalent_columns
    self._secondary_delimiter = secondary_delimiter
    self._raw_record_column_name = raw_record_column_name
    if schema is not None:
        feature_names = [f.name for f in schema.feature]
        if not set(feature_names).issubset(set(column_names)):
            raise ValueError(
                f"Schema features are not a subset of column names: {column_names} vs {feature_names}"
            )
    self._schema_projected = False
Attributes
raw_record_column_name property
raw_record_column_name: Optional[str]
telemetry_descriptors property
telemetry_descriptors: Optional[List[str]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
    schema = self._ArrowSchemaNoRawRecordColumn()
    if self._raw_record_column_name is not None:
        if schema.get_field_index(self._raw_record_column_name) != -1:
            raise ValueError(
                f"Raw record column name {self._raw_record_column_name} collided with a column in the schema."
            )
        schema = schema.append(
            pa.field(self._raw_record_column_name, pa.large_list(pa.large_binary()))
        )
    return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).


Parameters

- batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:
    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll_or_pipeline: Any):
        """Converts raw records to RecordBatches."""
        return (
            pcoll_or_pipeline
            | "RawRecordBeamSource" >> self.RawRecordBeamSource()
            | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size)
        )

    return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[str]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.


Parameters

- tensor_names: a set of tensor names.


Returns

A TFXIO instance that is the same as self except that:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
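A sketch of projecting, assuming a schema-backed TFXIO whose schema defines tensors named "feature_a" and "feature_b" (the names, path, and `my_schema` are placeholders):

```python
import apache_beam as beam

from tfx_bsl.public import tfxio

# `my_schema` is assumed: a schema_pb2.Schema you already have.
full = tfxio.TFExampleRecord("path/to/examples*.tfrecord", schema=my_schema)
projected = full.Project(["feature_a", "feature_b"])
with beam.Pipeline() as p:
    # Only columns backing the two tensors are guaranteed to appear here.
    record_batches = p | projected.BeamSource()
```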

Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[str]) -> "TFXIO":
    """Projects the dataset represented by this TFXIO.

    A Projected TFXIO:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
    - It retains a reference to the very original TFXIO, so its TensorAdapter
      knows about the specs of the tensors that would be produced by the
      original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

    May raise an error if the TFMD schema was not provided at construction time.

    Args:
    ----
      tensor_names: a set of tensor names.

    Returns:
    -------
      A `TFXIO` instance that is the same as `self` except that:
      - Only columns needed for given tensor_names are guaranteed to be
        produced by `self.BeamSource()`
      - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
        to contain only those tensors.
    """
    if isinstance(self, _ProjectedTFXIO):
        # pylint: disable=protected-access
        return _ProjectedTFXIO(
            self.origin, self.projected._ProjectImpl(tensor_names)
        )
    return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
    """Returns a PTransform that produces a PCollection[bytes].

    Used together with RawRecordToRecordBatch(), it allows getting both the
    PCollection of the raw records and the PCollection of the RecordBatch from
    the same source. For example:

    record_batch = pipeline | tfxio.BeamSource()
    raw_record = pipeline | tfxio.RawRecordBeamSource()

    would result in the files being read twice, while the following would only
    read once:

    raw_record = pipeline | tfxio.RawRecordBeamSource()
    record_batch = raw_record | tfxio.RawRecordToRecordBatch()
    """

    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(bytes)
    def _PTransformFn(pcoll_or_pipeline: Any):
        return (
            pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry"
            >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice. Namely, when there is some logical grouping of files on which we need to perform operations without applying the operation to each individual group (e.g., shuffle).

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
datasets would result in the following dataset: [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may get [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are each deterministically ordered.


Parameters

- options: A TensorFlowDatasetOptions object. Not all options will apply.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self, options: dataset_options.TensorFlowDatasetOptions
) -> tf.data.Dataset:
    """Returns a Dataset that contains nested Datasets of raw records.

    May not be implemented for some TFXIOs.

    This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
    suffice. Namely, if there is some logical grouping of files which we need
    to perform operations on, without applying the operation to each individual
    group (e.g., shuffle).

    The returned Dataset object is a dataset of datasets, where each nested
    dataset is a dataset of serialized records. When shuffle=False (default),
    the nested datasets are deterministically ordered. Each nested dataset can
    represent multiple files. The files are merged into one dataset if the files
    have the same format. For example:

    ```
    file_patterns = ['file_1', 'file_2', 'dir_1/*']
    file_formats = ['recordio', 'recordio', 'sstable']
    tfxio = SomeTFXIO(file_patterns, file_formats)
    datasets = tfxio.RawRecordTensorFlowDataset(options)
    ```
    `datasets` would result in the following dataset: `[ds1, ds2]`, where ds1
    iterates over records from 'file_1' and 'file_2', and ds2 iterates over
    records from files matched by 'dir_1/*'.

    Example usage:
    ```
    tfxio = SomeTFXIO(file_patterns, file_formats)
    ds = tfxio.RawRecordTensorFlowDataset(options=options)
    ds = ds.flat_map(lambda x: x)
    records = list(ds.as_numpy_iterator())
    # iterating over `records` yields records from each file in
    # `file_patterns`. See `tf.data.Dataset.list_files` for more information
    # about the order of files when expanding globs.
    ```
    Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
    a dataset of datasets.

    When shuffle=True, the datasets are not deterministically ordered,
    but the contents of each nested dataset are deterministically ordered.
    For example, we may get [ds2, ds1, ds3], where the contents of ds1,
    ds2, and ds3 are each deterministically ordered.

    Args:
    ----
      options: A TensorFlowDatasetOptions object. Not all options will apply.
    """
    raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).


Parameters

- batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(
    self, batch_size: Optional[int] = None
) -> beam.PTransform:
    """Returns a PTransform that converts raw records to Arrow RecordBatches.

    The input PCollection must be from self.RawRecordBeamSource() (also see
    the documentation for that method).

    Args:
    ----
      batch_size: if not None, the `pa.RecordBatch` produced will be of the
        specified size. Otherwise it's automatically tuned by Beam.
    """

    @beam.typehints.with_input_types(bytes)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll: beam.pvalue.PCollection):
        return (
            pcoll
            | "RawRecordToRecordBatch"
            >> self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry"
            >> telemetry.ProfileRecordBatches(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RecordBatches
RecordBatches(options: RecordBatchesOptions)

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.


Parameters

- options: An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

Source code in tfx_bsl/tfxio/csv_tfxio.py
def RecordBatches(self, options: dataset_options.RecordBatchesOptions):
    raise NotImplementedError
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/csv_tfxio.py
def SupportAttachingRawRecords(self) -> bool:
    return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.
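A short sketch of the adapter in use, assuming a TFMD schema was provided at construction time and that `record_batch` came from this TFXIO's BeamSource() (`csv_tfxio` refers to a TFXIO such as the one built in the earlier sketch):

```python
# tensor_adapter.TensorAdapter exposes ToBatchTensors() for converting a
# whole RecordBatch into a dict of (composite) tensors keyed by tensor name.
adapter = csv_tfxio.TensorAdapter()
tensors = adapter.ToBatchTensors(record_batch)
```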

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
    """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

    May raise an error if the TFMD schema was not provided at construction time.
    """
    return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

Returns

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
    """Returns the config to initialize a `TensorAdapter`.

    Returns
    -------
      a `TensorAdapterConfig` that is the same as what is used to initialize the
      `TensorAdapter` returned by `self.TensorAdapter()`.
    """
    return tensor_adapter.TensorAdapterConfig(
        self.ArrowSchema(), self.TensorRepresentations()
    )
TensorFlowDataset
TensorFlowDataset(options: TensorFlowDatasetOptions)

Returns a tf.data.Dataset of TF inputs.

May raise an error if the TFMD schema was not provided at construction time.


Parameters

- options: an options object for the tf.data.Dataset. Look at dataset_options.TensorFlowDatasetOptions for more details.

Source code in tfx_bsl/tfxio/csv_tfxio.py
def TensorFlowDataset(self, options: dataset_options.TensorFlowDatasetOptions):
    raise NotImplementedError
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/csv_tfxio.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
    return self._TensorRepresentations(not self._schema_projected)

CsvTFXIO

CsvTFXIO(
    file_pattern: str,
    column_names: List[str],
    telemetry_descriptors: Optional[List[str]] = None,
    validate: bool = True,
    delimiter: Optional[str] = ",",
    skip_blank_lines: Optional[bool] = True,
    multivalent_columns: Optional[str] = None,
    secondary_delimiter: Optional[str] = None,
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[str] = None,
    skip_header_lines: int = 0,
)

Bases: _CsvTFXIOBase

TFXIO implementation for CSV.

Initializes a CSV TFXIO.


Parameters

- file_pattern: A file glob pattern to read csv files from.
- column_names: List of csv column names. Order must match the order in the CSV file.
- telemetry_descriptors: A set of descriptors that identify the component that is instantiating this TFXIO. These will be used to construct the namespace to contain metrics for profiling and are therefore expected to be identifiers of the component itself and not individual instances of source use.
- validate: Boolean flag to verify that the files exist during pipeline creation time.
- delimiter: A one-character string used to separate fields.
- skip_blank_lines: A boolean to indicate whether to skip over blank lines rather than interpreting them as missing values.
- multivalent_columns: Name of column that can contain multiple values. If secondary_delimiter is provided, this must also be provided.
- secondary_delimiter: Delimiter used for parsing multivalent columns. If multivalent_columns is provided, this must also be provided.
- schema: An optional TFMD Schema describing the dataset. If a schema is provided, it will determine the data type of the csv columns. Otherwise, each column's data type will be inferred by the csv decoder. The schema should contain exactly the same features as column_names.
- raw_record_column_name: If not None, the generated Arrow RecordBatches will contain a column of the given name that contains raw csv rows.
- skip_header_lines: Number of header lines to skip. The same number is skipped from each file. Must be 0 or higher. A large number of skipped lines might impact performance.
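A minimal sketch of construction and use (the path, column names, and descriptors are illustrative):

```python
import apache_beam as beam

from tfx_bsl.public import tfxio

csv_data = tfxio.CsvTFXIO(
    file_pattern="path/to/data/*.csv",  # placeholder glob
    column_names=["age", "income", "label"],
    telemetry_descriptors=["my_component"],
    skip_header_lines=1,  # assumes each file has one header row
)
with beam.Pipeline() as p:
    record_batches = p | csv_data.BeamSource()
```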

Source code in tfx_bsl/tfxio/csv_tfxio.py
def __init__(
    self,
    file_pattern: str,
    column_names: List[str],
    telemetry_descriptors: Optional[List[str]] = None,
    validate: bool = True,
    delimiter: Optional[str] = ",",
    skip_blank_lines: Optional[bool] = True,
    multivalent_columns: Optional[str] = None,
    secondary_delimiter: Optional[str] = None,
    schema: Optional[schema_pb2.Schema] = None,
    raw_record_column_name: Optional[str] = None,
    skip_header_lines: int = 0,
):
    """Initializes a CSV TFXIO.

    Args:
    ----
      file_pattern: A file glob pattern to read csv files from.
      column_names: List of csv column names. Order must match the order in the
        CSV file.
      telemetry_descriptors: A set of descriptors that identify the component
        that is instantiating this TFXIO. These will be used to construct the
        namespace to contain metrics for profiling and are therefore expected to
        be identifiers of the component itself and not individual instances of
        source use.
      validate: Boolean flag to verify that the files exist during the pipeline
        creation time.
      delimiter: A one-character string used to separate fields.
      skip_blank_lines: A boolean to indicate whether to skip over blank lines
        rather than interpreting them as missing values.
      multivalent_columns: Name of column that can contain multiple values. If
        secondary_delimiter is provided, this must also be provided.
      secondary_delimiter: Delimiter used for parsing multivalent columns. If
        multivalent_columns is provided, this must also be provided.
      schema: An optional TFMD Schema describing the dataset. If schema is
        provided, it will determine the data type of the csv columns. Otherwise,
        each column's data type will be inferred by the csv decoder. The
        schema should contain exactly the same features as column_names.
      raw_record_column_name: If not None, the generated Arrow RecordBatches
        will contain a column of the given name that contains raw csv rows.
      skip_header_lines: Number of header lines to skip. Same number is
        skipped from each file. Must be 0 or higher. Large number of
        skipped lines might impact performance.
    """
    super().__init__(
        column_names=column_names,
        delimiter=delimiter,
        skip_blank_lines=skip_blank_lines,
        multivalent_columns=multivalent_columns,
        secondary_delimiter=secondary_delimiter,
        schema=schema,
        raw_record_column_name=raw_record_column_name,
        telemetry_descriptors=telemetry_descriptors,
        physical_format="text",
    )
    self._file_pattern = file_pattern
    self._validate = validate
    self._skip_header_lines = skip_header_lines
Attributes
raw_record_column_name property
raw_record_column_name: Optional[str]
telemetry_descriptors property
telemetry_descriptors: Optional[List[str]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
    schema = self._ArrowSchemaNoRawRecordColumn()
    if self._raw_record_column_name is not None:
        if schema.get_field_index(self._raw_record_column_name) != -1:
            raise ValueError(
                f"Raw record column name {self._raw_record_column_name} collided with a column in the schema."
            )
        schema = schema.append(
            pa.field(self._raw_record_column_name, pa.large_list(pa.large_binary()))
        )
    return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).


Parameters

- batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:
    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll_or_pipeline: Any):
        """Converts raw records to RecordBatches."""
        return (
            pcoll_or_pipeline
            | "RawRecordBeamSource" >> self.RawRecordBeamSource()
            | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size)
        )

    return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[str]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.


Parameters

- tensor_names: a set of tensor names.


Returns

A TFXIO instance that is the same as self except that:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.

Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[str]) -> "TFXIO":
    """Projects the dataset represented by this TFXIO.

    A Projected TFXIO:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
    - It retains a reference to the very original TFXIO, so its TensorAdapter
      knows about the specs of the tensors that would be produced by the
      original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

    May raise an error if the TFMD schema was not provided at construction time.

    Args:
    ----
      tensor_names: a set of tensor names.

    Returns:
    -------
      A `TFXIO` instance that is the same as `self` except that:
      - Only columns needed for given tensor_names are guaranteed to be
        produced by `self.BeamSource()`
      - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
        to contain only those tensors.
    """
    if isinstance(self, _ProjectedTFXIO):
        # pylint: disable=protected-access
        return _ProjectedTFXIO(
            self.origin, self.projected._ProjectImpl(tensor_names)
        )
    return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
    """Returns a PTransform that produces a PCollection[bytes].

    Used together with RawRecordToRecordBatch(), it allows getting both the
    PCollection of the raw records and the PCollection of the RecordBatch from
    the same source. For example:

    record_batch = pipeline | tfxio.BeamSource()
    raw_record = pipeline | tfxio.RawRecordBeamSource()

    would result in the files being read twice, while the following would only
    read once:

    raw_record = pipeline | tfxio.RawRecordBeamSource()
    record_batch = raw_record | tfxio.RawRecordToRecordBatch()
    """

    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(bytes)
    def _PTransformFn(pcoll_or_pipeline: Any):
        return (
            pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry"
            >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice. Namely, when there is some logical grouping of files on which we need to perform operations without applying the operation to each individual group (e.g., shuffle).

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
datasets would result in the following dataset: [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may get [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are each deterministically ordered.


Parameters

- options: A TensorFlowDatasetOptions object. Not all options will apply.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self, options: dataset_options.TensorFlowDatasetOptions
) -> tf.data.Dataset:
    """Returns a Dataset that contains nested Datasets of raw records.

    May not be implemented for some TFXIOs.

    This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
    suffice. Namely, if there is some logical grouping of files which we need
    to perform operations on, without applying the operation to each individual
    group (e.g., shuffle).

    The returned Dataset object is a dataset of datasets, where each nested
    dataset is a dataset of serialized records. When shuffle=False (default),
    the nested datasets are deterministically ordered. Each nested dataset can
    represent multiple files. The files are merged into one dataset if the files
    have the same format. For example:

    ```
    file_patterns = ['file_1', 'file_2', 'dir_1/*']
    file_formats = ['recordio', 'recordio', 'sstable']
    tfxio = SomeTFXIO(file_patterns, file_formats)
    datasets = tfxio.RawRecordTensorFlowDataset(options)
    ```
    `datasets` would result in the following dataset: `[ds1, ds2]`, where ds1
    iterates over records from 'file_1' and 'file_2', and ds2 iterates over
    records from files matched by 'dir_1/*'.

    Example usage:
    ```
    tfxio = SomeTFXIO(file_patterns, file_formats)
    ds = tfxio.RawRecordTensorFlowDataset(options=options)
    ds = ds.flat_map(lambda x: x)
    records = list(ds.as_numpy_iterator())
    # iterating over `records` yields records from each file in
    # `file_patterns`. See `tf.data.Dataset.list_files` for more information
    # about the order of files when expanding globs.
    ```
    Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
    a dataset of datasets.

    When shuffle=True, the datasets are not deterministically ordered,
    but the contents of each nested dataset are deterministically ordered.
    For example, we may get [ds2, ds1, ds3], where the contents of ds1,
    ds2, and ds3 are each deterministically ordered.

    Args:
    ----
      options: A TensorFlowDatasetOptions object. Not all options will apply.
    """
    raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).


Parameters

- batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(
    self, batch_size: Optional[int] = None
) -> beam.PTransform:
    """Returns a PTransform that converts raw records to Arrow RecordBatches.

    The input PCollection must be from self.RawRecordBeamSource() (also see
    the documentation for that method).

    Args:
    ----
      batch_size: if not None, the `pa.RecordBatch` produced will be of the
        specified size. Otherwise it's automatically tuned by Beam.
    """

    @beam.typehints.with_input_types(bytes)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll: beam.pvalue.PCollection):
        return (
            pcoll
            | "RawRecordToRecordBatch"
            >> self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry"
            >> telemetry.ProfileRecordBatches(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RecordBatches
RecordBatches(options: RecordBatchesOptions)

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.


Parameters

- options: An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

Source code in tfx_bsl/tfxio/csv_tfxio.py
def RecordBatches(self, options: dataset_options.RecordBatchesOptions):
    raise NotImplementedError
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/csv_tfxio.py
def SupportAttachingRawRecords(self) -> bool:
    return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
    """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

    May raise an error if the TFMD schema was not provided at construction time.
    """
    return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

Returns

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
    """Returns the config to initialize a `TensorAdapter`.

    Returns
    -------
      a `TensorAdapterConfig` that is the same as what is used to initialize the
      `TensorAdapter` returned by `self.TensorAdapter()`.
    """
    return tensor_adapter.TensorAdapterConfig(
        self.ArrowSchema(), self.TensorRepresentations()
    )
TensorFlowDataset
TensorFlowDataset(options: TensorFlowDatasetOptions)

Returns a tf.data.Dataset of TF inputs.

May raise an error if the TFMD schema was not provided at construction time.


Parameters

- options: an options object for the tf.data.Dataset. Look at dataset_options.TensorFlowDatasetOptions for more details.

Source code in tfx_bsl/tfxio/csv_tfxio.py
def TensorFlowDataset(self, options: dataset_options.TensorFlowDatasetOptions):
    raise NotImplementedError
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/csv_tfxio.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
    return self._TensorRepresentations(not self._schema_projected)

RecordBatchToExamplesEncoder

RecordBatchToExamplesEncoder(
    schema: Optional[Schema] = None,
)

Encodes pa.RecordBatch as a list of serialized tf.Examples.

Requires a TFMD schema only if the RecordBatches contain nested lists of depth > 2 that represent TensorFlow's RaggedFeatures.
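A minimal sketch of encoding (the column name and values are illustrative; no schema is needed here because the lists are at most depth 2):

```python
import pyarrow as pa

from tfx_bsl.public import tfxio

batch = pa.RecordBatch.from_arrays(
    [pa.array([[1], [2, 3]], type=pa.large_list(pa.int64()))], ["f1"]
)
encoder = tfxio.RecordBatchToExamplesEncoder()
serialized = encoder.encode(batch)  # List[bytes]: one tf.Example per row
```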

Source code in tfx_bsl/coders/example_coder.py
def __init__(self, schema: Optional[schema_pb2.Schema] = None):
    self._schema = schema
    self._coder = RecordBatchToExamplesEncoderCpp(
        None if schema is None else schema.SerializeToString()
    )
Functions
encode
encode(record_batch: RecordBatch) -> List[bytes]
Source code in tfx_bsl/coders/example_coder.py
def encode(self, record_batch: pa.RecordBatch) -> List[bytes]:  # pylint: disable=invalid-name
    return self._coder.Encode(record_batch)

RecordBatchesOptions

Bases: NamedTuple('RecordBatchesOptions', [('batch_size', int), ('drop_final_batch', bool), ('num_epochs', Optional[int]), ('shuffle', bool), ('shuffle_buffer_size', int), ('shuffle_seed', Optional[int])])

Options for TFXIO's RecordBatches.

Note: not all of these options may be effective. It depends on the particular TFXIO's implementation.
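A sketch of constructing the options (the values are illustrative; since this is a plain NamedTuple, all fields are shown explicitly):

```python
from tfx_bsl.public import tfxio

options = tfxio.RecordBatchesOptions(
    batch_size=1024,
    drop_final_batch=False,
    num_epochs=1,
    shuffle=False,
    shuffle_buffer_size=0,
    shuffle_seed=None,
)
# `some_tfxio` is a placeholder for a TFXIO whose RecordBatches() is
# implemented (it is not for the CSV TFXIOs above).
for record_batch in some_tfxio.RecordBatches(options):
    ...
```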

TFExampleBeamRecord

TFExampleBeamRecord(
    physical_format: str,
    telemetry_descriptors: Optional[List[str]] = None,
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[str] = None,
)

Bases: _TFExampleRecordBase

TFXIO implementation for serialized tf.Examples in pcoll[bytes].

This is a special TFXIO that does not actually do I/O -- it relies on the caller to prepare a PCollection of bytes (serialized tf.Examples).
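A minimal sketch, feeding one hand-built serialized tf.Example through the decoder (the payload and descriptors are illustrative):

```python
import apache_beam as beam
import tensorflow as tf

from tfx_bsl.public import tfxio

example = tf.train.Example()
example.features.feature["f1"].int64_list.value.append(1)

beam_record = tfxio.TFExampleBeamRecord(
    physical_format="inmem",  # telemetry label; illustrative
    telemetry_descriptors=["my_component"],
)
with beam.Pipeline() as p:
    record_batches = (
        p
        | beam.Create([example.SerializeToString()])
        | beam_record.BeamSource(batch_size=10)
    )
```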

Initializer.


Parameters

- physical_format: The physical format that describes where the input pcoll[bytes] comes from. Used for telemetry purposes. Examples: "text", "tfrecord".
- telemetry_descriptors: A set of descriptors that identify the component that is instantiating this TFXIO. These will be used to construct the namespace to contain metrics for profiling and are therefore expected to be identifiers of the component itself and not individual instances of source use.
- schema: A TFMD Schema describing the dataset.
- raw_record_column_name: If not None, the generated Arrow RecordBatches will contain a column of the given name that contains serialized records.

Source code in tfx_bsl/tfxio/tf_example_record.py
def __init__(
    self,
    physical_format: str,
    telemetry_descriptors: Optional[List[str]] = None,
    schema: Optional[schema_pb2.Schema] = None,
    raw_record_column_name: Optional[str] = None,
):
    """Initializer.

    Args:
    ----
      physical_format: The physical format that describes where the input
        pcoll[bytes] comes from. Used for telemetry purposes. Examples: "text",
        "tfrecord".
      telemetry_descriptors: A set of descriptors that identify the component
        that is instantiating this TFXIO. These will be used to construct the
        namespace to contain metrics for profiling and are therefore expected to
        be identifiers of the component itself and not individual instances of
        source use.
      schema: A TFMD Schema describing the dataset.
      raw_record_column_name: If not None, the generated Arrow RecordBatches
        will contain a column of the given name that contains serialized
        records.
    """
    super().__init__(
        schema=schema,
        raw_record_column_name=raw_record_column_name,
        telemetry_descriptors=telemetry_descriptors,
        physical_format=physical_format,
    )
Attributes
raw_record_column_name property
raw_record_column_name: Optional[str]
telemetry_descriptors property
telemetry_descriptors: Optional[List[str]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
    schema = self._ArrowSchemaNoRawRecordColumn()
    if self._raw_record_column_name is not None:
        if schema.get_field_index(self._raw_record_column_name) != -1:
            raise ValueError(
                f"Raw record column name {self._raw_record_column_name} collided with a column in the schema."
            )
        schema = schema.append(
            pa.field(self._raw_record_column_name, pa.large_list(pa.large_binary()))
        )
    return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).


Parameters

- batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:
    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll_or_pipeline: Any):
        """Converts raw records to RecordBatches."""
        return (
            pcoll_or_pipeline
            | "RawRecordBeamSource" >> self.RawRecordBeamSource()
            | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size)
        )

    return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[str]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.


Parameters

- tensor_names: a set of tensor names.


Returns

A TFXIO instance that is the same as self except that:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.

Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[str]) -> "TFXIO":
    """Projects the dataset represented by this TFXIO.

    A Projected TFXIO:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
    - It retains a reference to the very original TFXIO, so its TensorAdapter
      knows about the specs of the tensors that would be produced by the
      original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

    May raise an error if the TFMD schema was not provided at construction time.

    Args:
    ----
      tensor_names: a set of tensor names.

    Returns:
    -------
      A `TFXIO` instance that is the same as `self` except that:
      - Only columns needed for given tensor_names are guaranteed to be
        produced by `self.BeamSource()`
      - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
        to contain only those tensors.
    """
    if isinstance(self, _ProjectedTFXIO):
        # pylint: disable=protected-access
        return _ProjectedTFXIO(
            self.origin, self.projected._ProjectImpl(tensor_names)
        )
    return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
    """Returns a PTransform that produces a PCollection[bytes].

    Used together with RawRecordToRecordBatch(), it allows getting both the
    PCollection of the raw records and the PCollection of the RecordBatch from
    the same source. For example:

    record_batch = pipeline | tfxio.BeamSource()
    raw_record = pipeline | tfxio.RawRecordBeamSource()

    would result in the files being read twice, while the following would only
    read once:

    raw_record = pipeline | tfxio.RawRecordBeamSource()
    record_batch = raw_record | tfxio.RawRecordToRecordBatch()
    """

    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(bytes)
    def _PTransformFn(pcoll_or_pipeline: Any):
        return (
            pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry"
            >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice. Namely, when there is some logical grouping of files on which we need to perform operations without applying the operation to each individual group (e.g., shuffle).

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
datasets would result in the following dataset: [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may get [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are each deterministically ordered.


Parameters

- options: A TensorFlowDatasetOptions object. Not all options will apply.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self, options: dataset_options.TensorFlowDatasetOptions
) -> tf.data.Dataset:
    """Returns a Dataset that contains nested Datasets of raw records.

    May not be implemented for some TFXIOs.

    This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
    suffice. Namely, if there is some logical grouping of files which we need
    to perform operations on, without applying the operation to each individual
    group (e.g., shuffle).

    The returned Dataset object is a dataset of datasets, where each nested
    dataset is a dataset of serialized records. When shuffle=False (default),
    the nested datasets are deterministically ordered. Each nested dataset can
    represent multiple files. The files are merged into one dataset if the files
    have the same format. For example:

    ```
    file_patterns = ['file_1', 'file_2', 'dir_1/*']
    file_formats = ['recordio', 'recordio', 'sstable']
    tfxio = SomeTFXIO(file_patterns, file_formats)
    datasets = tfxio.RawRecordTensorFlowDataset(options)
    ```
    `datasets` would result in the following dataset: `[ds1, ds2]`, where ds1
    iterates over records from 'file_1' and 'file_2', and ds2 iterates over
    records from files matched by 'dir_1/*'.

    Example usage:
    ```
    tfxio = SomeTFXIO(file_patterns, file_formats)
    ds = tfxio.RawRecordTensorFlowDataset(options=options)
    ds = ds.flat_map(lambda x: x)
    records = list(ds.as_numpy_iterator())
    # iterating over `records` yields records from each file in
    # `file_patterns`. See `tf.data.Dataset.list_files` for more information
    # about the order of files when expanding globs.
    ```
    Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
    a dataset of datasets.

    When shuffle=True, the datasets are not deterministically ordered,
    but the contents of each nested dataset are deterministically ordered.
    For example, we may get [ds2, ds1, ds3], where the contents of ds1,
    ds2, and ds3 are each deterministically ordered.

    Args:
    ----
      options: A TensorFlowDatasetOptions object. Not all options will apply.
    """
    raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).


Parameters

- batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(
    self, batch_size: Optional[int] = None
) -> beam.PTransform:
    """Returns a PTransform that converts raw records to Arrow RecordBatches.

    The input PCollection must be from self.RawRecordBeamSource() (also see
    the documentation for that method).

    Args:
    ----
      batch_size: if not None, the `pa.RecordBatch` produced will be of the
        specified size. Otherwise it's automatically tuned by Beam.
    """

    @beam.typehints.with_input_types(bytes)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll: beam.pvalue.PCollection):
        return (
            pcoll
            | "RawRecordToRecordBatch"
            >> self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry"
            >> telemetry.ProfileRecordBatches(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RecordBatches
RecordBatches(options: RecordBatchesOptions)

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.


Parameters

- options: An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RecordBatches(self, options: dataset_options.RecordBatchesOptions):
    raise NotImplementedError
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/tf_example_record.py
def SupportAttachingRawRecords(self) -> bool:
    return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
    """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

    May raise an error if the TFMD schema was not provided at construction time.
    """
    return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

Returns

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
    """Returns the config to initialize a `TensorAdapter`.

    Returns
    -------
      a `TensorAdapterConfig` that is the same as what is used to initialize the
      `TensorAdapter` returned by `self.TensorAdapter()`.
    """
    return tensor_adapter.TensorAdapterConfig(
        self.ArrowSchema(), self.TensorRepresentations()
    )
TensorFlowDataset
TensorFlowDataset(options: TensorFlowDatasetOptions)

Returns a tf.data.Dataset of TF inputs.

May raise an error if the TFMD schema was not provided at construction time.


Parameters

- options: an options object for the tf.data.Dataset. Look at dataset_options.TensorFlowDatasetOptions for more details.

Source code in tfx_bsl/tfxio/tf_example_record.py
def TensorFlowDataset(self, options: dataset_options.TensorFlowDatasetOptions):
    raise NotImplementedError(
        "TFExampleBeamRecord is unable to provide a TensorFlowDataset "
        "because it does not do I/O"
    )
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/tf_example_record.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
    return tensor_representation_util.InferTensorRepresentationsFromMixedSchema(
        self._schema
    )

TFExampleRecord

TFExampleRecord(
    file_pattern: Union[List[str], str],
    validate: bool = True,
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[str] = None,
    telemetry_descriptors: Optional[List[str]] = None,
)

Bases: _TFExampleRecordBase

TFXIO implementation for tf.Example on TFRecord.

Initializes a TFExampleRecord TFXIO.


Parameters

- file_pattern: A file glob pattern to read TFRecords from.
- validate: Not used; do not set (unused after 0.22.1).
- schema: A TFMD Schema describing the dataset.
- raw_record_column_name: If not None, the generated Arrow RecordBatches will contain a column of the given name that contains serialized records.
- telemetry_descriptors: A set of descriptors that identify the component that is instantiating this TFXIO. These will be used to construct the namespace to contain metrics for profiling and are therefore expected to be identifiers of the component itself and not individual instances of source use.
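A sketch of typical end-to-end use (the path is a placeholder, and `my_schema` is assumed: a schema_pb2.Schema you already have):

```python
import apache_beam as beam

from tfx_bsl.public import tfxio

examples = tfxio.TFExampleRecord(
    file_pattern="path/to/examples*.tfrecord",
    schema=my_schema,
    telemetry_descriptors=["my_component"],
)
# Hand this to whatever consumes the RecordBatches as tensors.
adapter_config = examples.TensorAdapterConfig()
with beam.Pipeline() as p:
    record_batches = p | examples.BeamSource(batch_size=1000)
```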

Source code in tfx_bsl/tfxio/tf_example_record.py
def __init__(
    self,
    file_pattern: Union[List[str], str],
    validate: bool = True,
    schema: Optional[schema_pb2.Schema] = None,
    raw_record_column_name: Optional[str] = None,
    telemetry_descriptors: Optional[List[str]] = None,
):
    """Initializes a TFExampleRecord TFXIO.

    Args:
    ----
      file_pattern: A file glob pattern to read TFRecords from.
      validate: Not used; do not set (unused since 0.22.1).
      schema: A TFMD Schema describing the dataset.
      raw_record_column_name: If not None, the generated Arrow RecordBatches
        will contain a column of the given name that contains serialized
        records.
      telemetry_descriptors: A set of descriptors that identify the component
        that is instantiating this TFXIO. These will be used to construct the
        namespace to contain metrics for profiling and are therefore expected to
        be identifiers of the component itself and not individual instances of
        source use.
    """
    super().__init__(
        schema=schema,
        raw_record_column_name=raw_record_column_name,
        telemetry_descriptors=telemetry_descriptors,
        physical_format="tfrecords_gzip",
    )
    del validate
    if not isinstance(file_pattern, list):
        file_pattern = [file_pattern]
    assert file_pattern, "Must provide at least one file pattern."
    self._file_pattern = file_pattern
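
As a construction sketch (the glob and descriptor below are hypothetical):

```
from tfx_bsl.public import tfxio

example_tfxio = tfxio.TFExampleRecord(
    file_pattern="/data/train-*.tfrecord.gz",   # hypothetical glob
    schema=None,                                # or a TFMD schema_pb2.Schema
    telemetry_descriptors=["MyComponent"],      # identifies the calling component
)
```
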
Attributes
raw_record_column_name property
raw_record_column_name: Optional[str]
telemetry_descriptors property
telemetry_descriptors: Optional[List[str]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
    schema = self._ArrowSchemaNoRawRecordColumn()
    if self._raw_record_column_name is not None:
        if schema.get_field_index(self._raw_record_column_name) != -1:
            raise ValueError(
                f"Raw record column name {self._raw_record_column_name} collided with a column in the schema."
            )
        schema = schema.append(
            pa.field(self._raw_record_column_name, pa.large_list(pa.large_binary()))
        )
    return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).


batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:
    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll_or_pipeline: Any):
        """Converts raw records to RecordBatches."""
        return (
            pcoll_or_pipeline
            | "RawRecordBeamSource" >> self.RawRecordBeamSource()
            | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size)
        )

    return beam.ptransform_fn(_PTransformFn)()
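
For example, a minimal pipeline reading the files into Arrow RecordBatches, reusing the hypothetical `example_tfxio` constructed above:

```
import apache_beam as beam

with beam.Pipeline() as p:
    # PCollection[pa.RecordBatch]; batch size is tuned by Beam if left as None.
    record_batches = p | "ReadAndDecode" >> example_tfxio.BeamSource(batch_size=1000)
```
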
Project
Project(tensor_names: List[str]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.


tensor_names: a set of tensor names.


A TFXIO instance that is the same as self except that:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.

Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[str]) -> "TFXIO":
    """Projects the dataset represented by this TFXIO.

    A Projected TFXIO:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
    - It retains a reference to the very original TFXIO, so its TensorAdapter
      knows about the specs of the tensors that would be produced by the
      original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

    May raise an error if the TFMD schema was not provided at construction time.

    Args:
    ----
      tensor_names: a set of tensor names.

    Returns:
    -------
      A `TFXIO` instance that is the same as `self` except that:
      - Only columns needed for given tensor_names are guaranteed to be
        produced by `self.BeamSource()`
      - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
        to contain only those tensors.
    """
    if isinstance(self, _ProjectedTFXIO):
        # pylint: disable=protected-access
        return _ProjectedTFXIO(
            self.origin, self.projected._ProjectImpl(tensor_names)
        )
    return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
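
A usage sketch, with hypothetical tensor names that are assumed to appear in self.TensorRepresentations():

```
# Keep only the columns backing these two (hypothetical) tensors.
projected = example_tfxio.Project(["age", "income"])
# The trimmed config covers exactly the projected tensors.
config = projected.TensorAdapterConfig()
```
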
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
    """Returns a PTransform that produces a PCollection[bytes].

    Used together with RawRecordToRecordBatch(), it allows getting both the
    PCollection of the raw records and the PCollection of the RecordBatch from
    the same source. For example:

    record_batch = pipeline | tfxio.BeamSource()
    raw_record = pipeline | tfxio.RawRecordBeamSource()

    would result in the files being read twice, while the following would only
    read once:

    raw_record = pipeline | tfxio.RawRecordBeamSource()
    record_batch = raw_record | tfxio.RawRecordToRecordBatch()
    """

    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(bytes)
    def _PTransformFn(pcoll_or_pipeline: Any):
        return (
            pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry"
            >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice. Namely, when there is some logical grouping of files on which we need to perform operations (e.g. shuffle) without applying the operation to each individual group.

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
`datasets` would result in the following dataset: [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may have [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are all deterministically ordered.


options: A TensorFlowDatasetOptions object. Not all options will apply.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self, options: dataset_options.TensorFlowDatasetOptions
) -> tf.data.Dataset:
    """Returns a Dataset that contains nested Datasets of raw records.

    May not be implemented for some TFXIOs.

    This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
    suffice. Namely, when there is some logical grouping of files on which we
    need to perform operations (e.g. shuffle) without applying the operation
    to each individual group.

    The returned Dataset object is a dataset of datasets, where each nested
    dataset is a dataset of serialized records. When shuffle=False (default),
    the nested datasets are deterministically ordered. Each nested dataset can
    represent multiple files. The files are merged into one dataset if the files
    have the same format. For example:

    ```
    file_patterns = ['file_1', 'file_2', 'dir_1/*']
    file_formats = ['recordio', 'recordio', 'sstable']
    tfxio = SomeTFXIO(file_patterns, file_formats)
    datasets = tfxio.RawRecordTensorFlowDataset(options)
    ```
    `datasets` would result in the following dataset: `[ds1, ds2]`, where ds1
    iterates over records from 'file_1' and 'file_2', and ds2 iterates over
    records from files matched by 'dir_1/*'.

    Example usage:
    ```
    tfxio = SomeTFXIO(file_patterns, file_formats)
    ds = tfxio.RawRecordTensorFlowDataset(options=options)
    ds = ds.flat_map(lambda x: x)
    records = list(ds.as_numpy_iterator())
    # iterating over `records` yields records from each file in
    # `file_patterns`. See `tf.data.Dataset.list_files` for more information
    # about the order of files when expanding globs.
    ```
    Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
    a dataset of datasets.

    When shuffle=True, the datasets are not deterministically ordered, but
    the contents of each nested dataset are deterministically ordered. For
    example, we may have [ds2, ds1, ds3], where the contents of ds1, ds2,
    and ds3 are all deterministically ordered.

    Args:
    ----
      options: A TensorFlowDatasetOptions object. Not all options will apply.
    """
    raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).


batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(
    self, batch_size: Optional[int] = None
) -> beam.PTransform:
    """Returns a PTransform that converts raw records to Arrow RecordBatches.

    The input PCollection must be from self.RawRecordBeamSource() (also see
    the documentation for that method).

    Args:
    ----
      batch_size: if not None, the `pa.RecordBatch` produced will be of the
        specified size. Otherwise it's automatically tuned by Beam.
    """

    @beam.typehints.with_input_types(bytes)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll: beam.pvalue.PCollection):
        return (
            pcoll
            | "RawRecordToRecordBatch"
            >> self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry"
            >> telemetry.ProfileRecordBatches(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
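
Putting the two transforms together, a sketch of the read-once pattern described under RawRecordBeamSource (names are illustrative):

```
import apache_beam as beam

with beam.Pipeline() as p:
    # The files are read once; both PCollections derive from the same source.
    raw_records = p | "ReadRaw" >> example_tfxio.RawRecordBeamSource()
    record_batches = raw_records | "Decode" >> example_tfxio.RawRecordToRecordBatch(
        batch_size=1000
    )
```
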
RecordBatches
RecordBatches(
    options: RecordBatchesOptions,
) -> Iterator[RecordBatch]

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.


options: An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

Source code in tfx_bsl/tfxio/tf_example_record.py
def RecordBatches(
    self, options: dataset_options.RecordBatchesOptions
) -> Iterator[pa.RecordBatch]:
    dataset = dataset_util.make_tf_record_dataset(
        self._file_pattern,
        options.batch_size,
        options.drop_final_batch,
        options.num_epochs,
        options.shuffle,
        options.shuffle_buffer_size,
        options.shuffle_seed,
    )

    decoder = example_coder.ExamplesToRecordBatchDecoder(
        self._schema.SerializeToString() if self._schema is not None else None
    )
    for examples in dataset.as_numpy_iterator():
        decoded = decoder.DecodeBatch(examples)
        if self._raw_record_column_name is None:
            yield decoded
        else:
            yield record_based_tfxio.AppendRawRecordColumn(
                decoded, self._raw_record_column_name, examples.tolist()
            )
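
A sketch of in-process iteration. The option values are illustrative, and it is assumed that RecordBatchesOptions accepts as constructor arguments the same fields the source above reads:

```
from tfx_bsl.tfxio import dataset_options

options = dataset_options.RecordBatchesOptions(
    batch_size=1024,
    drop_final_batch=False,
    num_epochs=1,
    shuffle=False,
    shuffle_buffer_size=10000,
    shuffle_seed=None,
)
for record_batch in example_tfxio.RecordBatches(options):
    print(record_batch.num_rows, record_batch.schema.names)
```
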
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/tf_example_record.py
def SupportAttachingRawRecords(self) -> bool:
    return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
    """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

    May raise an error if the TFMD schema was not provided at construction time.
    """
    return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

Returns

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
    """Returns the config to initialize a `TensorAdapter`.

    Returns
    -------
      a `TensorAdapterConfig` that is the same as what is used to initialize the
      `TensorAdapter` returned by `self.TensorAdapter()`.
    """
    return tensor_adapter.TensorAdapterConfig(
        self.ArrowSchema(), self.TensorRepresentations()
    )
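
The adapter built from this config turns each RecordBatch into the tensors described by TensorRepresentations(). A sketch, assuming the adapter exposes a ToBatchTensors method and reusing the `options` from the RecordBatches sketch above:

```
adapter = example_tfxio.TensorAdapter()  # built from TensorAdapterConfig()
for record_batch in example_tfxio.RecordBatches(options):
    # Assumed API: returns a dict mapping tensor names to (composite) tensors.
    tensors = adapter.ToBatchTensors(record_batch)
```
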
TensorFlowDataset
TensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Creates a TFRecordDataset that yields Tensors.

The serialized tf.Examples are parsed by tf.io.parse_example to create Tensors.

See base class (tfxio.TFXIO) for more details.


options: an options object for the tf.data.Dataset. See dataset_options.TensorFlowDatasetOptions for more details.


A dataset of dict elements, (or a tuple of dict elements and label). Each dict maps feature keys to Tensor, SparseTensor, or RaggedTensor objects.


ValueError: if there is something wrong with the tensor_representation.

Source code in tfx_bsl/tfxio/tf_example_record.py
def TensorFlowDataset(
    self, options: dataset_options.TensorFlowDatasetOptions
) -> tf.data.Dataset:
    """Creates a TFRecordDataset that yields Tensors.

    The serialized tf.Examples are parsed by `tf.io.parse_example` to create
    Tensors.

    See base class (tfxio.TFXIO) for more details.

    Args:
    ----
      options: an options object for the tf.data.Dataset. See
        `dataset_options.TensorFlowDatasetOptions` for more details.

    Returns:
    -------
      A dataset of `dict` elements, (or a tuple of `dict` elements and label).
      Each `dict` maps feature keys to `Tensor`, `SparseTensor`, or
      `RaggedTensor` objects.

    Raises:
    ------
      ValueError: if there is something wrong with the tensor_representation.
    """
    (tf_example_parser_config, feature_name_to_tensor_name) = (
        self._GetTfExampleParserConfig()
    )

    file_pattern = tf.convert_to_tensor(self._file_pattern)
    dataset = dataset_util.make_tf_record_dataset(
        file_pattern,
        batch_size=options.batch_size,
        num_epochs=options.num_epochs,
        shuffle=options.shuffle,
        shuffle_buffer_size=options.shuffle_buffer_size,
        shuffle_seed=options.shuffle_seed,
        reader_num_threads=options.reader_num_threads,
        drop_final_batch=options.drop_final_batch,
    )

    # Parse `Example` tensors to a dictionary of `Feature` tensors.
    dataset = dataset.apply(
        tf.data.experimental.parse_example_dataset(tf_example_parser_config)
    )

    dataset = dataset.map(
        lambda x: self._RenameFeatures(x, feature_name_to_tensor_name)
    )

    label_key = options.label_key
    if label_key is not None:
        dataset = self._PopLabelFeatureFromDataset(dataset, label_key)

    return dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
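
For example, with a hypothetical label feature named "label" and the remaining options left at their defaults:

```
from tfx_bsl.tfxio import dataset_options

options = dataset_options.TensorFlowDatasetOptions(
    batch_size=128, shuffle=False, num_epochs=1, label_key="label"
)
ds = example_tfxio.TensorFlowDataset(options)
for features, label in ds.take(1):
    pass  # `features` maps tensor names to Tensor/SparseTensor/RaggedTensor
```
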
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/tf_example_record.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
    return tensor_representation_util.InferTensorRepresentationsFromMixedSchema(
        self._schema
    )

TFGraphRecordDecoder

Base class for decoders that turn a list of bytes into (composite) tensors.

Sub-classes must implement decode_record() (see its docstring for requirements).

Decoder instances can be saved as a SavedModel by save_decoder(). The SavedModel can be loaded back by load_decoder(). However, the loaded decoder will always be of the type LoadedDecoder and only have the public interfaces listed in this base class available.

Attributes
record_index_tensor_name property
record_index_tensor_name: Optional[str]

The name of the tensor indicating which record a slice is from.

The decoded tensors are batch-aligned among themselves, but they don't necessarily have to be batch-aligned with the input records. If not, sub-classes should implement this method to tie the batch dimension with the input record.

The record index tensor must be a SparseTensor or a RaggedTensor of integral type; it must be 2-D and must not contain "missing" values.

A record index tensor like the following: [[0], [0], [2]] means that of 3 "rows" in the output "batch", the first two rows came from the first record, and the 3rd row came from the third record.

The name must not be an empty string.

Returns

The name of the record index tensor.

Functions
decode_record abstractmethod
decode_record(records: Tensor) -> Dict[str, TensorAlike]

Sub-classes should implement this.

Implementations must use TF ops to derive the result (composite) tensors, as this function will be traced and become a tf.function (thus a TF Graph). Note that autograph is not enabled in such tracing, which means any python control flow / loops will not be converted to TF cond / loops automatically.

The returned tensors must be batch-aligned (i.e. they should be at least of rank 1, and their outer-most dimensions must be of the same size). They do not have to be batch-aligned with the input tensor, but if that's the case, an additional tensor must be provided among the results, to indicate which input record a "row" in the output batch comes from. See record_index_tensor_name for more details.


records: a 1-D string tensor that contains the records to be decoded.


A dict of (composite) tensors.

Source code in tfx_bsl/coders/tf_graph_record_decoder.py
@abc.abstractmethod
def decode_record(self, records: tf.Tensor) -> Dict[str, TensorAlike]:
    """Sub-classes should implement this.

    Implementations must use TF ops to derive the result (composite) tensors, as
    this function will be traced and become a tf.function (thus a TF Graph).
    Note that autograph is not enabled in such tracing, which means any python
    control flow / loops will not be converted to TF cond / loops automatically.

    The returned tensors must be batch-aligned (i.e. they should be at least
    of rank 1, and their outer-most dimensions must be of the same size). They
    do not have to be batch-aligned with the input tensor, but if that's the
    case, an additional tensor must be provided among the results, to indicate
    which input record a "row" in the output batch comes from. See
    `record_index_tensor_name` for more details.

    Args:
    ----
      records: a 1-D string tensor that contains the records to be decoded.

    Returns:
    -------
      A dict of (composite) tensors.
    """
output_type_specs
output_type_specs() -> Dict[str, TypeSpec]

Returns the tf.TypeSpecs of the decoded tensors.

Returns

A dict whose keys are the same as keys of the dict returned by decode_record() and values are the tf.TypeSpec of the corresponding (composite) tensor.

Source code in tfx_bsl/coders/tf_graph_record_decoder.py
def output_type_specs(self) -> Dict[str, tf.TypeSpec]:
    """Returns the tf.TypeSpecs of the decoded tensors.

    Returns
    -------
      A dict whose keys are the same as keys of the dict returned by
      `decode_record()` and values are the tf.TypeSpec of the corresponding
      (composite) tensor.
    """
    return {
        k: tf.type_spec_from_value(v)
        for k, v in self._make_concrete_decode_function().structured_outputs.items()
    }
save
save(path: str) -> None

Saves this TFGraphRecordDecoder to a SavedModel at path.

This functions the same as tf_graph_record_decoder.save_decoder(). This is provided purely for convenience, and should not impact the actual saved model, since only the tf.function from _make_concrete_decode_function is saved.


path: The path to where the saved_model is saved.

Source code in tfx_bsl/coders/tf_graph_record_decoder.py
def save(self, path: str) -> None:
    """Saves this TFGraphRecordDecoder to a SavedModel at `path`.

    This functions the same as `tf_graph_record_decoder.save_decoder()`. This is
    provided purely for convenience, and should not impact the actual saved
    model, since only the `tf.function` from `_make_concrete_decode_function` is
    saved.

    Args:
    ----
      path: The path to where the saved_model is saved.
    """
    save_decoder(self, path)

TFSequenceExampleBeamRecord

TFSequenceExampleBeamRecord(
    physical_format: str,
    telemetry_descriptors: List[str],
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[str] = None,
)

Bases: _TFSequenceExampleRecordBase

TFXIO implementation for serialized tf.SequenceExamples in pcoll[bytes].

This is a special TFXIO that does not actually do I/O -- it relies on the caller to prepare a PCollection of bytes (serialized tf.SequenceExamples).

Initializer.


physical_format: The physical format that describes where the input pcoll[bytes] comes from. Used for telemetry purposes. Examples: "text", "tfrecord".
telemetry_descriptors: A set of descriptors that identify the component that is instantiating this TFXIO. These will be used to construct the namespace to contain metrics for profiling and are therefore expected to be identifiers of the component itself and not individual instances of source use.
schema: A TFMD Schema describing the dataset.
raw_record_column_name: If not None, the generated Arrow RecordBatches will contain a column of the given name that contains serialized records.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def __init__(
    self,
    physical_format: str,
    telemetry_descriptors: List[str],
    schema: Optional[schema_pb2.Schema] = None,
    raw_record_column_name: Optional[str] = None,
):
    """Initializer.

    Args:
    ----
      physical_format: The physical format that describes where the input
        pcoll[bytes] comes from. Used for telemetry purposes. Examples: "text",
        "tfrecord".
      telemetry_descriptors: A set of descriptors that identify the component
        that is instantiating this TFXIO. These will be used to construct the
        namespace to contain metrics for profiling and are therefore expected to
        be identifiers of the component itself and not individual instances of
        source use.
      schema: A TFMD Schema describing the dataset.
      raw_record_column_name: If not None, the generated Arrow RecordBatches
        will contain a column of the given name that contains serialized
        records.
    """
    super().__init__(
        schema=schema,
        raw_record_column_name=raw_record_column_name,
        telemetry_descriptors=telemetry_descriptors,
        physical_format=physical_format,
    )
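
A usage sketch; the serialized record and descriptor are illustrative, and the caller is responsible for producing the PCollection of bytes:

```
import apache_beam as beam
import tensorflow as tf
from tfx_bsl.public import tfxio

# One serialized tf.train.SequenceExample standing in for real data.
seq = tf.train.SequenceExample()
seq.context.feature["id"].int64_list.value.append(1)
serialized = [seq.SerializeToString()]

seq_beam_tfxio = tfxio.TFSequenceExampleBeamRecord(
    physical_format="inmem",
    telemetry_descriptors=["MyComponent"],
    schema=None,  # or a TFMD schema_pb2.Schema
)
with beam.Pipeline() as p:
    record_batches = (
        p
        | "CreateBytes" >> beam.Create(serialized)
        | "Decode" >> seq_beam_tfxio.BeamSource(batch_size=10)
    )
```
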
Attributes
raw_record_column_name property
raw_record_column_name: Optional[str]
schema property
schema: Schema
telemetry_descriptors property
telemetry_descriptors: Optional[List[str]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
    schema = self._ArrowSchemaNoRawRecordColumn()
    if self._raw_record_column_name is not None:
        if schema.get_field_index(self._raw_record_column_name) != -1:
            raise ValueError(
                f"Raw record column name {self._raw_record_column_name} collided with a column in the schema."
            )
        schema = schema.append(
            pa.field(self._raw_record_column_name, pa.large_list(pa.large_binary()))
        )
    return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).


batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:
    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll_or_pipeline: Any):
        """Converts raw records to RecordBatches."""
        return (
            pcoll_or_pipeline
            | "RawRecordBeamSource" >> self.RawRecordBeamSource()
            | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size)
        )

    return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[str]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.


tensor_names: a set of tensor names.


A TFXIO instance that is the same as self except that:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.

Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[str]) -> "TFXIO":
    """Projects the dataset represented by this TFXIO.

    A Projected TFXIO:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
    - It retains a reference to the very original TFXIO, so its TensorAdapter
      knows about the specs of the tensors that would be produced by the
      original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

    May raise an error if the TFMD schema was not provided at construction time.

    Args:
    ----
      tensor_names: a set of tensor names.

    Returns:
    -------
      A `TFXIO` instance that is the same as `self` except that:
      - Only columns needed for given tensor_names are guaranteed to be
        produced by `self.BeamSource()`
      - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
        to contain only those tensors.
    """
    if isinstance(self, _ProjectedTFXIO):
        # pylint: disable=protected-access
        return _ProjectedTFXIO(
            self.origin, self.projected._ProjectImpl(tensor_names)
        )
    return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
    """Returns a PTransform that produces a PCollection[bytes].

    Used together with RawRecordToRecordBatch(), it allows getting both the
    PCollection of the raw records and the PCollection of the RecordBatch from
    the same source. For example:

    record_batch = pipeline | tfxio.BeamSource()
    raw_record = pipeline | tfxio.RawRecordBeamSource()

    would result in the files being read twice, while the following would only
    read once:

    raw_record = pipeline | tfxio.RawRecordBeamSource()
    record_batch = raw_record | tfxio.RawRecordToRecordBatch()
    """

    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(bytes)
    def _PTransformFn(pcoll_or_pipeline: Any):
        return (
            pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry"
            >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice. Namely, when there is some logical grouping of files on which we need to perform operations (e.g. shuffle) without applying the operation to each individual group.

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
`datasets` would result in the following dataset: [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may have [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are all deterministically ordered.


options: A TensorFlowDatasetOptions object. Not all options will apply.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self, options: dataset_options.TensorFlowDatasetOptions
) -> tf.data.Dataset:
    """Returns a Dataset that contains nested Datasets of raw records.

    May not be implemented for some TFXIOs.

    This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
    suffice. Namely, when there is some logical grouping of files on which we
    need to perform operations (e.g. shuffle) without applying the operation
    to each individual group.

    The returned Dataset object is a dataset of datasets, where each nested
    dataset is a dataset of serialized records. When shuffle=False (default),
    the nested datasets are deterministically ordered. Each nested dataset can
    represent multiple files. The files are merged into one dataset if the files
    have the same format. For example:

    ```
    file_patterns = ['file_1', 'file_2', 'dir_1/*']
    file_formats = ['recordio', 'recordio', 'sstable']
    tfxio = SomeTFXIO(file_patterns, file_formats)
    datasets = tfxio.RawRecordTensorFlowDataset(options)
    ```
    `datasets` would result in the following dataset: `[ds1, ds2]`, where ds1
    iterates over records from 'file_1' and 'file_2', and ds2 iterates over
    records from files matched by 'dir_1/*'.

    Example usage:
    ```
    tfxio = SomeTFXIO(file_patterns, file_formats)
    ds = tfxio.RawRecordTensorFlowDataset(options=options)
    ds = ds.flat_map(lambda x: x)
    records = list(ds.as_numpy_iterator())
    # iterating over `records` yields records from each file in
    # `file_patterns`. See `tf.data.Dataset.list_files` for more information
    # about the order of files when expanding globs.
    ```
    Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
    a dataset of datasets.

    When shuffle=True, the datasets are not deterministically ordered, but
    the contents of each nested dataset are deterministically ordered. For
    example, we may have [ds2, ds1, ds3], where the contents of ds1, ds2,
    and ds3 are all deterministically ordered.

    Args:
    ----
      options: A TensorFlowDatasetOptions object. Not all options will apply.
    """
    raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).


batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(
    self, batch_size: Optional[int] = None
) -> beam.PTransform:
    """Returns a PTransform that converts raw records to Arrow RecordBatches.

    The input PCollection must be from self.RawRecordBeamSource() (also see
    the documentation for that method).

    Args:
    ----
      batch_size: if not None, the `pa.RecordBatch` produced will be of the
        specified size. Otherwise it's automatically tuned by Beam.
    """

    @beam.typehints.with_input_types(bytes)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll: beam.pvalue.PCollection):
        return (
            pcoll
            | "RawRecordToRecordBatch"
            >> self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry"
            >> telemetry.ProfileRecordBatches(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RecordBatches
RecordBatches(options: RecordBatchesOptions)

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.


options: An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def RecordBatches(self, options: dataset_options.RecordBatchesOptions):
    raise NotImplementedError
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def SupportAttachingRawRecords(self) -> bool:
    return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
    """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

    May raise an error if the TFMD schema was not provided at construction time.
    """
    return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

Returns

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
    """Returns the config to initialize a `TensorAdapter`.

    Returns
    -------
      a `TensorAdapterConfig` that is the same as what is used to initialize the
      `TensorAdapter` returned by `self.TensorAdapter()`.
    """
    return tensor_adapter.TensorAdapterConfig(
        self.ArrowSchema(), self.TensorRepresentations()
    )
TensorFlowDataset
TensorFlowDataset(options: TensorFlowDatasetOptions)

Returns a tf.data.Dataset of TF inputs.

May raise an error if the TFMD schema was not provided at construction time.


options: an options object for the tf.data.Dataset. Look at dataset_options.TensorFlowDatasetOptions for more details.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def TensorFlowDataset(self, options: dataset_options.TensorFlowDatasetOptions):
    raise NotImplementedError(
        "TFExampleBeamRecord is unable to provide a TensorFlowDataset "
        "because it does not do I/O"
    )
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
    return tensor_representation_util.InferTensorRepresentationsFromMixedSchema(
        self._schema
    )

TFSequenceExampleRecord

TFSequenceExampleRecord(
    file_pattern: Union[List[str], str],
    telemetry_descriptors: List[str],
    validate: bool = True,
    schema: Optional[Schema] = None,
    raw_record_column_name: Optional[str] = None,
)

Bases: _TFSequenceExampleRecordBase

TFXIO implementation for tf.SequenceExample on TFRecord.

Initializes a TFSequenceExampleRecord TFXIO.


file_pattern: One or a list of glob patterns. If a list, must not be empty.
telemetry_descriptors: A set of descriptors that identify the component that is instantiating this TFXIO. These will be used to construct the namespace to contain metrics for profiling and are therefore expected to be identifiers of the component itself and not individual instances of source use.
validate: Not used; do not set (unused since 0.22.1).
schema: A TFMD Schema describing the dataset.
raw_record_column_name: If not None, the generated Arrow RecordBatches will contain a column of the given name that contains serialized records.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def __init__(
    self,
    file_pattern: Union[List[str], str],
    telemetry_descriptors: List[str],
    validate: bool = True,
    schema: Optional[schema_pb2.Schema] = None,
    raw_record_column_name: Optional[str] = None,
):
    """Initializes a TFSequenceExampleRecord TFXIO.

    Args:
    ----
      file_pattern: One or a list of glob patterns. If a list, must not be
        empty.
      telemetry_descriptors: A set of descriptors that identify the component
        that is instantiating this TFXIO. These will be used to construct the
        namespace to contain metrics for profiling and are therefore expected to
        be identifiers of the component itself and not individual instances of
        source use.
      validate: Not used; do not set (unused since 0.22.1).
      schema: A TFMD Schema describing the dataset.
      raw_record_column_name: If not None, the generated Arrow RecordBatches
        will contain a column of the given name that contains serialized
        records.
    """
    super().__init__(
        schema=schema,
        raw_record_column_name=raw_record_column_name,
        telemetry_descriptors=telemetry_descriptors,
        physical_format="tfrecords_gzip",
    )
    del validate
    if not isinstance(file_pattern, list):
        file_pattern = [file_pattern]
    assert file_pattern, "Must provide at least one file pattern."
    self._file_pattern = file_pattern
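
A construction sketch (hypothetical glob and descriptor):

```
from tfx_bsl.public import tfxio

seq_tfxio = tfxio.TFSequenceExampleRecord(
    file_pattern="/data/sessions-*.tfrecord.gz",  # hypothetical glob
    telemetry_descriptors=["MyComponent"],
    schema=None,  # or a TFMD Schema; needed for TensorAdapter/TensorFlowDataset
)
```
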
Attributes
raw_record_column_name property
raw_record_column_name: Optional[str]
schema property
schema: Schema
telemetry_descriptors property
telemetry_descriptors: Optional[List[str]]
Functions
ArrowSchema
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def ArrowSchema(self) -> pa.Schema:
    schema = self._ArrowSchemaNoRawRecordColumn()
    if self._raw_record_column_name is not None:
        if schema.get_field_index(self._raw_record_column_name) != -1:
            raise ValueError(
                f"Raw record column name {self._raw_record_column_name} collided with a column in the schema."
            )
        schema = schema.append(
            pa.field(self._raw_record_column_name, pa.large_list(pa.large_binary()))
        )
    return schema
BeamSource
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).


batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:
    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll_or_pipeline: Any):
        """Converts raw records to RecordBatches."""
        return (
            pcoll_or_pipeline
            | "RawRecordBeamSource" >> self.RawRecordBeamSource()
            | "RawRecordToRecordBatch" >> self.RawRecordToRecordBatch(batch_size)
        )

    return beam.ptransform_fn(_PTransformFn)()
Project
Project(tensor_names: List[str]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTensorSpec().

May raise an error if the TFMD schema was not provided at construction time.


tensor_names: a set of tensor names.


A TFXIO instance that is the same as self except that:

- Only columns needed for given tensor_names are guaranteed to be produced by self.BeamSource()
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.

Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[str]) -> "TFXIO":
    """Projects the dataset represented by this TFXIO.

    A Projected TFXIO:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
    - It retains a reference to the very original TFXIO, so its TensorAdapter
      knows about the specs of the tensors that would be produced by the
      original TensorAdapter. Also see `TensorAdapter.OriginalTensorSpec()`.

    May raise an error if the TFMD schema was not provided at construction time.

    Args:
    ----
      tensor_names: a set of tensor names.

    Returns:
    -------
      A `TFXIO` instance that is the same as `self` except that:
      - Only columns needed for given tensor_names are guaranteed to be
        produced by `self.BeamSource()`
      - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
        to contain only those tensors.
    """
    if isinstance(self, _ProjectedTFXIO):
        # pylint: disable=protected-access
        return _ProjectedTFXIO(
            self.origin, self.projected._ProjectImpl(tensor_names)
        )
    return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
RawRecordBeamSource
RawRecordBeamSource() -> PTransform

Returns a PTransform that produces a PCollection[bytes].

Used together with RawRecordToRecordBatch(), it allows getting both the PCollection of the raw records and the PCollection of the RecordBatch from the same source. For example:

record_batch = pipeline | tfxio.BeamSource()
raw_record = pipeline | tfxio.RawRecordBeamSource()

would result in the files being read twice, while the following would only read once:

raw_record = pipeline | tfxio.RawRecordBeamSource()
record_batch = raw_record | tfxio.RawRecordToRecordBatch()

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordBeamSource(self) -> beam.PTransform:
    """Returns a PTransform that produces a PCollection[bytes].

    Used together with RawRecordToRecordBatch(), it allows getting both the
    PCollection of the raw records and the PCollection of the RecordBatch from
    the same source. For example:

    record_batch = pipeline | tfxio.BeamSource()
    raw_record = pipeline | tfxio.RawRecordBeamSource()

    would result in the files being read twice, while the following would only
    read once:

    raw_record = pipeline | tfxio.RawRecordBeamSource()
    record_batch = raw_record | tfxio.RawRecordToRecordBatch()
    """

    @beam.typehints.with_input_types(Any)
    @beam.typehints.with_output_types(bytes)
    def _PTransformFn(pcoll_or_pipeline: Any):
        return (
            pcoll_or_pipeline
            | "ReadRawRecords" >> self._RawRecordBeamSourceInternal()
            | "CollectRawRecordTelemetry"
            >> telemetry.ProfileRawRecords(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RawRecordTensorFlowDataset
RawRecordTensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a Dataset that contains nested Datasets of raw records.

May not be implemented for some TFXIOs.

This should be used when RawTfRecordTFXIO.TensorFlowDataset does not suffice. Namely, when there is some logical grouping of files on which we need to perform operations (e.g. shuffle) without applying the operation to each individual group.

The returned Dataset object is a dataset of datasets, where each nested dataset is a dataset of serialized records. When shuffle=False (default), the nested datasets are deterministically ordered. Each nested dataset can represent multiple files. The files are merged into one dataset if the files have the same format. For example:

file_patterns = ['file_1', 'file_2', 'dir_1/*']
file_formats = ['recordio', 'recordio', 'sstable']
tfxio = SomeTFXIO(file_patterns, file_formats)
datasets = tfxio.RawRecordTensorFlowDataset(options)
`datasets` would result in the following dataset: [ds1, ds2], where ds1 iterates over records from 'file_1' and 'file_2', and ds2 iterates over records from files matched by 'dir_1/*'.

Example usage:

tfxio = SomeTFXIO(file_patterns, file_formats)
ds = tfxio.RawRecordTensorFlowDataset(options=options)
ds = ds.flat_map(lambda x: x)
records = list(ds.as_numpy_iterator())
# iterating over `records` yields records from each file in
# `file_patterns`. See `tf.data.Dataset.list_files` for more information
# about the order of files when expanding globs.
Note that we need a flat_map, because RawRecordTensorFlowDataset returns a dataset of datasets.

When shuffle=True, the datasets are not deterministically ordered, but the contents of each nested dataset are deterministically ordered. For example, we may have [ds2, ds1, ds3], where the contents of ds1, ds2, and ds3 are all deterministically ordered.


options: A TensorFlowDatasetOptions object. Not all options will apply.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordTensorFlowDataset(
    self, options: dataset_options.TensorFlowDatasetOptions
) -> tf.data.Dataset:
    """Returns a Dataset that contains nested Datasets of raw records.

    May not be implemented for some TFXIOs.

    This should be used when RawTfRecordTFXIO.TensorFlowDataset does not
    suffice. Namely, when there is some logical grouping of files on which we
    need to perform operations (e.g. shuffle) without applying the operation
    to each individual group.

    The returned Dataset object is a dataset of datasets, where each nested
    dataset is a dataset of serialized records. When shuffle=False (default),
    the nested datasets are deterministically ordered. Each nested dataset can
    represent multiple files. The files are merged into one dataset if the files
    have the same format. For example:

    ```
    file_patterns = ['file_1', 'file_2', 'dir_1/*']
    file_formats = ['recordio', 'recordio', 'sstable']
    tfxio = SomeTFXIO(file_patterns, file_formats)
    datasets = tfxio.RawRecordTensorFlowDataset(options)
    ```
    `datasets` would result in the following dataset: `[ds1, ds2]`, where ds1
    iterates over records from 'file_1' and 'file_2', and ds2 iterates over
    records from files matched by 'dir_1/*'.

    Example usage:
    ```
    tfxio = SomeTFXIO(file_patterns, file_formats)
    ds = tfxio.RawRecordTensorFlowDataset(options=options)
    ds = ds.flat_map(lambda x: x)
    records = list(ds.as_numpy_iterator())
    # iterating over `records` yields records from each file in
    # `file_patterns`. See `tf.data.Dataset.list_files` for more information
    # about the order of files when expanding globs.
    ```
    Note that we need a flat_map, because `RawRecordTensorFlowDataset` returns
    a dataset of datasets.

    When shuffle=True, the datasets are not deterministically ordered, but
    the contents of each nested dataset are deterministically ordered. For
    example, we may have [ds2, ds1, ds3], where the contents of ds1, ds2,
    and ds3 are all deterministically ordered.

    Args:
    ----
      options: A TensorFlowDatasetOptions object. Not all options will apply.
    """
    raise NotImplementedError
RawRecordToRecordBatch
RawRecordToRecordBatch(
    batch_size: Optional[int] = None,
) -> PTransform

Returns a PTransform that converts raw records to Arrow RecordBatches.

The input PCollection must be from self.RawRecordBeamSource() (also see the documentation for that method).


batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/record_based_tfxio.py
def RawRecordToRecordBatch(
    self, batch_size: Optional[int] = None
) -> beam.PTransform:
    """Returns a PTransform that converts raw records to Arrow RecordBatches.

    The input PCollection must be from self.RawRecordBeamSource() (also see
    the documentation for that method).

    Args:
    ----
      batch_size: if not None, the `pa.RecordBatch` produced will be of the
        specified size. Otherwise it's automatically tuned by Beam.
    """

    @beam.typehints.with_input_types(bytes)
    @beam.typehints.with_output_types(pa.RecordBatch)
    def _PTransformFn(pcoll: beam.pvalue.PCollection):
        return (
            pcoll
            | "RawRecordToRecordBatch"
            >> self._RawRecordToRecordBatchInternal(batch_size)
            | "CollectRecordBatchTelemetry"
            >> telemetry.ProfileRecordBatches(
                self._telemetry_descriptors,
                self._logical_format,
                self._physical_format,
            )
        )

    return beam.ptransform_fn(_PTransformFn)()
RecordBatches
RecordBatches(options: RecordBatchesOptions)

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.


options: An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def RecordBatches(self, options: dataset_options.RecordBatchesOptions):
    raise NotImplementedError
SupportAttachingRawRecords
SupportAttachingRawRecords() -> bool
Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def SupportAttachingRawRecords(self) -> bool:
    return True
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
    """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

    May raise an error if the TFMD schema was not provided at construction time.
    """
    return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

Returns

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
    """Returns the config to initialize a `TensorAdapter`.

    Returns
    -------
      a `TensorAdapterConfig` that is the same as what is used to initialize the
      `TensorAdapter` returned by `self.TensorAdapter()`.
    """
    return tensor_adapter.TensorAdapterConfig(
        self.ArrowSchema(), self.TensorRepresentations()
    )
TensorFlowDataset
TensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Creates a tf.data.Dataset that yields Tensors.

The serialized tf.SequenceExamples are parsed by tf.io.parse_sequence_example.

See base class (tfxio.TFXIO) for more details.


Args:
options: an options object for the tf.data.Dataset. See dataset_options.TensorFlowDatasetOptions for more details.

Returns:
A dataset of dict elements (or a tuple of dict elements and label). Each dict maps feature keys to Tensor, SparseTensor, or RaggedTensor objects.

Raises:
ValueError: if there is something wrong with the provided schema.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def TensorFlowDataset(
    self, options: dataset_options.TensorFlowDatasetOptions
) -> tf.data.Dataset:
    """Creates a tf.data.Dataset that yields Tensors.

    The serialized tf.SequenceExamples are parsed by
    `tf.io.parse_sequence_example`.

    See base class (tfxio.TFXIO) for more details.

    Args:
    ----
      options: an options object for the tf.data.Dataset. See
        `dataset_options.TensorFlowDatasetOptions` for more details.

    Returns:
    -------
      A dataset of `dict` elements, (or a tuple of `dict` elements and label).
      Each `dict` maps feature keys to `Tensor`, `SparseTensor`, or
      `RaggedTensor` objects.

    Raises:
    ------
      ValueError: if there is something wrong with the provided schema.
    """
    file_pattern = tf.convert_to_tensor(self._file_pattern)
    dataset = dataset_util.make_tf_record_dataset(
        file_pattern,
        batch_size=options.batch_size,
        num_epochs=options.num_epochs,
        shuffle=options.shuffle,
        shuffle_buffer_size=options.shuffle_buffer_size,
        shuffle_seed=options.shuffle_seed,
        reader_num_threads=options.reader_num_threads,
        drop_final_batch=options.drop_final_batch,
    )

    return self._ParseRawRecordTensorFlowDataset(dataset, options.label_key)
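
As an illustration, a minimal sketch of driving this method; the file pattern is hypothetical and `schema` is assumed to be a TFMD schema describing the sequence examples.

```
from tfx_bsl.public import tfxio

seq_tfxio = tfxio.TFSequenceExampleRecord(
    file_pattern="/path/to/seq*.tfrecord",  # hypothetical
    telemetry_descriptors=["my_component"],  # hypothetical descriptor
    schema=schema,  # assumed TFMD schema
)
ds = seq_tfxio.TensorFlowDataset(
    tfxio.TensorFlowDatasetOptions(batch_size=32, num_epochs=1, shuffle=False)
)
for batch in ds.take(1):
    # `batch` maps feature names to Tensor / SparseTensor / RaggedTensor.
    print({name: type(value).__name__ for name, value in batch.items()})
```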
TensorRepresentations
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/tf_sequence_example_record.py
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
    return tensor_representation_util.InferTensorRepresentationsFromMixedSchema(
        self._schema
    )
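Continuing the hypothetical `seq_tfxio` above, a quick way to inspect the inferred representations:

```
# Each value is a tensorflow_metadata TensorRepresentation proto; "kind" is
# the proto's oneof field naming the representation type (e.g. dense_tensor).
for name, rep in seq_tfxio.TensorRepresentations().items():
    print(name, "->", rep.WhichOneof("kind"))
```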

TFXIO

Abstract base class of all TFXIO API implementations.

Functions
ArrowSchema abstractmethod
ArrowSchema() -> Schema

Returns the schema of the RecordBatch produced by self.BeamSource().

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
@abc.abstractmethod
def ArrowSchema(self) -> pa.Schema:
    """Returns the schema of the `RecordBatch` produced by `self.BeamSource()`.

    May raise an error if the TFMD schema was not provided at construction time.
    """
BeamSource abstractmethod
BeamSource(batch_size: Optional[int] = None) -> PTransform

Returns a beam PTransform that produces PCollection[pa.RecordBatch].

May NOT raise an error if the TFMD schema was not provided at construction time.

If a TFMD schema was provided at construction time, all the pa.RecordBatches in the result PCollection must be of the same schema returned by self.ArrowSchema. If a TFMD schema was not provided, the pa.RecordBatches might not be of the same schema (they may contain different numbers of columns).


Args:
batch_size: if not None, the pa.RecordBatch produced will be of the specified size. Otherwise it's automatically tuned by Beam.

Source code in tfx_bsl/tfxio/tfxio.py
@abc.abstractmethod
def BeamSource(self, batch_size: Optional[int] = None) -> beam.PTransform:
    """Returns a beam `PTransform` that produces `PCollection[pa.RecordBatch]`.

    May NOT raise an error if the TFMD schema was not provided at construction
    time.

    If a TFMD schema was provided at construction time, all the
    `pa.RecordBatch`es in the result `PCollection` must be of the same schema
    returned by `self.ArrowSchema`. If a TFMD schema was not provided, the
    `pa.RecordBatch`es might not be of the same schema (they may contain
    different numbers of columns).

    Args:
    ----
      batch_size: if not None, the `pa.RecordBatch` produced will be of the
        specified size. Otherwise it's automatically tuned by Beam.
    """
Project
Project(tensor_names: List[str]) -> TFXIO

Projects the dataset represented by this TFXIO.

A Projected TFXIO:

- Only columns needed for the given tensor_names are guaranteed to be produced by self.BeamSource().
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.
- It retains a reference to the very original TFXIO, so its TensorAdapter knows about the specs of the tensors that would be produced by the original TensorAdapter. Also see TensorAdapter.OriginalTypeSpecs().

May raise an error if the TFMD schema was not provided at construction time.


Args:
tensor_names: a set of tensor names.

Returns:
A TFXIO instance that is the same as self except that:

- Only columns needed for the given tensor_names are guaranteed to be produced by self.BeamSource().
- self.TensorAdapterConfig() and self.TensorFlowDataset() are trimmed to contain only those tensors.

Source code in tfx_bsl/tfxio/tfxio.py
def Project(self, tensor_names: List[str]) -> "TFXIO":
    """Projects the dataset represented by this TFXIO.

    A Projected TFXIO:
    - Only columns needed for given tensor_names are guaranteed to be
      produced by `self.BeamSource()`
    - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
      to contain only those tensors.
    - It retains a reference to the very original TFXIO, so its TensorAdapter
      knows about the specs of the tensors that would be produced by the
      original TensorAdapter. Also see `TensorAdapter.OriginalTypeSpecs()`.

    May raise an error if the TFMD schema was not provided at construction time.

    Args:
    ----
      tensor_names: a set of tensor names.

    Returns:
    -------
      A `TFXIO` instance that is the same as `self` except that:
      - Only columns needed for given tensor_names are guaranteed to be
        produced by `self.BeamSource()`
      - `self.TensorAdapterConfig()` and `self.TensorFlowDataset()` are trimmed
        to contain only those tensors.
    """
    if isinstance(self, _ProjectedTFXIO):
        # pylint: disable=protected-access
        return _ProjectedTFXIO(
            self.origin, self.projected._ProjectImpl(tensor_names)
        )
    return _ProjectedTFXIO(self, self._ProjectImpl(tensor_names))
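
A minimal sketch, assuming the hypothetical `example_tfxio` above was built with a TFMD schema that defines tensors named "age" and "income":

```
projected = example_tfxio.Project(["age", "income"])
# Only the columns backing those tensors are guaranteed in BeamSource()
# output, but the original specs stay visible through the adapter:
original_specs = projected.TensorAdapter().OriginalTypeSpecs()
```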
RecordBatches abstractmethod
RecordBatches(
    options: RecordBatchesOptions,
) -> Iterator[RecordBatch]

Returns an iterable of record batches.

This can be used outside of Apache Beam or TensorFlow to access data.


Args:
options: An options object for iterating over record batches. Look at dataset_options.RecordBatchesOptions for more details.

Source code in tfx_bsl/tfxio/tfxio.py
@abc.abstractmethod
def RecordBatches(
    self, options: dataset_options.RecordBatchesOptions
) -> Iterator[pa.RecordBatch]:
    """Returns an iterable of record batches.

    This can be used outside of Apache Beam or TensorFlow to access data.

    Args:
    ----
      options: An options object for iterating over record batches. Look at
        `dataset_options.RecordBatchesOptions` for more details.
    """
TensorAdapter
TensorAdapter() -> TensorAdapter

Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

May raise an error if the TFMD schema was not provided at construction time.

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapter(self) -> tensor_adapter.TensorAdapter:
    """Returns a TensorAdapter that converts pa.RecordBatch to TF inputs.

    May raise an error if the TFMD schema was not provided at construction time.
    """
    return tensor_adapter.TensorAdapter(self.TensorAdapterConfig())
TensorAdapterConfig
TensorAdapterConfig() -> TensorAdapterConfig

Returns the config to initialize a TensorAdapter.

Returns

a TensorAdapterConfig that is the same as what is used to initialize the TensorAdapter returned by self.TensorAdapter().

Source code in tfx_bsl/tfxio/tfxio.py
def TensorAdapterConfig(self) -> tensor_adapter.TensorAdapterConfig:
    """Returns the config to initialize a `TensorAdapter`.

    Returns
    -------
      a `TensorAdapterConfig` that is the same as what is used to initialize the
      `TensorAdapter` returned by `self.TensorAdapter()`.
    """
    return tensor_adapter.TensorAdapterConfig(
        self.ArrowSchema(), self.TensorRepresentations()
    )
TensorFlowDataset abstractmethod
TensorFlowDataset(
    options: TensorFlowDatasetOptions,
) -> Dataset

Returns a tf.data.Dataset of TF inputs.

May raise an error if the TFMD schema was not provided at construction time.


Args:
options: an options object for the tf.data.Dataset. Look at dataset_options.TensorFlowDatasetOptions for more details.

Source code in tfx_bsl/tfxio/tfxio.py
@abc.abstractmethod
def TensorFlowDataset(
    self, options: dataset_options.TensorFlowDatasetOptions
) -> tf.data.Dataset:
    """Returns a tf.data.Dataset of TF inputs.

    May raise an error if the TFMD schema was not provided at construction time.

    Args:
    ----
      options: an options object for the tf.data.Dataset. Look at
        `dataset_options.TensorFlowDatasetOptions` for more details.
    """
TensorRepresentations abstractmethod
TensorRepresentations() -> TensorRepresentations

Returns the TensorRepresentations.

These TensorRepresentations describe the tensors or composite tensors produced by the TensorAdapter created from self.TensorAdapter() or the tf.data.Dataset created from self.TensorFlowDataset().

May raise an error if the TFMD schema was not provided at construction time. May raise an error if the tensor representations are invalid.

Source code in tfx_bsl/tfxio/tfxio.py
@abc.abstractmethod
def TensorRepresentations(self) -> tensor_adapter.TensorRepresentations:
    """Returns the `TensorRepresentations`.

    These `TensorRepresentation`s describe the tensors or composite tensors
    produced by the `TensorAdapter` created from `self.TensorAdapter()` or
    the tf.data.Dataset created from `self.TensorFlowDataset()`.

    May raise an error if the TFMD schema was not provided at construction time.
    May raise an error if the tensor representations are invalid.
    """

TensorAdapter

TensorAdapter(config: TensorAdapterConfig)

A TensorAdapter converts a RecordBatch to a collection of TF Tensors.

The conversion is determined by both the Arrow schema and the TensorRepresentations, which must be provided at initialization time. Each TensorRepresentation contains the information needed to translate one or more columns in a RecordBatch of the given Arrow schema into a TF Tensor or CompositeTensor. They are contained in a Dict whose keys are the names of the tensors, which will be the keys of the Dict produced by ToBatchTensors().

TypeSpecs() returns static TypeSpecs of those tensors by their names, i.e. if they have a shape, then the size of the first (batch) dimension is always unknown (None) because it depends on the size of the RecordBatch passed to ToBatchTensors().

It is guaranteed that, for any tensor_name in the given TensorRepresentations, self.TypeSpecs()[tensor_name].is_compatible_with(self.ToBatchTensors(...)[tensor_name]).

Sliced RecordBatches and LargeListArray columns having null elements backed by non-empty sub-lists are not supported and will yield undefined behaviour.

Source code in tfx_bsl/tfxio/tensor_adapter.py
def __init__(self, config: TensorAdapterConfig):
    self._arrow_schema = config.arrow_schema
    self._type_handlers = _BuildTypeHandlers(
        config.tensor_representations, config.arrow_schema
    )
    self._type_specs = {
        tensor_name: handler.type_spec
        for tensor_name, handler in self._type_handlers
    }

    self._original_type_specs = (
        self._type_specs
        if config.original_type_specs is None
        else config.original_type_specs
    )

    for tensor_name, type_spec in self._type_specs.items():
        original_type_spec = self._original_type_specs.get(tensor_name, None)
        if original_type_spec is None or original_type_spec != type_spec:
            raise ValueError(
                "original_type_specs must be a superset of type_specs derived from "
                f"TensorRepresentations. But for tensor {tensor_name}, got {original_type_spec} vs {type_spec}"
            )
Functions
OriginalTypeSpecs
OriginalTypeSpecs() -> Dict[str, TypeSpec]

Returns the origin's type specs.

A TFXIO 'Y' may be the result of projecting another TFXIO 'X', in which case 'X' is the origin of 'Y', and this method returns what X.TensorAdapter().TypeSpecs() would return.

May be equal to self.TypeSpecs().

Returns: a mapping from tensor names to tf.TypeSpecs.

Source code in tfx_bsl/tfxio/tensor_adapter.py
def OriginalTypeSpecs(self) -> Dict[str, tf.TypeSpec]:
    """Returns the origin's type specs.

    A TFXIO 'Y' may be the result of projecting another TFXIO 'X', in which
    case 'X' is the origin of 'Y', and this method returns what
    X.TensorAdapter().TypeSpecs() would return.

    May be equal to `self.TypeSpecs()`.

    Returns: a mapping from tensor names to `tf.TypeSpec`s.
    """
    return self._original_type_specs
ToBatchTensors
ToBatchTensors(
    record_batch: RecordBatch,
    produce_eager_tensors: Optional[bool] = None,
) -> Dict[str, Any]

Returns a batch of tensors translated from record_batch.


Args:
record_batch: input RecordBatch.
produce_eager_tensors: controls whether ToBatchTensors() produces eager tensors or ndarrays (or Tensor value objects). If None, this is determined by whether TF eager mode is enabled.

Raises:
RuntimeError: when eager tensors are requested but TF is not executing eagerly.
ValueError: when any handler fails to produce a Tensor.

Source code in tfx_bsl/tfxio/tensor_adapter.py
def ToBatchTensors(
    self, record_batch: pa.RecordBatch, produce_eager_tensors: Optional[bool] = None
) -> Dict[str, Any]:
    """Returns a batch of tensors translated from `record_batch`.

    Args:
    ----
      record_batch: input RecordBatch.
      produce_eager_tensors: controls whether the ToBatchTensors() produces
        eager tensors or ndarrays (or Tensor value objects). If None, determine
        that from whether TF Eager mode is enabled.

    Raises:
    ------
      RuntimeError: when Eager Tensors are requested but TF is not executing
        eagerly.
      ValueError: when Any handler failed to produce a Tensor.
    """
    tf_executing_eagerly = tf.executing_eagerly()
    if produce_eager_tensors and not tf_executing_eagerly:
        raise RuntimeError(
            "Eager Tensors were requested but eager mode was not enabled."
        )
    if produce_eager_tensors is None:
        produce_eager_tensors = tf_executing_eagerly

    if not record_batch.schema.equals(self._arrow_schema):
        raise ValueError("Expected same schema.")
    result = {}
    for tensor_name, handler in self._type_handlers:
        try:
            result[tensor_name] = handler.GetTensor(
                record_batch, produce_eager_tensors
            )
        except Exception as e:
            raise ValueError(
                f"Error raised when handling tensor '{tensor_name}'"
            ) from e

    return result
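
To make the guarantee above concrete, a minimal end-to-end sketch with a hand-built config; in practice the config usually comes from some TFXIO's TensorAdapterConfig().

```
import pyarrow as pa
from tensorflow_metadata.proto.v0 import schema_pb2
from tfx_bsl.public import tfxio

# One column "x" with two rows, each a length-1 list of floats.
record_batch = pa.RecordBatch.from_arrays(
    [pa.array([[1.0], [2.0]], type=pa.large_list(pa.float32()))], ["x"]
)

# Represent column "x" as a dense [batch, 1] float tensor.
rep = schema_pb2.TensorRepresentation()
rep.dense_tensor.column_name = "x"
rep.dense_tensor.shape.dim.add().size = 1

adapter = tfxio.TensorAdapter(
    tfxio.TensorAdapterConfig(
        arrow_schema=record_batch.schema,
        tensor_representations={"x": rep},
    )
)
tensors = adapter.ToBatchTensors(record_batch)
print(tensors["x"].shape)  # (2, 1) when TF is executing eagerly
```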
TypeSpecs
TypeSpecs() -> Dict[str, TypeSpec]

Returns the TypeSpec for each tensor.

Source code in tfx_bsl/tfxio/tensor_adapter.py
def TypeSpecs(self) -> Dict[str, tf.TypeSpec]:
    """Returns the TypeSpec for each tensor."""
    return self._type_specs

TensorAdapterConfig

TensorAdapterConfig(
    arrow_schema: Schema,
    tensor_representations: TensorRepresentations,
    original_type_specs: Optional[
        Dict[str, TypeSpec]
    ] = None,
)

Config to a TensorAdapter.

Contains all the information needed to create a TensorAdapter.

Source code in tfx_bsl/tfxio/tensor_adapter.py
def __init__(
    self,
    arrow_schema: pa.Schema,
    tensor_representations: TensorRepresentations,
    original_type_specs: Optional[Dict[str, tf.TypeSpec]] = None,
):
    self.arrow_schema = arrow_schema
    self.tensor_representations = tensor_representations
    self.original_type_specs = original_type_specs
Attributes
arrow_schema instance-attribute
arrow_schema = arrow_schema
original_type_specs instance-attribute
original_type_specs = original_type_specs
tensor_representations instance-attribute
tensor_representations = tensor_representations

TensorFlowDatasetOptions

Bases: NamedTuple('TensorFlowDatasetOptions', [('batch_size', int), ('drop_final_batch', bool), ('num_epochs', Optional[int]), ('shuffle', bool), ('shuffle_buffer_size', int), ('shuffle_seed', Optional[int]), ('prefetch_buffer_size', int), ('reader_num_threads', int), ('parser_num_threads', int), ('sloppy_ordering', bool), ('label_key', Optional[str])])

Options for TFXIO's TensorFlowDataset.

Note: not all of these options take effect; that depends on the particular TFXIO implementation.
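
For instance, a sketch of how label_key changes the element structure of TensorFlowDataset; "my_label" is a hypothetical feature name.

```
from tfx_bsl.public import tfxio

# With label_key set, TensorFlowDataset yields (features_dict, label) tuples
# instead of a plain features dict.
options = tfxio.TensorFlowDatasetOptions(
    batch_size=64,
    num_epochs=1,
    shuffle=False,
    label_key="my_label",  # hypothetical feature name
)
```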