Copyright 2024 The TensorFlow Authors.¶
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Note: We recommend running this tutorial in a Colab notebook, with no setup required! Just click "Run in Google Colab".
Why is this pipeline useful?¶
TFX pipelines provide a powerful and structured approach to building and managing machine learning workflows, particularly those involving large language models. They offer significant advantages over traditional Python code, including:
Enhanced Reproducibility: TFX pipelines ensure consistent results by capturing all steps and dependencies, eliminating the inconsistencies often associated with manual workflows.
Scalability and Modularity: TFX allows for breaking down complex workflows into manageable, reusable components, promoting code organization.
Streamlined Fine-Tuning and Conversion: The pipeline structure streamlines the fine-tuning and conversion processes of large language models, significantly reducing manual effort and time.
Comprehensive Lineage Tracking: Through metadata tracking, TFX pipelines provide a clear understanding of data and model provenance, making debugging, auditing, and performance analysis much easier and more efficient.
By leveraging the benefits of TFX pipelines, organizations can effectively manage the complexity of large language model development and deployment, achieving greater efficiency and control over their machine learning processes.
Note¶
GPT-2 is used here only to demonstrate the end-to-end process; the techniques and tooling introduced in this codelab are potentially transferable to other generative language models such as Google T5.
Before You Begin¶
Colab offers different kinds of runtimes. Go to Runtime -> Change runtime type and choose the GPU hardware accelerator, since you will fine-tune the GPT-2 model.
This tutorial's interactive pipeline is designed to run on free Colab GPUs. Running the pipeline with the LocalDagRunner orchestrator (code provided at the end of this tutorial) requires more GPU memory, so Colab Pro or a local machine with a higher-capacity GPU is recommended for that approach.
Set Up¶
We first install the required Python packages.
Upgrade Pip¶
To avoid upgrading pip on a local system, we first check that we are running in Colab. Local systems can of course be upgraded separately.
try:
  import colab
  !pip install --upgrade pip
except:
  pass
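As a hedged aside, the same check can be written without a bare `except`. The sketch below assumes that the `google.colab` package can only be found inside a Colab runtime, which is a common convention rather than something this tutorial states; the helper name `running_in_colab` is hypothetical.

```python
import importlib.util


def running_in_colab() -> bool:
  """Returns True if the `google.colab` package can be found (assumed Colab-only)."""
  try:
    return importlib.util.find_spec("google.colab") is not None
  except ModuleNotFoundError:
    # Raised when the parent package `google` itself is not installed.
    return False


if running_in_colab():
  print("Colab detected: pip can be upgraded here.")
else:
  print("Not in Colab: leaving the local pip untouched.")
```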
Install TFX, Keras 3, KerasNLP and required Libraries¶
!pip install -q tfx tensorflow-text more_itertools tensorflow_datasets
!pip install -q --upgrade keras-nlp
!pip install -q --upgrade keras
Note: pip's dependency resolver errors can be ignored. The required packages for this tutorial work as expected.
Did you restart the runtime?¶
If you are using Google Colab, the first time that you run the cell above, you must restart the runtime by clicking the "RESTART SESSION" button above or using the "Runtime > Restart session" menu. This is because of the way that Colab loads packages.
Let's check the TensorFlow, Keras, KerasNLP and TFX library versions.
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import keras
print('Keras version: {}'.format(keras.__version__))
import keras_nlp
print('Keras NLP version: {}'.format(keras_nlp.__version__))
keras.mixed_precision.set_global_policy("mixed_float16")
Using TFX Interactive Context¶
An interactive context is used to provide global context when running a TFX pipeline in a notebook without using a runner or orchestrator such as Apache Airflow or Kubeflow. This style of development is only useful when developing the code for a pipeline, and cannot currently be used to deploy a working pipeline to production.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
context = InteractiveContext()
Pipeline Overview¶
The pipeline is made up of the following building blocks.
Artifacts are the data that a component produces or consumes. They are stored in ML Metadata (MLMD), a system that manages the storage and versioning of artifacts. Custom Artifacts are artifacts that we have created for this pipeline.
Components are implementations of an ML task that you can use as a step in your pipeline.
Parameters, unlike artifacts, are passed into a component to specify an argument.
ExampleGen¶
We create a custom ExampleGen component which we use to load a TensorFlow Datasets (TFDS) dataset. This uses a custom executor in a FileBasedExampleGen.
from typing import Any, Dict, List, Text
import tensorflow_datasets as tfds
import apache_beam as beam
import json
from tfx.components.example_gen.base_example_gen_executor import BaseExampleGenExecutor
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.components.example_gen import utils
from tfx.dsl.components.base import executor_spec
import os
import pprint
pp = pprint.PrettyPrinter()
@beam.ptransform_fn
@beam.typehints.with_input_types(beam.Pipeline)
@beam.typehints.with_output_types(tf.train.Example)
def _TFDatasetToExample(
    pipeline: beam.Pipeline,
    exec_properties: Dict[str, Any],
    split_pattern: str
) -> beam.pvalue.PCollection:
  """Read a TensorFlow Dataset and create tf.Examples"""
  custom_config = json.loads(exec_properties['custom_config'])
  dataset_name = custom_config['dataset']
  split_name = custom_config['split']
  builder = tfds.builder(dataset_name)
  builder.download_and_prepare()
  return (pipeline
          | 'MakeExamples' >> tfds.beam.ReadFromTFDS(builder, split=split_name)
          | 'AsNumpy' >> beam.Map(tfds.as_numpy)
          | 'ToDict' >> beam.Map(dict)
          | 'ToTFExample' >> beam.Map(utils.dict_to_example)
          )
class TFDSExecutor(BaseExampleGenExecutor):
  def GetInputSourceToExamplePTransform(self) -> beam.PTransform:
    """Returns PTransform for TF Dataset to TF examples."""
    return _TFDatasetToExample
For this demonstration, we use a subset of the IMDb reviews dataset: the first 20% of its training split. This keeps the training process manageable. You can modify the "custom_config" settings to experiment with larger amounts of data, up to the full dataset, depending on your computational resources.
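To get a feel for how much data the `split` setting selects, here is a rough back-of-the-envelope sketch. It assumes that the IMDb reviews training split holds 25,000 examples and that `FileBasedExampleGen`, when no output configuration is given, divides its input into `train` and `eval` splits at roughly a 2:1 ratio. Neither number is stated in this tutorial, and the real split is hash-based, so actual counts will differ slightly. The function name is hypothetical.

```python
def estimate_split_sizes(total_train_examples: int, percent: int) -> dict:
  """Estimates example counts for a TFDS slice such as 'train[:20%]'."""
  selected = total_train_examples * percent // 100
  # Assumed default ExampleGen behaviour: roughly 2/3 train, 1/3 eval.
  train = selected * 2 // 3
  return {"selected": selected, "train": train, "eval": selected - train}


print(estimate_split_sizes(25_000, 20))
print(estimate_split_sizes(25_000, 100))
```

Under these assumptions, the 20% slice leaves a few thousand reviews for fine-tuning, which is why a single epoch fits within a free Colab session.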
example_gen = FileBasedExampleGen(
    input_base='dummy',
    custom_config={'dataset':'imdb_reviews', 'split':'train[:20%]'},
    custom_executor_spec=executor_spec.BeamExecutorSpec(TFDSExecutor))
context.run(example_gen, enable_cache=False)
We've developed a handy utility for examining datasets composed of TFExamples. When used with the reviews dataset, this tool returns a clear dictionary containing both the text and the corresponding label.
def inspect_examples(component,
                     channel_name='examples',
                     split_name='train',
                     num_examples=1):
  # Get the URI of the output artifact, which is a directory
  full_split_name = 'Split-{}'.format(split_name)
  print('channel_name: {}, split_name: {} (\"{}\"), num_examples: {}\n'.format(
      channel_name, split_name, full_split_name, num_examples))
  train_uri = os.path.join(
      component.outputs[channel_name].get()[0].uri, full_split_name)
  print('train_uri: {}'.format(train_uri))
  # Get the list of files in this directory (all compressed TFRecord files)
  tfrecord_filenames = [os.path.join(train_uri, name)
                        for name in os.listdir(train_uri)]
  # Create a `TFRecordDataset` to read these files
  dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
  # Iterate over the records and print them
  print()
  for tfrecord in dataset.take(num_examples):
    serialized_example = tfrecord.numpy()
    example = tf.train.Example()
    example.ParseFromString(serialized_example)
    pp.pprint(example)
inspect_examples(example_gen, num_examples=1, split_name='eval')
StatisticsGen¶
The StatisticsGen component computes statistics over your dataset for data analysis, such as the number of examples, the number of features, and the data types of the features. It uses the TensorFlow Data Validation library. StatisticsGen takes as input the dataset we just ingested using ExampleGen.
Note that the statistics generator is best suited to tabular data, so the text dataset used in this LLM tutorial may not be an ideal fit for this kind of analysis.
from tfx.components import StatisticsGen
statistics_gen = tfx.components.StatisticsGen(
    examples=example_gen.outputs['examples'], exclude_splits=['eval']
)
context.run(statistics_gen, enable_cache=False)
context.show(statistics_gen.outputs['statistics'])
SchemaGen¶
The SchemaGen component generates a schema based on your data statistics. (A schema defines the expected bounds, types, and properties of the features in your dataset.) It also uses the TensorFlow Data Validation library.
Note: The generated schema is best-effort and only tries to infer basic properties of the data. It is expected that you review and modify it as needed.
SchemaGen will take as input the statistics that we generated with StatisticsGen, looking at the training split by default.
schema_gen = tfx.components.SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=False,
    exclude_splits=['eval'],
)
context.run(schema_gen, enable_cache=False)
context.show(schema_gen.outputs['schema'])
ExampleValidator¶
The ExampleValidator component detects anomalies in your data, based on the expectations defined by the schema. It also uses the TensorFlow Data Validation library.
ExampleValidator will take as input the statistics from StatisticsGen, and the schema from SchemaGen.
example_validator = tfx.components.ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'],
    exclude_splits=['eval'],
)
context.run(example_validator, enable_cache=False)
After ExampleValidator finishes running, we can visualize the anomalies as a table.
context.show(example_validator.outputs['anomalies'])
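The idea behind schema-based validation can be illustrated with a toy sketch in plain Python. This is not how TensorFlow Data Validation is implemented; the `schema` dictionary, the `find_anomalies` helper, and the sample records are all hypothetical and only mirror the `text` and `label` features of the reviews dataset.

```python
from typing import Any, Dict, List

# Hypothetical schema: feature name -> expected Python type.
schema = {"text": bytes, "label": int}


def find_anomalies(example: Dict[str, Any], schema: Dict[str, type]) -> List[str]:
  """Lists the ways in which one example deviates from the schema."""
  anomalies = []
  for name, expected_type in schema.items():
    if name not in example:
      anomalies.append(f"missing feature: {name}")
    elif not isinstance(example[name], expected_type):
      anomalies.append(f"unexpected type for feature: {name}")
  for name in example:
    if name not in schema:
      anomalies.append(f"feature not in schema: {name}")
  return anomalies


good = {"text": b"A great movie.", "label": 1}
bad = {"text": "not bytes", "rating": 5}
print(find_anomalies(good, schema))
print(find_anomalies(bad, schema))
```

The real component works on aggregated statistics rather than on individual records, but the principle of comparing data against declared expectations is the same.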
Transform¶
For a structured and repeatable design of a TFX pipeline we will need a scalable approach to feature engineering. The Transform component performs feature engineering for both training and serving. It uses the TensorFlow Transform library.
The Transform component uses a module file to supply the user code for the feature engineering that we want to do, so our first step is to create that module file. We only work with a single field: the module reads the text feature and emits it as summary.
Note: The %%writefile {_transform_module_file} cell magic below creates and writes the contents of that cell to a file on the notebook server where this notebook is running (for example, the Colab VM). When doing this outside of a notebook you would just create a Python file.
import os
if not os.path.exists("modules"):
  os.mkdir("modules")
_transform_module_file = 'modules/_transform_module.py'
%%writefile {_transform_module_file}
import tensorflow as tf
def _fill_in_missing(x, default_value):
  """Replace missing values in a SparseTensor.
  Fills in missing values of `x` with the default_value.
  Args:
    x: A `SparseTensor` of rank 2. Its dense shape should have size at most 1
      in the second dimension.
    default_value: the value with which to replace the missing values.
  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  if not isinstance(x, tf.sparse.SparseTensor):
    return x
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)
def preprocessing_fn(inputs):
  outputs = {}
  # outputs["summary"] = _fill_in_missing(inputs["summary"],"")
  outputs["summary"] = _fill_in_missing(inputs["text"],"")
  return outputs
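To make the effect of `_fill_in_missing` concrete, the following plain-Python sketch mimics it on ordinary lists instead of a `SparseTensor`. It assumes each row holds at most one value, as the docstring above requires; the helper `fill_in_missing_rows` is hypothetical and is not part of TensorFlow Transform.

```python
from typing import List, Optional


def fill_in_missing_rows(rows: List[Optional[str]], default_value: str) -> List[str]:
  """Replaces missing entries (None) with `default_value`, one value per row."""
  return [default_value if value is None else value for value in rows]


reviews = ["Loved it.", None, "Too long."]
print(fill_in_missing_rows(reviews, ""))
```

The result has exactly one entry per input row, which is what downstream components expect from a dense rank 1 feature.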
preprocessor = tfx.components.Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath(_transform_module_file))
context.run(preprocessor, enable_cache=False)
Let's take a look at some of the transformed examples and check that they are indeed processed as intended.
def pprint_examples(artifact, n_examples=2):
  print("artifact:", artifact, "\n")
  uri = os.path.join(artifact.uri, "Split-eval")
  print("uri:", uri, "\n")
  tfrecord_filenames = [os.path.join(uri, name) for name in os.listdir(uri)]
  print("tfrecord_filenames:", tfrecord_filenames, "\n")
  dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
  for tfrecord in dataset.take(n_examples):
    serialized_example = tfrecord.numpy()
    example = tf.train.Example.FromString(serialized_example)
    pp.pprint(example)
pprint_examples(preprocessor.outputs['transformed_examples'].get()[0])
Trainer¶
The Trainer component trains an ML model, and it requires user-supplied model definition code.
The run_fn function in TFX's Trainer component is the entry point for training a machine learning model. It is a user-supplied function that takes in a set of arguments (FnArgs) and writes the trained model to the location those arguments specify.
The run_fn function is responsible for:
- Building the machine learning model.
- Training the model on the training data.
- Saving the trained model to the serving model directory.
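The three responsibilities above can be sketched with a toy stand-in. `FakeFnArgs` below is a hypothetical substitute for the real `FnArgs` object and only carries the two fields used in this tutorial (`train_files` and `serving_model_dir`); the "model" is a plain dictionary so that the sketch runs without TensorFlow.

```python
import dataclasses
import json
import os
import tempfile
from typing import List


@dataclasses.dataclass
class FakeFnArgs:
  """Hypothetical stand-in for the arguments TFX passes to `run_fn`."""
  train_files: List[str]
  serving_model_dir: str


def run_fn(fn_args: FakeFnArgs) -> None:
  # 1. Build the model (here: a dictionary standing in for weights).
  model = {"num_training_files": 0}
  # 2. Train the model on the training data (here: just count the inputs).
  model["num_training_files"] = len(fn_args.train_files)
  # 3. Save the trained model to the serving model directory.
  os.makedirs(fn_args.serving_model_dir, exist_ok=True)
  with open(os.path.join(fn_args.serving_model_dir, "model.json"), "w") as f:
    json.dump(model, f)


serving_dir = os.path.join(tempfile.mkdtemp(), "serving")
run_fn(FakeFnArgs(train_files=["train/*"], serving_model_dir=serving_dir))
print(os.listdir(serving_dir))
```

Note that the function returns nothing: its only observable result is the file written to the serving directory.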
Write model training code¶
We will fine-tune a GPT-2 model together with its preprocessor in a very simple setup. First, we need to create a module that contains the run_fn function, because the TFX Trainer expects run_fn to be defined in a module.
model_file = "modules/model.py"
model_fn = "modules.model.run_fn"
Now, we write the run_fn function:
This run_fn function first reads the training data from the files listed in fn_args.train_files. Next, it loads the pretrained GPT-2 model along with its preprocessor. The model is then fine-tuned on the training data using the model.fit() method.
Finally, the trained model weights are saved in the directory given by fn_args.serving_model_dir.
Now, we are going to work with KerasNLP's GPT-2 model! You can learn about the full GPT-2 model implementation in KerasNLP on GitHub, or read and interactively test the model in the Google I/O 2023 Colab notebook.
import keras_nlp
import keras
import tensorflow as tf
Note: To accommodate the limited resources of a free Colab GPU, we've adjusted the GPT-2 model's sequence_length parameter to 128 from its default 256. This optimization enables efficient model training on the T4 GPU, facilitating faster fine-tuning while adhering to resource constraints.
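What a fixed sequence length means for the inputs can be shown with a small sketch. The function below is a hypothetical illustration of truncating and padding lists of token IDs to a common length; it is not KerasNLP's preprocessor, and the padding ID of 0 is an assumption made for the example.

```python
from typing import List


def pad_or_truncate(token_ids: List[int], sequence_length: int, pad_id: int = 0) -> List[int]:
  """Cuts `token_ids` to `sequence_length` or right-pads it with `pad_id`."""
  clipped = token_ids[:sequence_length]
  return clipped + [pad_id] * (sequence_length - len(clipped))


print(pad_or_truncate([5, 6, 7], sequence_length=5))
print(pad_or_truncate([1, 2, 3, 4, 5, 6, 7], sequence_length=5))
```

Every example ends up with the same length, so a smaller sequence length shrinks each batch and with it the memory needed for training.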
%%writefile {model_file}
import os
import time
from absl import logging
import keras_nlp
import more_itertools
import pandas as pd
import tensorflow as tf
import keras
import tfx
import tfx.components.trainer.fn_args_utils
import gc
_EPOCH = 1
_BATCH_SIZE = 20
_INITIAL_LEARNING_RATE = 5e-5
_END_LEARNING_RATE = 0.0
_SEQUENCE_LENGTH = 128 # default value is 256
def _input_fn(file_pattern: str) -> list:
  """Retrieves training data and returns a list of articles for training.
  For each row in the TFRecordDataset, generated by the upstream Transform
  component, create a new tf.train.Example object and parse the TFRecord into
  the example object. Articles, which are initially in bytes objects, are
  decoded into a string.
  Args:
    file_pattern: Path pattern (ending in '*') for the TFRecord files of the
      training dataset.
  Returns:
    A list of training articles.
  Raises:
    FileNotFoundError: If file_pattern does not end in '*', so that no
      TFRecord directory can be derived from it.
  """
  if os.path.basename(file_pattern) == '*':
    file_loc = os.path.dirname(file_pattern)
  else:
    raise FileNotFoundError(
        f"There is no file in the current directory: '{file_pattern}'."
    )
  file_paths = [os.path.join(file_loc, name) for name in os.listdir(file_loc)]
  train_articles = []
  parsed_dataset = tf.data.TFRecordDataset(file_paths, compression_type="GZIP")
  for raw_record in parsed_dataset:
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())
    train_articles.append(
        example.features.feature["summary"].bytes_list.value[0].decode('utf-8')
    )
  return train_articles
def run_fn(fn_args: tfx.components.trainer.fn_args_utils.FnArgs) -> None:
  """Trains the model and saves it to the location given by FnArgs.
  Args:
    fn_args: Args to pass to user defined training/tuning function(s)
  """
  train_articles = pd.Series(_input_fn(
      fn_args.train_files[0],
  ))
  tf_train_ds = tf.data.Dataset.from_tensor_slices(train_articles)
  gpt2_preprocessor = keras_nlp.models.GPT2CausalLMPreprocessor.from_preset(
      'gpt2_base_en',
      sequence_length=_SEQUENCE_LENGTH,
      add_end_token=True,
  )
  gpt2_lm = keras_nlp.models.GPT2CausalLM.from_preset(
      'gpt2_base_en', preprocessor=gpt2_preprocessor
  )
  processed_ds = (
      tf_train_ds
      .batch(_BATCH_SIZE)
      .cache()
      .prefetch(tf.data.AUTOTUNE)
  )
  gpt2_lm.include_preprocessing = False
  # Decay the learning rate from its initial to its end value over all steps.
  lr = tf.keras.optimizers.schedules.PolynomialDecay(
      _INITIAL_LEARNING_RATE,
      decay_steps=processed_ds.cardinality() * _EPOCH,
      end_learning_rate=_END_LEARNING_RATE,
  )
  loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
  gpt2_lm.compile(
      optimizer=keras.optimizers.Adam(lr),
      loss=loss,
      weighted_metrics=['accuracy'],
  )
  gpt2_lm.fit(processed_ds, epochs=_EPOCH)
  if os.path.exists(fn_args.serving_model_dir):
    os.rmdir(fn_args.serving_model_dir)
  os.mkdir(fn_args.serving_model_dir)
  gpt2_lm.save_weights(
      filepath=os.path.join(fn_args.serving_model_dir, "model_weights.weights.h5")
  )
  # Free GPU and host memory before the next component runs.
  del gpt2_lm, gpt2_preprocessor, processed_ds, tf_train_ds
  gc.collect()
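The learning-rate schedule used in `run_fn` can be reproduced by hand. The sketch below assumes the default `power=1.0` of `PolynomialDecay`, which makes the decay linear, and uses a hypothetical `decay_steps` of 100; the function `polynomial_decay` is a plain-Python illustration, not the Keras class itself.

```python
def polynomial_decay(step: int, initial_lr: float, end_lr: float,
                     decay_steps: int, power: float = 1.0) -> float:
  """Learning rate after `step` steps of polynomial decay (no cycling)."""
  step = min(step, decay_steps)
  fraction_left = 1.0 - step / decay_steps
  return (initial_lr - end_lr) * fraction_left ** power + end_lr


for step in (0, 50, 100):
  print(step, polynomial_decay(step, initial_lr=5e-5, end_lr=0.0, decay_steps=100))
```

With these settings the rate starts at 5e-5, is halved at the midpoint, and reaches the end value on the final step.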
trainer = tfx.components.Trainer(
    run_fn=model_fn,
    examples=preprocessor.outputs['transformed_examples'],
    train_args=tfx.proto.TrainArgs(splits=['train']),
    eval_args=tfx.proto.EvalArgs(splits=['train']),
    schema=schema_gen.outputs['schema'],
)
context.run(trainer, enable_cache=False)
Inference and Evaluation¶
With our model fine-tuned, let's evaluate its performance by generating inferences. To capture and preserve these results, we'll create an EvaluationMetric artifact.
from tfx.types import artifact
from tfx import types
Property = artifact.Property
PropertyType = artifact.PropertyType
DURATION_PROPERTY = Property(type=PropertyType.FLOAT)
EVAL_OUTPUT_PROPERTY = Property(type=PropertyType.STRING)
class EvaluationMetric(types.Artifact):
  """Artifact that contains metrics for a model.
  * Properties:
     - 'model_prediction_time' : time it took for the model to make predictions
       based on the input text.
     - 'model_evaluation_output_path' : saves the path to the CSV file that
       contains the model's prediction based on the testing inputs.
  """
  TYPE_NAME = 'Evaluation_Metric'
  PROPERTIES = {
      'model_prediction_time': DURATION_PROPERTY,
      'model_evaluation_output_path': EVAL_OUTPUT_PROPERTY,
  }
The helper functions below support the evaluation of the language model by extracting and preparing the evaluation data. The input_fn function reads the evaluation examples from a directory of TFRecord files, while the trim_sentence function limits each sentence to a fixed number of words so that inputs are consistent. Perplexity, a key metric that reflects how well the model predicts the next token in a sequence, is computed further below; a lower perplexity score generally indicates better model performance.
"""This is an evaluation component for the LLM pipeline that takes in a
standard trainer artifact and outputs a custom evaluation artifact.
It displays the evaluation output in the colab notebook.
"""
import os
import time
import keras_nlp
import numpy as np
import pandas as pd
import tensorflow as tf
import tfx.v1 as tfx
def input_fn(file_pattern: str) -> list:
  """Retrieves evaluation data and returns a list of articles for testing.
  Args:
    file_pattern: Path to the directory that holds the TFRecord files of the
      evaluation dataset.
  Returns:
    A list of test articles
  Raises:
    FileNotFoundError: If the file path does not exist.
  """
  if os.path.exists(file_pattern):
    file_paths = [os.path.join(file_pattern, name) for name in os.listdir(file_pattern)]
    test_articles = []
    parsed_dataset = tf.data.TFRecordDataset(file_paths, compression_type="GZIP")
    for raw_record in parsed_dataset:
      example = tf.train.Example()
      example.ParseFromString(raw_record.numpy())
      test_articles.append(
          example.features.feature["summary"].bytes_list.value[0].decode('utf-8')
      )
    return test_articles
  else:
    raise FileNotFoundError(f'File path "{file_pattern}" does not exist.')
def trim_sentence(sentence: str, max_words: int = 20):
  """Trims the sentence to include up to the given number of words.
  Args:
    sentence: The sentence to trim.
    max_words: The maximum number of words to include in the trimmed sentence.
  Returns:
    The trimmed sentence.
  """
  words = sentence.split(' ')
  if len(words) <= max_words:
    return sentence
  return ' '.join(words[:max_words])

One of the useful metrics for evaluating a Large Language Model is perplexity. Perplexity is a measure of how well a language model predicts the next token in a sequence. It is calculated by exponentiating the average negative log-likelihood of the next token. A lower perplexity score indicates that the language model is better at predicting the next token.
This is the formula for calculating perplexity, where $T$ is the number of tokens in the sequence.
$\text{Perplexity} = \exp(\text{average negative log-likelihood}) = \exp\left(-\frac{1}{T} \sum_{t=1}^{T} \log p(w_t \mid w_{<t})\right)$.
In this colab notebook, we calculate perplexity using keras_nlp's perplexity.
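As a small worked example of the definition, suppose a model assigns probabilities 0.5, 0.25 and 0.125 to the three tokens that actually occur. These numbers are made up for illustration, and the helper below is a plain-Python sketch rather than the `keras_nlp` metric.

```python
import math
from typing import Sequence


def perplexity(token_probabilities: Sequence[float]) -> float:
  """Exponential of the average negative log-likelihood of the observed tokens."""
  negative_log_likelihoods = [-math.log(p) for p in token_probabilities]
  return math.exp(sum(negative_log_likelihoods) / len(negative_log_likelihoods))


print(perplexity([0.5, 0.25, 0.125]))
```

The average negative log-likelihood here is $2 \ln 2$, so the perplexity is 4 (up to floating-point rounding): on average the model is as uncertain as if it were choosing uniformly among four tokens.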
Computing Perplexity for Base GPT-2 Model and Finetuned Model
The code below is the function which will be used later in the notebook for computing perplexity for the base GPT-2 model and the finetuned model.
def calculate_perplexity(gpt2_model, gpt2_tokenizer, sentence) -> float:
  """Calculates perplexity of a model given a sentence.
  Args:
    gpt2_model: GPT-2 Language Model
    gpt2_tokenizer: A GPT-2 tokenizer using Byte-Pair Encoding subword segmentation.
    sentence: Sentence that the model's perplexity is calculated upon.
  Returns:
    A perplexity score.
  """
  # gpt2_tokenizer([sentence])[0] produces a tensor containing an array of tokens that form the sentence.
  tokens = gpt2_tokenizer([sentence])[0].numpy()
  # decoded_sentences is an array of prefixes of the sentence that grow by one token at a time.
  # e.g. if tokens for a sentence "I love dogs" are ["I", "love", "dogs"], then decoded_sentences = ["I", "I love"]
  decoded_sentences = [gpt2_tokenizer.detokenize([tokens[:i]])[0].numpy() for i in range(1, len(tokens))]
  predictions = gpt2_model.predict(decoded_sentences)
  # For each prefix, keep the logits that predict the token following it.
  logits = [predictions[i - 1][i] for i in range(1, len(tokens))]
  target = tokens[1:].reshape(len(tokens) - 1, 1)
  perplexity = keras_nlp.metrics.Perplexity(from_logits=True)
  perplexity.update_state(target, logits)
  result = perplexity.result()
  return result.numpy()
def average_perplexity(gpt2_model, gpt2_tokenizer, sentences):
  perplexity_lst = [calculate_perplexity(gpt2_model, gpt2_tokenizer, sent) for sent in sentences]
  return np.mean(perplexity_lst)
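The indexing in `calculate_perplexity` is easier to follow on a toy input. The sketch below uses whole words as stand-ins for tokens, which is a simplification of the byte-pair tokens GPT-2 really uses, and the helper name is hypothetical. It shows how each prefix is paired with the token the model should predict next.

```python
from typing import List, Tuple


def prefixes_and_targets(tokens: List[str]) -> List[Tuple[List[str], str]]:
  """Pairs every proper, non-empty prefix of `tokens` with the token that follows it."""
  return [(tokens[:i], tokens[i]) for i in range(1, len(tokens))]


for prefix, target in prefixes_and_targets(["I", "love", "dogs"]):
  print(prefix, "->", target)
```

A sentence of n tokens therefore yields n - 1 prediction targets, since the first token has no preceding context.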
Evaluator¶
Having established the necessary helper functions for evaluation, we proceed to define the Evaluator component. This component runs inference with both the base and the fine-tuned model, computes perplexity scores for both models, and measures inference time. The Evaluator's output allows a side-by-side comparison of the two models' performance.
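The timing part of the evaluation boils down to the pattern below. `fake_generate` is a hypothetical stand-in for a model's text generation call so that the sketch runs without a GPU; only the measurement logic is the point.

```python
import time
from typing import Callable, List


def fake_generate(prompt: str) -> str:
  """Hypothetical stand-in for a text generation call."""
  return prompt + " ..."


def average_generation_time(generate: Callable[[str], str], prompts: List[str]) -> float:
  """Average wall-clock seconds per prompt."""
  start = time.perf_counter()
  for prompt in prompts:
    generate(prompt)
  return (time.perf_counter() - start) / len(prompts)


prompts = ["This movie was", "I did not expect"]
print(average_generation_time(fake_generate, prompts))
```

Dividing the total elapsed time by the number of prompts gives a per-example figure that stays comparable when the test size changes.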
@tfx.dsl.components.component
def Evaluator(
    examples: tfx.dsl.components.InputArtifact[
        tfx.types.standard_artifacts.Examples
    ],
    trained_model: tfx.dsl.components.InputArtifact[
        tfx.types.standard_artifacts.Model
    ],
    max_length: tfx.dsl.components.Parameter[int],
    evaluation: tfx.dsl.components.OutputArtifact[EvaluationMetric],
) -> None:
  """Makes inferences with the base model and the fine-tuned model.
  Args:
    examples: Standard TFX examples artifacts for retrieving test dataset.
    trained_model: Standard TFX trained model artifact finetuned with imdb-reviews
      dataset.
    max_length: Length of the text that the model generates given custom input
      statements.
    evaluation: An evaluation artifact that saves predicted outcomes of custom
      inputs in a csv document and inference speed of the model.
  """
  _TEST_SIZE = 10
  _INPUT_LENGTH = 10
  _SEQUENCE_LENGTH = 128
  path = os.path.join(examples.uri, 'Split-eval')
  test_data = input_fn(path)
  evaluation_inputs = [
      trim_sentence(article, max_words=_INPUT_LENGTH)
      for article in test_data[:_TEST_SIZE]
  ]
  true_test = [
      trim_sentence(article, max_words=max_length)
      for article in test_data[:_TEST_SIZE]
  ]
  # Loading base model, making inference, and calculating perplexity on the base model.
  gpt2_preprocessor = keras_nlp.models.GPT2CausalLMPreprocessor.from_preset(
      'gpt2_base_en',
      sequence_length=_SEQUENCE_LENGTH,
      add_end_token=True,
  )
  gpt2_lm = keras_nlp.models.GPT2CausalLM.from_preset(
      'gpt2_base_en', preprocessor=gpt2_preprocessor
  )
  gpt2_tokenizer = keras_nlp.models.GPT2Tokenizer.from_preset('gpt2_base_en')
  base_average_perplexity = average_perplexity(
      gpt2_lm, gpt2_tokenizer, true_test
  )
  start_base_model = time.time()
  base_evaluation = [
      gpt2_lm.generate(prompt, max_length)
      for prompt in evaluation_inputs
  ]
  end_base_model = time.time()
  # Loading finetuned model and making inferences with the finetuned model.
  model_weights_path = os.path.join(
      trained_model.uri, "Format-Serving", "model_weights.weights.h5"
  )
  gpt2_lm.load_weights(model_weights_path)
  trained_model_average_perplexity = average_perplexity(
      gpt2_lm, gpt2_tokenizer, true_test
  )
  start_trained = time.time()
  trained_evaluation = [
      gpt2_lm.generate(prompt, max_length)
      for prompt in evaluation_inputs
  ]
  end_trained = time.time()
  # Building an inference table.
  inference_data = {
      'input': evaluation_inputs,
      'actual_test_output': true_test,
      'base_model_prediction': base_evaluation,
      'trained_model_prediction': trained_evaluation,
  }
  models = [
      'Base Model',
      'Finetuned Model',
  ]
  inference_time = [
      (end_base_model - start_base_model),
      (end_trained - start_trained),
  ]
  average_inference_time = [t / _TEST_SIZE for t in inference_time]
  average_perplexity_lst = [
      base_average_perplexity,
      trained_model_average_perplexity,
  ]
  evaluation_data = {
      'Model': models,
      'Average Inference Time (sec)': average_inference_time,
      'Average Perplexity': average_perplexity_lst,
  }
  # Creating a directory in the evaluation artifact to save metric dataframes.
  metrics_path = os.path.join(evaluation.uri, 'metrics')
  if not os.path.exists(metrics_path):
    os.mkdir(metrics_path)
  evaluation_df = pd.DataFrame(evaluation_data).set_index('Model').transpose()
  evaluation_path = os.path.join(metrics_path, 'evaluation_output.csv')
  evaluation_df.to_csv(evaluation_path)
  inference_df = pd.DataFrame(inference_data)
  inference_path = os.path.join(metrics_path, 'inference_output.csv')
  inference_df.to_csv(inference_path)
  evaluation.model_evaluation_output_path = inference_path
evaluator = Evaluator(examples = preprocessor.outputs['transformed_examples'],
                      trained_model = trainer.outputs['model'],
                      max_length = 50)
context.run(evaluator, enable_cache = False)
Evaluator Results¶
Once our evaluation component execution is completed, we will load the evaluation metrics from evaluator URI and display them.
Note:
Perplexity Calculation: Perplexity is only one of many ways to evaluate LLMs. LLM evaluation is an active research topic and a comprehensive treatment is beyond the scope of this notebook.
evaluation_path = os.path.join(evaluator.outputs['evaluation'].get()[0].uri, 'metrics')
inference_df = pd.read_csv(os.path.join(evaluation_path, 'inference_output.csv'), index_col=0)
evaluation_df = pd.read_csv(os.path.join(evaluation_path, 'evaluation_output.csv'), index_col=0)
The fine-tuned GPT-2 model exhibits a slight improvement in perplexity compared to the baseline model. Further training with more epochs or a larger dataset may yield more substantial perplexity reductions.
from IPython import display
display.display(display.HTML(inference_df.to_html()))
display.display(display.HTML(evaluation_df.to_html()))
Running the Entire Pipeline¶
Note: Running the section below requires a more substantial amount of GPU memory. Therefore, Colab Pro or a local machine equipped with a higher-capacity GPU is recommended for running the pipeline below.
TFX supports multiple orchestrators to run pipelines. In this tutorial we will use LocalDagRunner, which is included in the TFX Python package and runs pipelines in a local environment. We often call TFX pipelines "DAGs", which stands for directed acyclic graphs.
LocalDagRunner provides fast iterations for development and debugging. TFX also supports other orchestrators including Kubeflow Pipelines and Apache Airflow which are suitable for production use cases. See TFX on Cloud AI Platform Pipelines or TFX Airflow Tutorial to learn more about other orchestration systems.
Now we create a LocalDagRunner and pass it a Pipeline object created by the function defined below. The pipeline runs directly and you can see logs for the progress of the pipeline, including ML model training.
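To see why a pipeline is a DAG, the dependencies between the seven components of this tutorial can be written down and sorted topologically. The sketch below uses Python's standard `graphlib` module (Python 3.9+) and is only an illustration; it is not how `LocalDagRunner` is implemented, and the lowercase component names are labels chosen for the example.

```python
from graphlib import TopologicalSorter

# Each component maps to the set of components whose outputs it consumes.
dependencies = {
    "example_gen": set(),
    "statistics_gen": {"example_gen"},
    "schema_gen": {"statistics_gen"},
    "example_validator": {"statistics_gen", "schema_gen"},
    "transform": {"example_gen", "schema_gen"},
    "trainer": {"transform", "schema_gen"},
    "evaluator": {"transform", "trainer"},
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Any order produced this way runs each component only after all of its inputs exist; because the graph has no cycles, such an order always exists.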
import urllib.request
import tempfile
import os
PIPELINE_NAME = "tfx-llm-imdb-reviews"
model_fn = "modules.model.run_fn"
_transform_module_file = "modules/_transform_module.py"
# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)
from absl import logging
logging.set_verbosity(logging.INFO) # Set default logging level.
def _create_pipeline(
    pipeline_name: str,
    pipeline_root: str,
    model_fn: str,
    serving_model_dir: str,
    metadata_path: str,
) -> tfx.dsl.Pipeline:
  """Creates a Pipeline for Fine-Tuning and Converting a Large Language Model with TFX."""
  example_gen = FileBasedExampleGen(
      input_base='dummy',
      custom_config={'dataset':'imdb_reviews', 'split':'train[:5%]'},
      custom_executor_spec=executor_spec.BeamExecutorSpec(TFDSExecutor))
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'], exclude_splits=['eval']
  )
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=False,
      exclude_splits=['eval'],
  )
  example_validator = tfx.components.ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema'],
      exclude_splits=['eval'],
  )
  preprocessor = tfx.components.Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=_transform_module_file,
  )
  trainer = tfx.components.Trainer(
      run_fn=model_fn,
      examples=preprocessor.outputs['transformed_examples'],
      train_args=tfx.proto.TrainArgs(splits=['train']),
      eval_args=tfx.proto.EvalArgs(splits=['train']),
      schema=schema_gen.outputs['schema'],
  )
  evaluator = Evaluator(
      examples=preprocessor.outputs['transformed_examples'],
      trained_model=trainer.outputs['model'],
      max_length=50,
  )
  # Following 7 components will be included in the pipeline.
  components = [
      example_gen,
      statistics_gen,
      schema_gen,
      example_validator,
      preprocessor,
      trainer,
      evaluator,
  ]
  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
          metadata_path
      ),
      components=components,
  )
tfx.orchestration.LocalDagRunner().run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        model_fn=model_fn,
        serving_model_dir=SERVING_MODEL_DIR,
        metadata_path=METADATA_PATH,
    )
)
You should see "INFO:absl:Component Evaluator is finished." at the end of the logs if the pipeline finished successfully, because the Evaluator component is the last component of the pipeline.