TFX Pipelines - Titanic Dataset
import tempfile
import tensorflow as tf
import urllib.request
import os
import pandas as pd
import shutil
import tensorflow_data_validation as tfdv
import tensorflow_model_analysis as tfma
from absl import logging
from pathlib import Path
from tensorflow_metadata.proto.v0 import schema_pb2
from tfx import v1 as tfx
print(f"TensorFlow version: {tf.__version__}")
print(f"TFX version: {tfx.__version__}")
print(f"TensorFlow Data Validation version: {tfdv.__version__}")
logging.set_verbosity(logging.INFO)
DATA_DIRECTORY = "titanic"
DATA_SOURCE_TRAIN_PATH = Path(DATA_DIRECTORY) / "titanic-train.csv"
DATA_SOURCE_TEST_PATH = Path(DATA_DIRECTORY) / "titanic-test.csv"
DATA_TRAIN_FILENAME = "train.csv"
DATA_EVAL_FILENAME = "eval.csv"
DATA_TEST_FILENAME = "test.csv"
PIPELINE_NAME = "titanic-pipeline"
PIPELINE_DIRECTORY = Path("pipelines") / PIPELINE_NAME
METADATA_PATH = Path("metadata") / PIPELINE_NAME / "metadata.db"
SCHEMA_DIRECTORY = PIPELINE_DIRECTORY / "schema"
SCHEMA_FILENAME = str(SCHEMA_DIRECTORY / "schema.pbtxt")
MODEL_DIRECTORY = Path("model")
train_df = pd.read_csv(DATA_SOURCE_TRAIN_PATH)
test_df = pd.read_csv(DATA_SOURCE_TEST_PATH)
datasets = [train_df, test_df]
for dataset in datasets:
dataset.drop(["PassengerId", "Name", "Ticket"], axis=1, inplace=True)
dataset.Fare = dataset.Fare.fillna(train_df.Fare.median())
dataset.Age = dataset.Age.fillna(train_df.Age.median()).astype(int)
dataset.Embarked = dataset.Embarked.fillna("S")
# Let's save the modified data back to disk. Notice that we save the
# training data twice (as both train and eval).
train_df.to_csv(Path(DATA_DIRECTORY) / DATA_TRAIN_FILENAME, index=False)
train_df.to_csv(Path(DATA_DIRECTORY) / DATA_EVAL_FILENAME, index=False)
test_df.to_csv(Path(DATA_DIRECTORY) / DATA_TEST_FILENAME, index=False)
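Before moving on, it's worth a quick sanity check that the files we just wrote look as expected. The sketch below simply reloads them and reports which columns still contain missing values (we left `Cabin` untouched on purpose; the schema and the transform handle it later):
for filename in [DATA_TRAIN_FILENAME, DATA_EVAL_FILENAME, DATA_TEST_FILENAME]:
    # Reload each file and report the columns that still have missing values.
    df = pd.read_csv(Path(DATA_DIRECTORY) / filename)
    print(filename, df.shape, "columns with missing values:", list(df.columns[df.isna().any()]))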
Common functions
def _examples(df):
"""
Converts a DataFrame into a list of serialized tf.Example protos that the model's serving signature accepts.
"""
examples = []
for index, row in df.iterrows():
features = {
"Pclass": tf.train.Feature(int64_list=tf.train.Int64List(value=[row.Pclass])),
"Sex": tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes(str(row.Sex), encoding="raw_unicode_escape")])),
"Age": tf.train.Feature(int64_list=tf.train.Int64List(value=[row.Age])),
"SibSp": tf.train.Feature(int64_list=tf.train.Int64List(value=[row.SibSp])),
"Parch": tf.train.Feature(int64_list=tf.train.Int64List(value=[row.Parch])),
"Fare": tf.train.Feature(float_list=tf.train.FloatList(value=[row.Fare])),
"Cabin": tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes(str(row.Cabin), encoding="raw_unicode_escape")])),
"Embarked": tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes(str(row.Embarked), encoding="raw_unicode_escape")]))
}
example_proto = tf.train.Example(features=tf.train.Features(feature=features))
examples.append(example_proto.SerializeToString())
return examples
def get_inference_fn(model_directory):
"""
Returns the inference function of the latest published model.
"""
model_directories = (d for d in os.scandir(model_directory) if d.is_dir())
model_path = max(model_directories, key=lambda i: int(i.name)).path
loaded_model = tf.keras.models.load_model(model_path)
return loaded_model.signatures["serving_default"]
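As a quick sketch of how these helpers are used (assuming the DataFrames from the data-preparation step are still in memory), we can serialize a single row with `_examples` and parse it back to confirm the feature encoding:
# Serialize the first test row and parse it back to inspect the encoded features.
serialized = _examples(test_df.head(1))[0]
example = tf.train.Example()
example.ParseFromString(serialized)
print(example)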
Running each component interactively
from tfx.orchestration.experimental.interactive.interactive_context import (
InteractiveContext
)
context = InteractiveContext()
Loading the data
from tfx.proto import example_gen_pb2
# Both train.csv and eval.csv contain the same data.
input_config = tfx.proto.Input(splits=[
example_gen_pb2.Input.Split(name='train', pattern=DATA_TRAIN_FILENAME),
example_gen_pb2.Input.Split(name='eval', pattern=DATA_EVAL_FILENAME)
])
example_gen = tfx.components.CsvExampleGen(
input_base=DATA_DIRECTORY,
input_config=input_config
)
context.run(example_gen)
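The component's output is registered as an artifact in the metadata store. A minimal sketch to see which splits were produced and where the generated TFRecords live:
# Inspect the artifact produced by CsvExampleGen: its splits and location on disk.
examples_artifact = example_gen.outputs["examples"].get()[0]
print("splits:", examples_artifact.split_names)
print("uri:", examples_artifact.uri)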
Computing statistics about the data
statistics_gen = tfx.components.StatisticsGen(
examples=example_gen.outputs["examples"]
)
context.run(statistics_gen)
context.show(statistics_gen.outputs["statistics"])
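If we want to cross-check the pipeline's statistics against the raw data, TFDV can also compute them directly from the in-memory DataFrame. A minimal sketch that bypasses the pipeline entirely:
# Compute statistics straight from the DataFrame and visualize them with TFDV.
train_stats = tfdv.generate_statistics_from_dataframe(train_df)
tfdv.visualize_statistics(train_stats)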
Inferring the schema from the data
schema_gen = tfx.components.SchemaGen(
statistics=statistics_gen.outputs["statistics"],
infer_feature_shape=True
)
context.run(schema_gen)
context.show(schema_gen.outputs["schema"])
Adjusting the schema
schema = tfdv.load_schema_text(
os.path.join(schema_gen.outputs['schema'].get()[0].uri, "schema.pbtxt")
)
# Let's add the two environments we need.
schema.default_environment.append("TRAINING")
schema.default_environment.append("SERVING")
# The `Survived` column won't be present in the SERVING environment.
tfdv.get_feature(schema, "Survived").not_in_environment.append("SERVING")
# Let's adjust the `Cabin` feature so it doesn't require a specific
# percentage of values.
tfdv.get_feature(schema, "Cabin").presence.min_fraction = 0.0
# We can now display the updated schema
tfdv.display_schema(schema=schema)
!mkdir -p {SCHEMA_DIRECTORY}
tfdv.write_schema_text(schema, SCHEMA_FILENAME)
# Let's import the updated schema as a pipeline artifact and display it
schema_importer = tfx.dsl.Importer(
source_uri=str(SCHEMA_DIRECTORY),
artifact_type=tfx.types.standard_artifacts.Schema
).with_id("schema_importer")
context.run(schema_importer)
context.show(schema_importer.outputs["result"])
Validating the data using the updated schema
example_validator = tfx.components.ExampleValidator(
statistics=statistics_gen.outputs["statistics"],
schema=schema_importer.outputs["result"]
)
context.run(example_validator)
context.show(example_validator.outputs["anomalies"])
Transforming the data
CONSTANTS_MODULE_PATH = "titanic_constants.py"
%%writefile {CONSTANTS_MODULE_PATH}
LABEL = "Survived"
TRANSFORM_MODULE_PATH = "titanic_transform.py"
%%writefile {TRANSFORM_MODULE_PATH}
import tensorflow as tf
import tensorflow_transform as tft
from tfx import v1 as tfx
import titanic_constants
LABEL = titanic_constants.LABEL
EMBARKED = ["S", "C", "Q"]
def preprocessing_fn(inputs):
"""
This is the Transform's callback function that preprocesses the
input data.
Args:
inputs: The map from feature keys to raw values that need to be
transformed.
Returns:
A map from feature keys to the transformed values.
"""
outputs = dict()
# Let's integer-encode `Embarked` with a lookup table.
embarked_input = _fillna(inputs["Embarked"], "S")
embarked_initializer = tf.lookup.KeyValueTensorInitializer(
keys=EMBARKED,
values=tf.cast(tf.range(len(EMBARKED)), tf.int64),
key_dtype=tf.string,
value_dtype=tf.int64,
)
embarked_table = tf.lookup.StaticHashTable(embarked_initializer, default_value=-1)
outputs["embarked"] = embarked_table.lookup(embarked_input)
# We are going to create a new feature `has_a_cabin` that indicates
# whether the passenger had a cabin on the ship.
cabin = _fillna(inputs["Cabin"], "")
outputs["has_a_cabin"] = tf.cast(tf.greater(tf.strings.length(cabin), 0), tf.int64)
# Let's now create another new feature `is_traveling_alone` that
# indicates whether the passenger was traveling alone.
sibsp = _fillna(inputs["SibSp"])
parch = _fillna(inputs["Parch"])
family_size = tf.math.add(sibsp, parch)
outputs["is_traveling_alone"] = tf.cast(tf.equal(family_size, 0), tf.int64)
# Let's binary-encode `Sex` (1 for male, 0 for female).
outputs["sex"] = tf.cast(tf.equal(inputs["Sex"], "male"), tf.int64)
# Let's bucketize `Fare` into 4 different buckets.
fare = _fillna(inputs["Fare"])
outputs["fare"] = tf.cast(tf.where(
tf.less_equal(fare, 7.91), 0,
tf.where(tf.math.logical_and(tf.greater(fare, 7.91), tf.less_equal(fare, 14.454)), 1,
tf.where(tf.math.logical_and(tf.greater(fare, 14.454), tf.less_equal(fare, 31)), 2, 3))), tf.int64)
# We are going to keep `Pclass` as is.
outputs["pclass"] = inputs["Pclass"]
outputs[LABEL] = inputs[LABEL]
return outputs
def _fillna(t, value=0):
"""
Replaces missing values in a SparseTensor with the supplied value.
Args:
t: A `SparseTensor` of rank 2. Its dense shape should have size at most 1
in the second dimension.
value: The value used to fill in missing entries. Defaults to 0.
Returns:
A rank 1 tensor where missing values have been filled in.
"""
if not isinstance(t, tf.sparse.SparseTensor):
return t
return tf.squeeze(tf.sparse.to_dense(
tf.SparseTensor(
t.indices,
t.values,
[t.dense_shape[0], 1]
), value),
axis=1)
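Since the module is now on disk, we can import it and exercise `_fillna` on a tiny `SparseTensor` to confirm that missing entries get the default value. This is a minimal sketch for verification only; the pipeline never calls the function this way:
import titanic_transform

# A 3-row sparse column where the second row is missing; it should come back as "S".
sparse = tf.sparse.SparseTensor(
    indices=[[0, 0], [2, 0]],
    values=["S", "C"],
    dense_shape=[3, 1],
)
print(titanic_transform._fillna(sparse, "S"))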
transform = tfx.components.Transform(
examples=example_gen.outputs["examples"],
schema=schema_importer.outputs["result"],
module_file=os.path.abspath(TRANSFORM_MODULE_PATH),
)
context.run(transform, enable_cache=False)
train_uri = os.path.join(
transform.outputs['transformed_examples'].get()[0].uri,
'Split-train'
)
tfrecord_filenames = [
os.path.join(train_uri, name)
for name in os.listdir(train_uri)
]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
for tfrecord in dataset.take(3):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
print(example)
Training a model
TRAINER_MODULE_PATH = "titanic_trainer.py"
%%writefile {TRAINER_MODULE_PATH}
import tensorflow as tf
import tensorflow_transform as tft
from absl import logging
from tensorflow.keras import layers, Model, optimizers, losses, metrics
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from typing import List, Text
import titanic_constants
LABEL = titanic_constants.LABEL
BATCH_SIZE = 32
EPOCHS = 10
def _input_fn(
file_pattern: List[Text],
data_accessor: tfx.components.DataAccessor,
tf_transform_output: tft.TFTransformOutput,
batch_size: int,
) -> tf.data.Dataset:
"""
Generates a dataset of features that can be used to train
and evaluate the model.
Args:
file_pattern: List of paths or patterns of input data files.
data_accessor: An instance of DataAccessor that we can use to
convert the input to a RecordBatch.
tf_transform_output: The transformation output.
batch_size: The number of consecutive elements that we should
combine in a single batch.
Returns:
A dataset that contains a tuple of (features, labels) where
features is a dictionary of Tensors, and labels is a single
Tensor with the label values.
"""
dataset = data_accessor.tf_dataset_factory(
file_pattern,
tfxio.TensorFlowDatasetOptions(batch_size=batch_size),
schema=tf_transform_output.raw_metadata.schema,
)
tft_layer = tf_transform_output.transform_features_layer()
def apply_transform(raw_features):
transformed_features = tft_layer(raw_features)
transformed_label = transformed_features.pop(LABEL)
return transformed_features, transformed_label
return dataset.map(apply_transform).repeat()
def _get_serve_tf_examples_fn(model, tf_transform_output):
"""
Returns a function that parses a serialized tf.Example and applies
the transformations during inference.
Args:
model: The model that we are serving.
tf_transform_output: The transformation output that we want to
include with the model.
"""
# Let's make sure we set up the model's tft_layer.
model.tft_layer = tf_transform_output.transform_features_layer()
@tf.function(input_signature=[
tf.TensorSpec(shape=[None], dtype=tf.string, name="examples")
])
def serve_tf_examples_fn(serialized_tf_examples):
feature_spec = tf_transform_output.raw_feature_spec()
# We need to make sure we don't include the target column
# as part of the required list of features. This is important
# because the target column will not be provided when the model
# is deployed.
required_feature_spec = {
k: v for k, v in feature_spec.items() if k != LABEL
}
parsed_features = tf.io.parse_example(
serialized_tf_examples,
required_feature_spec
)
transformed_features = model.tft_layer(parsed_features)
# Run inference with the model on the transformed features.
return model(transformed_features)
return serve_tf_examples_fn
def _model() -> tf.keras.Model:
"""
Creates the Keras model.
"""
inputs = [
layers.Input(shape=(1,), name="embarked"),
layers.Input(shape=(1,), name="fare"),
layers.Input(shape=(1,), name="has_a_cabin"),
layers.Input(shape=(1,), name="is_traveling_alone"),
layers.Input(shape=(1,), name="pclass"),
layers.Input(shape=(1,), name="sex"),
]
x = layers.concatenate(inputs)
x = layers.Dense(8, activation="relu")(x)
x = layers.Dense(8, activation="relu")(x)
outputs = layers.Dense(1, activation="sigmoid")(x)
model = Model(inputs=inputs, outputs=outputs)
model.compile(
optimizer=optimizers.Adam(1e-2),
loss="binary_crossentropy",
metrics=[metrics.BinaryAccuracy()],
)
model.summary(print_fn=logging.info)
return model
def run_fn(fn_args: tfx.components.FnArgs):
"""
The callback function that will be called by the Trainer component
to train the model using the supplied arguments.
Args:
fn_args: A collection of name/value pairs representing the
arguments to train the model.
"""
tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)
train_dataset = _input_fn(
fn_args.train_files,
fn_args.data_accessor,
tf_transform_output,
batch_size=BATCH_SIZE,
)
eval_dataset = _input_fn(
fn_args.eval_files,
fn_args.data_accessor,
tf_transform_output,
batch_size=BATCH_SIZE,
)
model = _model()
model.fit(
train_dataset,
steps_per_epoch=fn_args.train_steps,
validation_data=eval_dataset,
validation_steps=fn_args.eval_steps,
epochs=EPOCHS
)
# We need to modify the default signature to include the transform layer in
# the computational graph.
signatures = {
"serving_default": _get_serve_tf_examples_fn(model, tf_transform_output),
}
model.save(fn_args.serving_model_dir, save_format="tf", signatures=signatures)
trainer = tfx.components.Trainer(
examples=example_gen.outputs["examples"],
transform_graph=transform.outputs["transform_graph"],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5),
module_file=os.path.abspath(TRAINER_MODULE_PATH),
)
context.run(trainer, enable_cache=False)
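Once training finishes, the Trainer registers a model artifact. A minimal sketch to locate the exported SavedModel (the exact sub-directory layout can vary between TFX versions):
# Locate the SavedModel exported by the Trainer component.
model_artifact = trainer.outputs["model"].get()[0]
print("model uri:", model_artifact.uri)
for root, _, files in os.walk(model_artifact.uri):
    for name in files:
        print(os.path.join(root, name))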
Evaluating the model
eval_config = tfma.EvalConfig(
model_specs=[
tfma.ModelSpec(
signature_name="serving_default",
preprocessing_function_names=["tft_layer"],
label_key="Survived",
)
],
metrics_specs=[
tfma.MetricsSpec(
per_slice_thresholds={
"binary_accuracy": tfma.config.PerSliceMetricThresholds(
thresholds=[
tfma.PerSliceMetricThreshold(
slicing_specs=[tfma.SlicingSpec()],
threshold=tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={"value": 0.7}
),
change_threshold=tfma.GenericChangeThreshold(
direction=tfma.MetricDirection.HIGHER_IS_BETTER,
absolute={"value": -1e-10},
),
),
)
]
),
}
)
],
slicing_specs=[
tfma.SlicingSpec(),
tfma.SlicingSpec(feature_keys=["sex"])
],
)
model_resolver = tfx.dsl.Resolver(
strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
model_blessing=tfx.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing),
).with_id("latest_blessed_model_resolver")
context.run(model_resolver)
evaluator = tfx.components.Evaluator(
examples=example_gen.outputs["examples"],
model=trainer.outputs["model"],
eval_config=eval_config,
baseline_model=model_resolver.outputs["model"],
)
context.run(evaluator, enable_cache=False)
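Besides rendering the results, we can inspect the evaluation programmatically. A minimal sketch that loads the TFMA result and checks the blessing artifact, assuming the Evaluator's default output layout:
# Load the TFMA evaluation result and print the metrics computed per slice.
evaluation_uri = evaluator.outputs["evaluation"].get()[0].uri
eval_result = tfma.load_eval_result(evaluation_uri)
for slice_key, metrics in eval_result.slicing_metrics:
    print(slice_key, metrics)

# The blessing artifact contains a marker file indicating whether the model passed.
blessing_uri = evaluator.outputs["blessing"].get()[0].uri
print(os.listdir(blessing_uri))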
Pushing the model
pusher = tfx.components.Pusher(
model=trainer.outputs["model"],
model_blessing=evaluator.outputs["blessing"],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=str(MODEL_DIRECTORY)
)
),
)
context.run(pusher)
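If the model was blessed, the Pusher copies it into MODEL_DIRECTORY under a numeric version directory. A quick check:
# List the model versions the Pusher has published so far.
print(sorted(os.listdir(MODEL_DIRECTORY)))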
Running inference
inference_fn = get_inference_fn(MODEL_DIRECTORY)
result = inference_fn(examples=tf.constant(_examples(test_df)))
print(result["output_0"].numpy())
Pipeline
Setting up the schema
!mkdir -p {SCHEMA_DIRECTORY}
%%writefile {SCHEMA_FILENAME}
feature {
name: "Embarked"
type: BYTES
domain: "Embarked"
presence {
min_fraction: 1.0
min_count: 1
}
shape {
dim {
size: 1
}
}
}
feature {
name: "Sex"
type: BYTES
domain: "Sex"
presence {
min_fraction: 1.0
min_count: 1
}
shape {
dim {
size: 1
}
}
}
feature {
name: "Cabin"
type: BYTES
presence {
min_fraction: 0.0
min_count: 1
}
}
feature {
name: "Age"
type: INT
presence {
min_fraction: 1.0
min_count: 1
}
shape {
dim {
size: 1
}
}
}
feature {
name: "Fare"
type: FLOAT
presence {
min_fraction: 1.0
min_count: 1
}
shape {
dim {
size: 1
}
}
}
feature {
name: "Parch"
type: INT
presence {
min_fraction: 1.0
min_count: 1
}
shape {
dim {
size: 1
}
}
}
feature {
name: "Pclass"
type: INT
presence {
min_fraction: 1.0
min_count: 1
}
shape {
dim {
size: 1
}
}
}
feature {
name: "SibSp"
type: INT
presence {
min_fraction: 1.0
min_count: 1
}
shape {
dim {
size: 1
}
}
}
feature {
name: "Survived"
type: INT
bool_domain {
}
presence {
min_fraction: 1.0
min_count: 1
}
not_in_environment: "SERVING"
shape {
dim {
size: 1
}
}
}
string_domain {
name: "Embarked"
value: "C"
value: "Q"
value: "S"
}
string_domain {
name: "Sex"
value: "female"
value: "male"
}
default_environment: "TRAINING"
default_environment: "SERVING"
Running the pipeline
import tensorflow_model_analysis as tfma
def create_pipeline(
pipeline_name: str,
pipeline_directory: str,
data_directory: str,
schema_path: str,
model_directory: str,
metadata_path: str,
transform_module_path: str,
trainer_module_path: str,
) -> tfx.dsl.Pipeline:
input_config = tfx.proto.Input(splits=[
example_gen_pb2.Input.Split(name='train', pattern=DATA_TRAIN_FILENAME),
example_gen_pb2.Input.Split(name='eval', pattern=DATA_EVAL_FILENAME)
])
example_gen = tfx.components.CsvExampleGen(
input_base=data_directory,
input_config=input_config
)
statistics_gen = tfx.components.StatisticsGen(
examples=example_gen.outputs["examples"]
)
schema_importer = tfx.dsl.Importer(
source_uri=schema_path,
artifact_type=tfx.types.standard_artifacts.Schema
).with_id("schema_importer")
example_validator = tfx.components.ExampleValidator(
statistics=statistics_gen.outputs["statistics"],
schema=schema_importer.outputs["result"]
)
transform = tfx.components.Transform(
examples=example_gen.outputs["examples"],
schema=schema_importer.outputs["result"],
module_file=os.path.abspath(transform_module_path),
)
trainer = tfx.components.Trainer(
module_file=os.path.abspath(trainer_module_path),
examples=example_gen.outputs["examples"],
transform_graph=transform.outputs["transform_graph"],
train_args=tfx.proto.TrainArgs(num_steps=100),
eval_args=tfx.proto.EvalArgs(num_steps=5),
)
eval_config = tfma.EvalConfig(
model_specs=[
tfma.ModelSpec(
signature_name="serving_default",
preprocessing_function_names=["tft_layer"],
label_key="Survived",
)
],
metrics_specs=[
tfma.MetricsSpec(
per_slice_thresholds={
"binary_accuracy": tfma.config.PerSliceMetricThresholds(
thresholds=[
tfma.PerSliceMetricThreshold(
slicing_specs=[tfma.SlicingSpec()],
threshold=tfma.MetricThreshold(
value_threshold=tfma.GenericValueThreshold(
lower_bound={"value": 0.7}
),
change_threshold=tfma.GenericChangeThreshold(
direction=tfma.MetricDirection.HIGHER_IS_BETTER,
absolute={"value": -1e-10},
),
),
)
]
),
}
)
],
slicing_specs=[
tfma.SlicingSpec(),
tfma.SlicingSpec(feature_keys=["sex"])
],
)
model_resolver = tfx.dsl.Resolver(
strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
model_blessing=tfx.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing),
).with_id("latest_blessed_model_resolver")
evaluator = tfx.components.Evaluator(
examples=example_gen.outputs["examples"],
model=trainer.outputs["model"],
eval_config=eval_config,
baseline_model=model_resolver.outputs["model"],
)
pusher = tfx.components.Pusher(
model=trainer.outputs["model"],
model_blessing=evaluator.outputs["blessing"],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=model_directory
)
),
)
components = [
example_gen,
statistics_gen,
schema_importer,
example_validator,
transform,
trainer,
model_resolver,
evaluator,
pusher,
]
return tfx.dsl.Pipeline(
pipeline_name=pipeline_name,
pipeline_root=pipeline_directory,
metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
metadata_path
),
components=components,
)
tfx.orchestration.LocalDagRunner().run(
create_pipeline(
pipeline_name=PIPELINE_NAME,
pipeline_directory=str(PIPELINE_DIRECTORY),
data_directory=DATA_DIRECTORY,
schema_path=str(SCHEMA_DIRECTORY),
model_directory=str(MODEL_DIRECTORY),
metadata_path=str(METADATA_PATH),
transform_module_path=TRANSFORM_MODULE_PATH,
trainer_module_path=TRAINER_MODULE_PATH
)
)
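After the run, every artifact is recorded in the ML Metadata store configured above. A minimal sketch of querying it directly; the "PushedModel" type name is the TFX default and is an assumption here:
from ml_metadata import metadata_store
from ml_metadata.proto import metadata_store_pb2

# Connect to the SQLite-backed metadata store used by the pipeline.
connection_config = metadata_store_pb2.ConnectionConfig()
connection_config.sqlite.filename_uri = str(METADATA_PATH)
connection_config.sqlite.connection_mode = 3  # READWRITE_OPENCREATE
store = metadata_store.MetadataStore(connection_config)

# List the pushed models recorded by the run.
for artifact in store.get_artifacts_by_type("PushedModel"):
    print(artifact.id, artifact.uri)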
Running inference
inference_fn = get_inference_fn(MODEL_DIRECTORY)
result = inference_fn(examples=tf.constant(_examples(test_df)))
print(result["output_0"].numpy())