Copyright 2022 The TensorFlow Authors.
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Recommending Movies: Recommender Models in TFX
Note: We recommend running this tutorial in a Colab notebook, with no setup required! Just click "Run in Google Colab".
TFRS Tutorial Ported to TFX
This is a port of a basic TensorFlow Recommenders (TFRS) tutorial to TFX, which is designed to demonstrate how to use TFRS in a TFX pipeline. It mirrors the basic tutorial.
For context, real-world recommender systems are often composed of two stages:
- The retrieval stage is responsible for selecting an initial set of hundreds of candidates from all possible candidates. The main objective of this model is to efficiently weed out all candidates that the user is not interested in. Because the retrieval model may be dealing with millions of candidates, it has to be computationally efficient.
- The ranking stage takes the outputs of the retrieval model and fine-tunes them to select the best possible handful of recommendations. Its task is to narrow down the set of items the user may be interested in to a shortlist of likely candidates.
In this tutorial, we're going to focus on the first stage, retrieval. Retrieval models are often composed of two sub-models:
- A query model computing the query representation (normally a fixed-dimensionality embedding vector) using query features.
- A candidate model computing the candidate representation (an equally-sized vector) using the candidate features.
The outputs of the two models are then multiplied together to give a query-candidate affinity score, with higher scores expressing a better match between the candidate and the query.
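To make that concrete, here is a minimal sketch of the affinity computation with made-up embeddings (the values and shapes are purely illustrative):
import tensorflow as tf
# Purely illustrative embeddings: one query and three candidates, dimension 4.
query_embedding = tf.constant([[0.1, 0.4, -0.2, 0.3]])        # shape (1, 4)
candidate_embeddings = tf.constant([[0.2, 0.1, 0.0, 0.5],
                                    [-0.3, 0.4, 0.1, 0.0],
                                    [0.1, 0.4, -0.2, 0.3]])   # shape (3, 4)
# The affinity score is the dot product of the two towers' outputs.
scores = tf.matmul(query_embedding, candidate_embeddings, transpose_b=True)
print(scores)  # the third candidate, identical to the query, scores highest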
In this tutorial, we're going to build and train such a two-tower model using the MovieLens dataset.
We're going to:
- Ingest and inspect the MovieLens dataset.
- Implement a retrieval model.
- Train and export the model.
- Make predictions.
The dataset
The MovieLens dataset is a classic dataset from the GroupLens research group at the University of Minnesota. It contains a set of ratings given to movies by a set of users, and is a workhorse of recommender system research.
The data can be treated in two ways:
- It can be interpreted as expressing which movies the users watched (and rated), and which they did not. This is a form of implicit feedback, where users' watches tell us which things they prefer to see and which they'd rather not see.
- It can also be seen as expressing how much the users liked the movies they did watch. This is a form of explicit feedback: given that a user watched a movie, we can tell roughly how much they liked it by looking at the rating they gave.
In this tutorial, we are focusing on a retrieval system: a model that predicts a set of movies from the catalogue that the user is likely to watch. Often, implicit data is more useful here, and so we are going to treat MovieLens as an implicit system. This means that every movie a user watched is a positive example, and every movie they have not seen is an implicit negative example.
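As a toy illustration of the implicit view, with a made-up record:
# A made-up ratings record; under the implicit view we keep only the
# (user, movie) interaction as a positive example and drop the rating.
ratings = [{'user_id': '42', 'movie_title': 'Heat (1995)', 'user_rating': 3.0}]
implicit_positives = [(r['user_id'], r['movie_title']) for r in ratings]
print(implicit_positives)  # [('42', 'Heat (1995)')]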
Imports
Let's first get our imports out of the way.
!pip install -Uq tfx
!pip install -Uq tensorflow-recommenders
!pip install -Uq tensorflow-datasets
Did you restart the runtime?
If you are using Google Colab, the first time that you run the cell above, you must restart the runtime (Runtime > Restart runtime ...). This is because of the way that Colab loads packages.
import os
import absl
import json
import pprint
import tempfile
from typing import Any, Dict, List, Text
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs
import apache_beam as beam
from absl import logging
from tfx.components.example_gen.base_example_gen_executor import BaseExampleGenExecutor
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.components.example_gen import utils
from tfx.dsl.components.base import executor_spec
from tfx.types import artifact
from tfx.types import artifact_utils
from tfx.types import channel
from tfx.types import standard_artifacts
from tfx.types.standard_artifacts import Examples
from tfx.dsl.component.experimental.annotations import InputArtifact
from tfx.dsl.component.experimental.annotations import OutputArtifact
from tfx.dsl.component.experimental.annotations import Parameter
from tfx.dsl.component.experimental.decorators import component
from tfx.types.experimental.simple_artifacts import Dataset
from tfx import v1 as tfx
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
# Set up logging.
tf.get_logger().propagate = False
absl.logging.set_verbosity(absl.logging.INFO)
pp = pprint.PrettyPrinter()
print(f"TensorFlow version: {tf.__version__}")
print(f"TFX version: {tfx.__version__}")
print(f"TensorFlow Recommenders version: {tfrs.__version__}")
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip
Create a TFDS ExampleGen
We create a custom ExampleGen component which we use to load a TensorFlow Datasets (TFDS) dataset. This uses a custom executor in a FileBasedExampleGen.
@beam.ptransform_fn
@beam.typehints.with_input_types(beam.Pipeline)
@beam.typehints.with_output_types(tf.train.Example)
def _TFDatasetToExample( # pylint: disable=invalid-name
pipeline: beam.Pipeline,
exec_properties: Dict[str, Any],
split_pattern: str
) -> beam.pvalue.PCollection:
"""Read a TensorFlow Dataset and create tf.Examples"""
custom_config = json.loads(exec_properties['custom_config'])
dataset_name = custom_config['dataset']
split_name = custom_config['split']
builder = tfds.builder(dataset_name)
builder.download_and_prepare()
return (pipeline
| 'MakeExamples' >> tfds.beam.ReadFromTFDS(builder, split=split_name)
| 'AsNumpy' >> beam.Map(tfds.as_numpy)
| 'ToDict' >> beam.Map(dict)
| 'ToTFExample' >> beam.Map(utils.dict_to_example)
)
class TFDSExecutor(BaseExampleGenExecutor):
def GetInputSourceToExamplePTransform(self) -> beam.PTransform:
"""Returns PTransform for TF Dataset to TF examples."""
return _TFDatasetToExample
Init TFX Pipeline Context
context = InteractiveContext()
Preparing the dataset
We will use our custom executor in a FileBasedExampleGen to load our datasets from TFDS. Since we have two datasets, we will create two ExampleGen components.
# Ratings data.
ratings_example_gen = FileBasedExampleGen(
input_base='dummy',
custom_config={'dataset':'movielens/100k-ratings', 'split':'train'},
custom_executor_spec=executor_spec.ExecutorClassSpec(TFDSExecutor))
context.run(ratings_example_gen, enable_cache=True)
# Features of all the available movies.
movies_example_gen = FileBasedExampleGen(
input_base='dummy',
custom_config={'dataset':'movielens/100k-movies', 'split':'train'},
custom_executor_spec=executor_spec.ExecutorClassSpec(TFDSExecutor))
context.run(movies_example_gen, enable_cache=True)
Create inspect_examples utility
We create a convenience utility to inspect datasets of tf.Examples. The ratings dataset returns a dictionary of movie id, user id, the assigned rating, timestamp, movie information, and user information:
def inspect_examples(component,
channel_name='examples',
split_name='train',
num_examples=1):
# Get the URI of the output artifact, which is a directory
full_split_name = 'Split-{}'.format(split_name)
print('channel_name: {}, split_name: {} (\"{}\"), num_examples: {}\n'.format(
channel_name, split_name, full_split_name, num_examples))
train_uri = os.path.join(
component.outputs[channel_name].get()[0].uri, full_split_name)
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Iterate over the records and print them
for tfrecord in dataset.take(num_examples):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
pp.pprint(example)
inspect_examples(ratings_example_gen)
The movies dataset contains the movie id, movie title, and data on what genres it belongs to. Note that the genres are encoded with integer labels.
inspect_examples(movies_example_gen)
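If you want to map those integer labels back to genre names, the TFDS feature metadata can decode them. A sketch, assuming the movie_genres feature name used by the MovieLens TFDS builders:
# Decode integer genre labels using TFDS metadata. 'movie_genres' is the
# feature name we assume the movielens/100k-movies builder exposes.
builder = tfds.builder('movielens/100k-movies')
genre_feature = builder.info.features['movie_genres'].feature
print([genre_feature.int2str(i) for i in range(3)])  # first few genre names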
ExampleGen did the split
When we ingested the MovieLens dataset, our ExampleGen component split the data into train and eval splits. They are actually named Split-train and Split-eval. By default the split is 66% training, 34% evaluation.
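If you need a different ratio, ExampleGen accepts an output configuration. A sketch of a hypothetical 3:1 split, passed as output_config when constructing the component:
from tfx.proto import example_gen_pb2
# A hypothetical 3:1 train/eval split, overriding the default 2:1 hash buckets.
output_config = example_gen_pb2.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=3),
        example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1),
    ]))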
Generate statistics for movies and ratings
For a TFX pipeline we need to generate statistics for the dataset. We do that by using a StatisticsGen component. These will be used by the SchemaGen component below when we generate a schema for our dataset. This is good practice anyway, because it's important to examine and analyze your data on an ongoing basis. Since we have two datasets we will create two StatisticsGen components.
movies_stats_gen = tfx.components.StatisticsGen(
examples=movies_example_gen.outputs['examples'])
context.run(movies_stats_gen, enable_cache=True)
context.show(movies_stats_gen.outputs['statistics'])
ratings_stats_gen = tfx.components.StatisticsGen(
examples=ratings_example_gen.outputs['examples'])
context.run(ratings_stats_gen, enable_cache=True)
context.show(ratings_stats_gen.outputs['statistics'])
Create schemas for movies and ratings
For a TFX pipeline we need to generate a data schema from our dataset. We do that by using a SchemaGen component. This will be used by the Transform component below to do our feature engineering in a way that is highly scalable to large datasets, and avoids training/serving skew. Since we have two datasets we will create two SchemaGen components.
movies_schema_gen = tfx.components.SchemaGen(
statistics=movies_stats_gen.outputs['statistics'],
infer_feature_shape=False)
context.run(movies_schema_gen, enable_cache=True)
context.show(movies_schema_gen.outputs['schema'])
ratings_schema_gen = tfx.components.SchemaGen(
statistics=ratings_stats_gen.outputs['statistics'],
infer_feature_shape=False)
context.run(ratings_schema_gen, enable_cache=True)
context.show(ratings_schema_gen.outputs['schema'])
Feature Engineering using Transform
For a structured and repeatable design of a TFX pipeline we will need a scalable approach to feature engineering. This allows us to handle the large datasets which are usually part of many recommender systems, and it also avoids training/serving skew. We will do that using the Transform component.
The Transform component uses a module file to supply user code for the feature engineering that we want to do, so our first step is to create that module file. Since we have two datasets, we will create two of these module files and two Transform components.
One of the things that our recommender needs is vocabularies for the user_id and movie_title fields. In the basic_retrieval tutorial those are created with inline NumPy, but here we will use Transform.
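For contrast, here is roughly the inline approach, sketched with made-up titles:
import numpy as np
import tensorflow as tf
# Roughly the basic_retrieval approach: deduplicate titles with NumPy.
# The toy dataset here stands in for the real movie titles.
movie_titles = tf.data.Dataset.from_tensor_slices(
    ["Toy Story (1995)", "Heat (1995)", "Toy Story (1995)"])
unique_movie_titles = np.unique(
    np.concatenate(list(movie_titles.batch(1000).as_numpy_iterator())))
print(unique_movie_titles)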
Note: The %%writefile {_movies_transform_module_file} cell magic below creates and writes the contents of that cell to a file on the notebook server where this notebook is running (for example, the Colab VM). When doing this outside of a notebook you would just create a Python file.
_movies_transform_module_file = 'movies_transform_module.py'
%%writefile {_movies_transform_module_file}
import tensorflow as tf
import tensorflow_transform as tft
def preprocessing_fn(inputs):
# We only want the movie title
return {'movie_title':inputs['movie_title']}
movies_transform = tfx.components.Transform(
examples=movies_example_gen.outputs['examples'],
schema=movies_schema_gen.outputs['schema'],
module_file=os.path.abspath(_movies_transform_module_file))
context.run(movies_transform, enable_cache=True)
context.show(movies_transform.outputs['post_transform_schema'])
inspect_examples(movies_transform, channel_name='transformed_examples')
_ratings_transform_module_file = 'ratings_transform_module.py'
%%writefile {_ratings_transform_module_file}
import tensorflow as tf
import tensorflow_transform as tft
NUM_OOV_BUCKETS = 1
def preprocessing_fn(inputs):
# We only want the user ID and the movie title, but we also need vocabularies
# for both of them. The vocabularies aren't features, they're only used by
# the lookup.
outputs = {}
outputs['user_id'] = tft.sparse_tensor_to_dense_with_shape(inputs['user_id'], [None, 1], '-1')
outputs['movie_title'] = tft.sparse_tensor_to_dense_with_shape(inputs['movie_title'], [None, 1], '-1')
tft.compute_and_apply_vocabulary(
inputs['user_id'],
num_oov_buckets=NUM_OOV_BUCKETS,
vocab_filename='user_id_vocab')
tft.compute_and_apply_vocabulary(
inputs['movie_title'],
num_oov_buckets=NUM_OOV_BUCKETS,
vocab_filename='movie_title_vocab')
return outputs
ratings_transform = tfx.components.Transform(
examples=ratings_example_gen.outputs['examples'],
schema=ratings_schema_gen.outputs['schema'],
module_file=os.path.abspath(_ratings_transform_module_file))
context.run(ratings_transform, enable_cache=True)
context.show(ratings_transform.outputs['post_transform_schema'])
inspect_examples(ratings_transform, channel_name='transformed_examples')
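As a quick sanity check, we can read back one of the vocabularies Transform computed from the transform_graph artifact (a sketch; it uses the same artifact access pattern as inspect_examples above):
import tensorflow_transform as tft
# Load the Transform output and look up a computed vocabulary by name.
transform_graph_uri = ratings_transform.outputs['transform_graph'].get()[0].uri
tft_output = tft.TFTransformOutput(transform_graph_uri)
user_ids = tft_output.vocabulary_by_name('user_id_vocab')
print('{} user ids, e.g. {}'.format(len(user_ids), user_ids[:3]))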
Implementing a model in TFX
In the basic_retrieval tutorial the model was created inline in the Python runtime. In a TFX pipeline, the model, metric, and loss are defined and trained in the module file for a pipeline component called Trainer. This makes the model, metric, and loss part of a repeatable process which can be automated and monitored.
TensorFlow Recommenders model architecture
We are going to build a two-tower retrieval model. The concept of two-tower means we will have a query tower computing the user representation using user features, and another item tower computing the movie representation using the movie features. We can build each tower separately (in the _build_user_model() and _build_movie_model() methods below) and then combine them in the final model (as in the MovielensModel class). MovielensModel is a subclass of the tfrs.Model base class, which streamlines building models: all we need to do is set up the components in the __init__ method and implement the compute_loss method, taking in the raw features and returning a loss value.
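In miniature, that contract looks like this (a sketch only; the real module file below adds vocabularies, metrics, and the TFX plumbing):
# Minimal tfrs.Model: wire up the towers in __init__, implement compute_loss.
class SketchModel(tfrs.Model):
  def __init__(self, user_model, movie_model, task):
    super().__init__()
    self.user_model = user_model
    self.movie_model = movie_model
    self.task = task  # e.g. a tfrs.tasks.Retrieval instance
  def compute_loss(self, features, training=False):
    user_embeddings = self.user_model(features['user_id'])
    movie_embeddings = self.movie_model(features['movie_title'])
    return self.task(user_embeddings, movie_embeddings)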
# We're now going to create the module file for Trainer, which will include the
# code above with some modifications for TFX.
_trainer_module_file = 'trainer_module.py'
%%writefile {_trainer_module_file}
from typing import Dict, List, Text
import os
import absl
import datetime
import glob
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_recommenders as tfrs
from absl import logging
from tfx.types import artifact_utils
from tfx import v1 as tfx
from tfx_bsl.coders import example_coder
from tfx_bsl.public import tfxio
absl.logging.set_verbosity(absl.logging.INFO)
EMBEDDING_DIMENSION = 32
INPUT_FN_BATCH_SIZE = 1
def extract_str_feature(dataset, feature_name):
np_dataset = []
for example in dataset:
np_example = example_coder.ExampleToNumpyDict(example.numpy())
np_dataset.append(np_example[feature_name][0].decode())
return tf.data.Dataset.from_tensor_slices(np_dataset)
class MovielensModel(tfrs.Model):
def __init__(self, user_model, movie_model, tf_transform_output, movies_uri):
super().__init__()
self.movie_model: tf.keras.Model = movie_model
self.user_model: tf.keras.Model = user_model
movies_artifact = movies_uri.get()[0]
input_dir = artifact_utils.get_split_uri([movies_artifact], 'train')
movie_files = glob.glob(os.path.join(input_dir, '*'))
movies = tf.data.TFRecordDataset(movie_files, compression_type="GZIP")
movies_dataset = extract_str_feature(movies, 'movie_title')
loss_metrics = tfrs.metrics.FactorizedTopK(
candidates=movies_dataset.batch(128).map(movie_model)
)
self.task: tf.keras.layers.Layer = tfrs.tasks.Retrieval(
metrics=loss_metrics
)
def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
# We pick out the user features and pass them into the user model.
try:
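      # tf.squeeze drops the extra dimension added by Transform's [None, 1] dense shape.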
user_embeddings = tf.squeeze(self.user_model(features['user_id']), axis=1)
# And pick out the movie features and pass them into the movie model,
# getting embeddings back.
positive_movie_embeddings = self.movie_model(features['movie_title'])
# The task computes the loss and the metrics.
_task = self.task(user_embeddings, positive_movie_embeddings)
except BaseException as err:
logging.error('######## ERROR IN compute_loss:\n{}\n###############'.format(err))
return _task
# This function will apply the same transform operation to training data
# and serving requests.
def _apply_preprocessing(raw_features, tft_layer):
try:
transformed_features = tft_layer(raw_features)
except BaseException as err:
logging.error('######## ERROR IN _apply_preprocessing:\n{}\n###############'.format(err))
return transformed_features
def _input_fn(file_pattern: List[Text],
data_accessor: tfx.components.DataAccessor,
tf_transform_output: tft.TFTransformOutput,
batch_size: int = 200) -> tf.data.Dataset:
"""Generates features and label for tuning/training.
Args:
file_pattern: List of paths or patterns of input tfrecord files.
data_accessor: DataAccessor for converting input to RecordBatch.
tf_transform_output: A TFTransformOutput.
batch_size: representing the number of consecutive elements of returned
dataset to combine in a single batch
Returns:
A dataset that contains (features, indices) tuple where features is a
dictionary of Tensors, and indices is a single Tensor of label indices.
"""
try:
return data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(
batch_size=batch_size),
tf_transform_output.transformed_metadata.schema)
except BaseException as err:
logging.error('######## ERROR IN _input_fn:\n{}\n###############'.format(err))
return None
def _get_serve_tf_examples_fn(model, tf_transform_output):
"""Returns a function that parses a serialized tf.Example and applies TFT."""
try:
model.tft_layer = tf_transform_output.transform_features_layer()
@tf.function
def serve_tf_examples_fn(serialized_tf_examples):
"""Returns the output to be used in the serving signature."""
try:
feature_spec = tf_transform_output.raw_feature_spec()
parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
transformed_features = model.tft_layer(parsed_features)
result = model(transformed_features)
except BaseException as err:
logging.error('######## ERROR IN serve_tf_examples_fn:\n{}\n###############'.format(err))
return result
except BaseException as err:
logging.error('######## ERROR IN _get_serve_tf_examples_fn:\n{}\n###############'.format(err))
return serve_tf_examples_fn
def _build_user_model(
tf_transform_output: tft.TFTransformOutput, # Specific to ratings
embedding_dimension: int = 32) -> tf.keras.Model:
"""Creates a Keras model for the query tower.
Args:
tf_transform_output: [tft.TFTransformOutput], the results of Transform
embedding_dimension: [int], the dimensionality of the embedding space
Returns:
A keras Model.
"""
try:
unique_user_ids = tf_transform_output.vocabulary_by_name('user_id_vocab')
users_vocab_str = [b.decode() for b in unique_user_ids]
model = tf.keras.Sequential(
[
tf.keras.layers.StringLookup(
vocabulary=users_vocab_str, mask_token=None),
# We add an additional embedding to account for unknown tokens.
tf.keras.layers.Embedding(len(users_vocab_str) + 1, embedding_dimension)
])
except BaseException as err:
logging.error('######## ERROR IN _build_user_model:\n{}\n###############'.format(err))
return model
def _build_movie_model(
tf_transform_output: tft.TFTransformOutput, # Specific to movies
embedding_dimension: int = 32) -> tf.keras.Model:
"""Creates a Keras model for the candidate tower.
Args:
tf_transform_output: [tft.TFTransformOutput], the results of Transform
embedding_dimension: [int], the dimensionality of the embedding space
Returns:
A keras Model.
"""
try:
unique_movie_titles = tf_transform_output.vocabulary_by_name('movie_title_vocab')
titles_vocab_str = [b.decode() for b in unique_movie_titles]
model = tf.keras.Sequential(
[
tf.keras.layers.StringLookup(
vocabulary=titles_vocab_str, mask_token=None),
# We add an additional embedding to account for unknown tokens.
tf.keras.layers.Embedding(len(titles_vocab_str) + 1, embedding_dimension)
])
except BaseException as err:
logging.error('######## ERROR IN _build_movie_model:\n{}\n###############'.format(err))
return model
# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
"""Train the model based on given args.
Args:
fn_args: Holds args used to train the model as name/value pairs.
"""
try:
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
train_dataset = _input_fn(fn_args.train_files, fn_args.data_accessor,
tf_transform_output, INPUT_FN_BATCH_SIZE)
eval_dataset = _input_fn(fn_args.eval_files, fn_args.data_accessor,
tf_transform_output, INPUT_FN_BATCH_SIZE)
model = MovielensModel(
_build_user_model(tf_transform_output, EMBEDDING_DIMENSION),
_build_movie_model(tf_transform_output, EMBEDDING_DIMENSION),
tf_transform_output,
fn_args.custom_config['movies']
)
tensorboard_callback = tf.keras.callbacks.TensorBoard(
log_dir=fn_args.model_run_dir, update_freq='batch')
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))
except BaseException as err:
logging.error('######## ERROR IN run_fn before fit:\n{}\n###############'.format(err))
try:
model.fit(
train_dataset,
epochs=fn_args.custom_config['epochs'],
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps,
callbacks=[tensorboard_callback])
except BaseException as err:
logging.error('######## ERROR IN run_fn during fit:\n{}\n###############'.format(err))
try:
index = tfrs.layers.factorized_top_k.BruteForce(model.user_model)
movies_artifact = fn_args.custom_config['movies'].get()[0]
input_dir = artifact_utils.get_split_uri([movies_artifact], 'eval')
movie_files = glob.glob(os.path.join(input_dir, '*'))
movies = tf.data.TFRecordDataset(movie_files, compression_type="GZIP")
movies_dataset = extract_str_feature(movies, 'movie_title')
index.index_from_dataset(
tf.data.Dataset.zip((
movies_dataset.batch(100),
movies_dataset.batch(100).map(model.movie_model))
)
)
# Run once so that we can get the right signatures into SavedModel
_, titles = index(tf.constant(["42"]))
print(f"Recommendations for user 42: {titles[0, :3]}")
signatures = {
'serving_default':
_get_serve_tf_examples_fn(index,
tf_transform_output).get_concrete_function(
tf.TensorSpec(
shape=[None],
dtype=tf.string,
name='examples')),
}
index.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
except BaseException as err:
logging.error('######## ERROR IN run_fn during export:\n{}\n###############'.format(err))
Training the model
After defining the model, we can run the Trainer component to do the model training.
trainer = tfx.components.Trainer(
module_file=os.path.abspath(_trainer_module_file),
examples=ratings_transform.outputs['transformed_examples'],
transform_graph=ratings_transform.outputs['transform_graph'],
schema=ratings_transform.outputs['post_transform_schema'],
train_args=tfx.proto.TrainArgs(num_steps=500),
eval_args=tfx.proto.EvalArgs(num_steps=10),
custom_config={
'epochs':5,
'movies':movies_transform.outputs['transformed_examples'],
'movie_schema':movies_transform.outputs['post_transform_schema'],
'ratings':ratings_transform.outputs['transformed_examples'],
'ratings_schema':ratings_transform.outputs['post_transform_schema']
})
context.run(trainer, enable_cache=False)
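Optionally, you can point TensorBoard at the Trainer's model_run output to inspect the training curves (a sketch; assumes TensorBoard is available in this environment):
# The model_run artifact holds the TensorBoard logs written by the callback above.
model_run_uri = trainer.outputs['model_run'].get()[0].uri
%load_ext tensorboard
%tensorboard --logdir {model_run_uri}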
Exporting the model
After training the model, we can use the Pusher component to export the model.
_serving_model_dir = os.path.join(tempfile.mkdtemp(), 'serving_model/tfrs_retrieval')
pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=_serving_model_dir)))
context.run(pusher, enable_cache=True)
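As a quick sanity check, the push destination should now contain a timestamped copy of the SavedModel:
# Pusher writes each pushed model into a timestamped subdirectory.
print(os.listdir(_serving_model_dir))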
Make predictions
Now that we have a model, we load it back and make predictions.
loaded = tf.saved_model.load(pusher.outputs['pushed_model'].get()[0].uri)
scores, titles = loaded(["42"])
print(f"Recommendations: {titles[0][:3]}")
Next step
In this tutorial, you have learned how to implement a retrieval model with TensorFlow Recommenders and TFX. To expand on what is presented here, have a look at the TFRS ranking with TFX tutorial.