TFX Pipelines - Titanic Dataset
import tempfile
import tensorflow as tf
import urllib.request
import os
import pandas as pd
import shutil
import tensorflow_data_validation as tfdv
import tensorflow_model_analysis as tfma
from absl import logging
from pathlib import Path
from tensorflow_metadata.proto.v0 import schema_pb2
from tfx import v1 as tfx
# Report library versions so a run can be reproduced later.
print(f"TensorFlow version: {tf.__version__}")
print(f"TFX version: {tfx.__version__}")
print(f"TensorFlow Data Validation version: {tfdv.__version__}")
logging.set_verbosity(logging.INFO)

# Input data: the raw Kaggle CSVs and the cleaned CSVs written later.
DATA_DIRECTORY = "titanic"
DATA_SOURCE_TRAIN_PATH = Path(DATA_DIRECTORY, "titanic-train.csv")
DATA_SOURCE_TEST_PATH = Path(DATA_DIRECTORY, "titanic-test.csv")
DATA_TRAIN_FILENAME = "train.csv"
DATA_EVAL_FILENAME = "eval.csv"
DATA_TEST_FILENAME = "test.csv"

# Pipeline outputs: artifacts, ML Metadata store, curated schema, pushed models.
PIPELINE_NAME = "titanic-pipeline"
PIPELINE_DIRECTORY = Path("pipelines", PIPELINE_NAME)
METADATA_PATH = Path("metadata", PIPELINE_NAME, "metadata.db")
SCHEMA_DIRECTORY = PIPELINE_DIRECTORY / "schema"
SCHEMA_FILENAME = str(SCHEMA_DIRECTORY / "schema.pbtxt")
MODEL_DIRECTORY = Path("model")
train_df = pd.read_csv(DATA_SOURCE_TRAIN_PATH)
test_df = pd.read_csv(DATA_SOURCE_TEST_PATH)

# Compute the imputation values once, from the raw training data, before any
# column is modified. Both datasets are then filled with the same training
# statistics, independent of the order in which the loop processes them.
fare_fill_value = train_df["Fare"].median()
age_fill_value = train_df["Age"].median()

datasets = [train_df, test_df]
for dataset in datasets:
    # These columns are not used as features anywhere downstream.
    dataset.drop(columns=["PassengerId", "Name", "Ticket"], inplace=True)
    dataset["Fare"] = dataset["Fare"].fillna(fare_fill_value)
    # The schema declares `Age` as INT, so fractional ages are truncated.
    dataset["Age"] = dataset["Age"].fillna(age_fill_value).astype(int)
    dataset["Embarked"] = dataset["Embarked"].fillna("S")

# Save the cleaned data back to disk. The training data is saved twice (as
# both train and eval), so the "eval" split is not a held-out set.
train_df.to_csv(Path(DATA_DIRECTORY) / DATA_TRAIN_FILENAME, index=False)
train_df.to_csv(Path(DATA_DIRECTORY) / DATA_EVAL_FILENAME, index=False)
test_df.to_csv(Path(DATA_DIRECTORY) / DATA_TEST_FILENAME, index=False)
Common functions
def _examples(df):
    """
    Converts a DataFrame into serialized `tf.train.Example` protos in the
    raw-feature format expected by the model's serving signature.

    Args:
        df: A DataFrame with the columns `Pclass`, `Sex`, `Age`, `SibSp`,
            `Parch`, `Fare`, `Cabin` and `Embarked`. Other columns (for
            example `Survived`) are ignored.

    Returns:
        A list with one serialized `tf.train.Example` (bytes) per row.

    Note:
        A missing string value is emitted as an empty feature. `Sex` and
        `Embarked` have a fixed shape in the schema, so a missing value there
        presumably fails parsing rather than being silently mis-encoded.
    """

    def _int64(value):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=[int(value)]))

    def _float(value):
        return tf.train.Feature(float_list=tf.train.FloatList(value=[float(value)]))

    def _bytes(value):
        # Send a missing value (NaN/None) as an empty feature. `str(value)`
        # would produce the literal string "nan", which the transform treats
        # as a real value: `has_a_cabin` would become 1 for passengers without
        # a cabin. In the training CSV, the same value is an empty cell.
        values = [] if pd.isna(value) else [str(value).encode("utf-8")]
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=values))

    examples = []
    for row in df.itertuples(index=False):
        features = {
            "Pclass": _int64(row.Pclass),
            "Sex": _bytes(row.Sex),
            "Age": _int64(row.Age),
            "SibSp": _int64(row.SibSp),
            "Parch": _int64(row.Parch),
            "Fare": _float(row.Fare),
            "Cabin": _bytes(row.Cabin),
            "Embarked": _bytes(row.Embarked),
        }
        example = tf.train.Example(features=tf.train.Features(feature=features))
        examples.append(example.SerializeToString())
    return examples
def get_inference_fn(model_directory):
    """
    Returns the `serving_default` signature of the latest published model.

    Published models are expected in sub-directories of `model_directory`
    whose names are integer versions. The largest version is treated as the
    latest. Other entries (files, non-numeric folders) are ignored.

    Args:
        model_directory: Base directory (str or `os.PathLike`) that the
            Pusher writes models into.

    Returns:
        The loaded model's `serving_default` signature function.

    Raises:
        ValueError: If `model_directory` has no integer-named sub-directory.
    """
    # Use the iterator as a context manager so the directory handle is closed.
    with os.scandir(model_directory) as entries:
        versions = [
            entry for entry in entries
            if entry.is_dir() and entry.name.isdecimal()
        ]
    if not versions:
        raise ValueError(f"No published model found in '{model_directory}'.")
    model_path = max(versions, key=lambda entry: int(entry.name)).path
    loaded_model = tf.keras.models.load_model(model_path)
    return loaded_model.signatures["serving_default"]
Running each component interactively
# The interactive context lets the notebook run one component at a time via
# `context.run(...)` and inspect outputs via `context.show(...)`.
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext

context = InteractiveContext()
Loading the data
from tfx.proto import example_gen_pb2

# train.csv and eval.csv hold the same rows, so the "eval" split only
# re-measures the training fit; it is not a held-out evaluation set.
train_split = example_gen_pb2.Input.Split(name='train', pattern=DATA_TRAIN_FILENAME)
eval_split = example_gen_pb2.Input.Split(name='eval', pattern=DATA_EVAL_FILENAME)
input_config = tfx.proto.Input(splits=[train_split, eval_split])

example_gen = tfx.components.CsvExampleGen(
    input_config=input_config,
    input_base=DATA_DIRECTORY,
)
context.run(example_gen)
Computing statistics about the data
# Compute descriptive statistics over the examples and visualize them.
statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs["examples"])
context.run(statistics_gen)
context.show(statistics_gen.outputs["statistics"])
Inferring the schema from the data
# Infer a starting schema from the statistics; it is refined by hand next.
schema_gen = tfx.components.SchemaGen(
    infer_feature_shape=True,
    statistics=statistics_gen.outputs["statistics"],
)
context.run(schema_gen)
context.show(schema_gen.outputs["schema"])
Adjusting the schema
# Load the schema that SchemaGen inferred, using the public channel accessor
# (`.get()`) rather than the private `_artifacts` attribute.
schema = tfdv.load_schema_text(
    os.path.join(schema_gen.outputs["schema"].get()[0].uri, "schema.pbtxt")
)

# Add the two environments we need.
schema.default_environment.append("TRAINING")
schema.default_environment.append("SERVING")

# The `Survived` column is not available in the SERVING environment.
tfdv.get_feature(schema, "Survived").not_in_environment.append("SERVING")

# Don't require `Cabin` to be present in any minimum fraction of examples.
tfdv.get_feature(schema, "Cabin").presence.min_fraction = 0.0

# Display the updated schema.
tfdv.display_schema(schema=schema)

# Create the destination directory from Python instead of a `!mkdir -p` shell
# escape, so this cell is also valid outside IPython.
SCHEMA_DIRECTORY.mkdir(parents=True, exist_ok=True)
tfdv.write_schema_text(schema, SCHEMA_FILENAME)

# Import the curated schema as an artifact so downstream components use it
# instead of the inferred one, and display it.
schema_importer = tfx.dsl.Importer(
    source_uri=str(SCHEMA_DIRECTORY),
    artifact_type=tfx.types.standard_artifacts.Schema
).with_id("schema_importer")
context.run(schema_importer)
context.show(schema_importer.outputs["result"])
Validating the data using the updated schema
# Check the computed statistics against the curated schema and show anomalies.
example_validator = tfx.components.ExampleValidator(
    schema=schema_importer.outputs["result"],
    statistics=statistics_gen.outputs["statistics"],
)
context.run(example_validator)
context.show(example_validator.outputs["anomalies"])
Transforming the data
# Shared constants live in their own module, which both the transform and
# trainer modules import (`import titanic_constants`).
CONSTANTS_MODULE_PATH = "titanic_constants.py"
%%writefile {CONSTANTS_MODULE_PATH}
# Name of the label (target) column.
LABEL = "Survived"
# Path of the module that defines the Transform component's `preprocessing_fn`.
TRANSFORM_MODULE_PATH = "titanic_transform.py"
%%writefile {TRANSFORM_MODULE_PATH}
import tensorflow as tf
import tensorflow_transform as tft
from tfx import v1 as tfx
import titanic_constants
# Name of the label column, shared through the constants module.
LABEL = titanic_constants.LABEL
# Known `Embarked` port codes; each code's list position is its encoded value
# in `preprocessing_fn` (unknown codes map to -1).
EMBARKED = ["S", "C", "Q"]
def preprocessing_fn(inputs):
    """
    Transform callback that derives the model's features from the raw data.

    Args:
        inputs: Map from raw feature names to their tensors (dense, or sparse
            for optional features such as `Cabin`).

    Returns:
        Map with the transformed features `embarked`, `has_a_cabin`,
        `is_traveling_alone`, `sex`, `fare` and `pclass`, plus the label
        `LABEL` passed through unchanged.
    """
    # `Embarked`: index of the port code in EMBARKED, -1 for unknown codes.
    embarked = _fillna(inputs["Embarked"], "S")
    port_index = tf.lookup.StaticHashTable(
        tf.lookup.KeyValueTensorInitializer(
            keys=EMBARKED,
            values=tf.cast(tf.range(len(EMBARKED)), tf.int64),
            key_dtype=tf.string,
            value_dtype=tf.int64,
        ),
        default_value=-1,
    )
    transformed = {"embarked": port_index.lookup(embarked)}

    # `has_a_cabin`: 1 when a non-empty `Cabin` value was provided.
    cabin = _fillna(inputs["Cabin"], "")
    transformed["has_a_cabin"] = tf.cast(tf.strings.length(cabin) > 0, tf.int64)

    # `is_traveling_alone`: 1 when the passenger had no siblings/spouses and
    # no parents/children aboard.
    relatives = _fillna(inputs["SibSp"]) + _fillna(inputs["Parch"])
    transformed["is_traveling_alone"] = tf.cast(tf.equal(relatives, 0), tf.int64)

    # `sex`: 1 for "male", 0 otherwise.
    transformed["sex"] = tf.cast(tf.equal(inputs["Sex"], "male"), tf.int64)

    # `fare`: four buckets with upper bounds 7.91, 14.454 and 31 (inclusive).
    # Each inner `tf.where` is only reached when the previous bound was
    # exceeded, so no explicit lower-bound checks are needed.
    fare = _fillna(inputs["Fare"])
    fare_bucket = tf.where(
        fare <= 7.91, 0,
        tf.where(fare <= 14.454, 1,
                 tf.where(fare <= 31, 2, 3)))
    transformed["fare"] = tf.cast(fare_bucket, tf.int64)

    # `Pclass` and the label are passed through unchanged.
    transformed["pclass"] = inputs["Pclass"]
    transformed[LABEL] = inputs[LABEL]
    return transformed
def _fillna(t, value=0):
    """
    Densifies a sparse feature, filling rows that have no entry with `value`.

    Tensors that are not a `tf.sparse.SparseTensor` are returned unchanged.

    Args:
        t: The feature tensor. When sparse, it is expected to be rank 2 with
            at most one value per row (dense shape size at most 1 in the
            second dimension).
        value: Fill value for rows without an entry. Defaults to 0.

    Returns:
        `t` itself when it is not sparse; otherwise a rank-1 dense tensor with
        one element per row of `t`.
    """
    if isinstance(t, tf.sparse.SparseTensor):
        # Pin the second dimension to 1 so the dense result can be squeezed
        # down to rank 1.
        single_column = tf.SparseTensor(t.indices, t.values, [t.dense_shape[0], 1])
        return tf.squeeze(tf.sparse.to_dense(single_column, value), axis=1)
    return t
transform = tfx.components.Transform(
    examples=example_gen.outputs["examples"],
    schema=schema_importer.outputs["result"],
    module_file=os.path.abspath(TRANSFORM_MODULE_PATH),
)
context.run(transform, enable_cache=False)

# Print the first three transformed training examples as a sanity check.
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')
tfrecord_filenames = [os.path.join(train_uri, name) for name in os.listdir(train_uri)]
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
for tfrecord in dataset.take(3):
    serialized_example = tfrecord.numpy()
    example = tf.train.Example.FromString(serialized_example)
    print(example)
Training a model
# Path of the module that defines the Trainer component's `run_fn`.
TRAINER_MODULE_PATH = "titanic_trainer.py"
%%writefile {TRAINER_MODULE_PATH}
import tensorflow as tf
# NOTE(review): `tfdf` (and `losses` below) appear unused in the trainer code
# that follows; consider removing these imports.
import tensorflow_decision_forests as tfdf
import tensorflow_transform as tft
from absl import logging
from tensorflow.keras import layers, Model, optimizers, losses, metrics
from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from typing import List, Text
import titanic_constants
# Name of the label column, shared through the constants module.
LABEL = titanic_constants.LABEL
# Training hyperparameters used by `run_fn`.
BATCH_SIZE = 32
EPOCHS = 10
def _input_fn(
    file_pattern: List[Text],
    data_accessor: tfx.components.DataAccessor,
    tf_transform_output: tft.TFTransformOutput,
    batch_size: int,
) -> tf.data.Dataset:
    """
    Builds an endlessly repeating dataset of transformed (features, label) batches.

    Raw examples are read with the raw schema, and the Transform graph is
    applied to each batch.

    Args:
        file_pattern: List of paths or patterns of input data files.
        data_accessor: DataAccessor used to create the raw `tf.data` dataset.
        tf_transform_output: The Transform component's output.
        batch_size: Number of consecutive examples combined into one batch.

    Returns:
        A repeating dataset of `(features, label)` tuples. `features` is a dict
        of transformed tensors (without the label) and `label` is the
        transformed `LABEL` tensor.
    """
    raw_dataset = data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(batch_size=batch_size),
        schema=tf_transform_output.raw_metadata.schema,
    )
    transform_layer = tf_transform_output.transform_features_layer()

    def _split_label(raw_batch):
        features = transform_layer(raw_batch)
        label = features.pop(LABEL)
        return features, label

    return raw_dataset.map(_split_label).repeat()
def _get_serve_tf_examples_fn(model, tf_transform_output):
    """
    Returns a serving function that parses serialized tf.Examples, applies
    the Transform graph and runs the model.

    Args:
        model: The model that we are serving.
        tf_transform_output: The transformation output that we want to
            include with the model.

    Returns:
        A `tf.function` with a single string input named `examples` (a batch
        of serialized tf.Examples) that returns the model's predictions.
        The explicit input signature is what lets `model.save(signatures=...)`
        trace it, and it names the input `examples` as inference callers expect.
    """
    # Attach the transform layer to the model so it is tracked and saved with it.
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function(
        input_signature=[tf.TensorSpec(shape=[None], dtype=tf.string, name="examples")]
    )
    def serve_tf_examples_fn(serialized_tf_examples):
        feature_spec = tf_transform_output.raw_feature_spec()
        # Leave the label out of the parsing spec: it is not provided when the
        # model is deployed, so it must not be a required feature.
        required_feature_spec = {
            k: v for k, v in feature_spec.items() if k != LABEL
        }
        parsed_features = tf.io.parse_example(
            serialized_tf_examples,
            required_feature_spec
        )
        transformed_features = model.tft_layer(parsed_features)
        # Run inference with the model on the transformed features.
        return model(transformed_features)

    return serve_tf_examples_fn
def _model() -> tf.keras.Model:
    """
    Builds and compiles a small fully-connected binary classifier.

    The model takes one scalar input per transformed feature, passes their
    concatenation through two ReLU layers of width 8, and outputs a sigmoid
    probability. It is compiled with Adam (learning rate 1e-2), binary
    cross-entropy and binary accuracy.
    """
    feature_names = (
        "embarked",
        "fare",
        "has_a_cabin",
        "is_traveling_alone",
        "pclass",
        "sex",
    )
    inputs = [layers.Input(shape=(1,), name=name) for name in feature_names]

    hidden = layers.concatenate(inputs)
    for _ in range(2):
        hidden = layers.Dense(8, activation="relu")(hidden)
    outputs = layers.Dense(1, activation="sigmoid")(hidden)

    model = Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=optimizers.Adam(1e-2),
        loss="binary_crossentropy",
        metrics=[metrics.BinaryAccuracy()],
    )
    model.summary(print_fn=logging.info)
    return model
def run_fn(fn_args: tfx.components.FnArgs):
    """
    Entry point called by the Trainer component to train and export the model.

    The exported `serving_default` signature embeds the Transform graph, so
    it accepts raw serialized tf.Examples.

    Args:
        fn_args: Trainer-supplied arguments: input files, data accessor,
            transform output location, step counts and the serving model
            directory.
    """
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    def _dataset(files):
        return _input_fn(
            files,
            fn_args.data_accessor,
            tf_transform_output,
            batch_size=BATCH_SIZE,
        )

    train_dataset = _dataset(fn_args.train_files)
    eval_dataset = _dataset(fn_args.eval_files)

    model = _model()
    model.fit(
        train_dataset,
        epochs=EPOCHS,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
    )

    # Replace the default signature with one that includes the transform layer.
    serving_fn = _get_serve_tf_examples_fn(model, tf_transform_output)
    model.save(
        fn_args.serving_model_dir,
        save_format="tf",
        signatures={"serving_default": serving_fn},
    )
# Train with the module's `run_fn`: 100 training steps, 5 evaluation steps.
trainer = tfx.components.Trainer(
    module_file=os.path.abspath(TRAINER_MODULE_PATH),
    examples=example_gen.outputs["examples"],
    transform_graph=transform.outputs["transform_graph"],
    train_args=tfx.proto.TrainArgs(num_steps=100),
    eval_args=tfx.proto.EvalArgs(num_steps=5),
)
context.run(trainer, enable_cache=False)
Evaluating the model
# Blessing criteria: on the overall slice, binary accuracy must be at least
# 0.7 and must not decrease relative to the baseline model.
binary_accuracy_threshold = tfma.MetricThreshold(
    value_threshold=tfma.GenericValueThreshold(lower_bound={"value": 0.7}),
    change_threshold=tfma.GenericChangeThreshold(
        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
        absolute={"value": -1e-10},
    ),
)
eval_config = tfma.EvalConfig(
    model_specs=[
        tfma.ModelSpec(
            signature_name="serving_default",
            preprocessing_function_names=["tft_layer"],
            label_key="Survived",
        )
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            per_slice_thresholds={
                "binary_accuracy": tfma.config.PerSliceMetricThresholds(
                    thresholds=[
                        tfma.PerSliceMetricThreshold(
                            slicing_specs=[tfma.SlicingSpec()],
                            threshold=binary_accuracy_threshold,
                        )
                    ]
                ),
            }
        )
    ],
    # Report metrics overall and per value of the transformed `sex` feature.
    slicing_specs=[tfma.SlicingSpec(), tfma.SlicingSpec(feature_keys=["sex"])],
)

# Resolve the latest blessed model (if any) to use as the comparison baseline.
model_resolver = tfx.dsl.Resolver(
    strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
    model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
    model_blessing=tfx.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing),
).with_id("latest_blessed_model_resolver")
context.run(model_resolver)

evaluator = tfx.components.Evaluator(
    examples=example_gen.outputs["examples"],
    model=trainer.outputs["model"],
    baseline_model=model_resolver.outputs["model"],
    eval_config=eval_config,
)
context.run(evaluator, enable_cache=False)
Pushing the model
# Push destination: a plain directory on the local filesystem.
push_destination = tfx.proto.PushDestination(
    filesystem=tfx.proto.PushDestination.Filesystem(base_directory=str(MODEL_DIRECTORY))
)
# The push is gated on the Evaluator's blessing for the trained model.
pusher = tfx.components.Pusher(
    model=trainer.outputs["model"],
    model_blessing=evaluator.outputs["blessing"],
    push_destination=push_destination,
)
context.run(pusher)
Running inference
# Score the cleaned test set with the latest model in MODEL_DIRECTORY.
inference_fn = get_inference_fn(MODEL_DIRECTORY)
serialized_test_examples = tf.constant(_examples(test_df))
result = inference_fn(examples=serialized_test_examples)
print(result["output_0"].numpy())
Pipeline
Setting up the schema
!mkdir -p {SCHEMA_DIRECTORY}
%%writefile {SCHEMA_FILENAME}
# Hand-curated schema that the pipeline imports through `schema_importer`.
# It declares the TRAINING and SERVING environments and excludes the label
# from SERVING.
feature {
  name: "Embarked"
  type: BYTES
  domain: "Embarked"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Sex"
  type: BYTES
  domain: "Sex"
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
# `Cabin` may be absent (min_fraction 0.0) and has no fixed `shape`; the
# transform module's `_fillna` densifies it if it arrives as a SparseTensor.
feature {
  name: "Cabin"
  type: BYTES
  presence {
    min_fraction: 0.0
    min_count: 1
  }
}
feature {
  name: "Age"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Fare"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Parch"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "Pclass"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "SibSp"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
# The label is not provided at serving time, so it is excluded from SERVING.
feature {
  name: "Survived"
  type: INT
  bool_domain {
  }
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  not_in_environment: "SERVING"
  shape {
    dim {
      size: 1
    }
  }
}
string_domain {
  name: "Embarked"
  value: "C"
  value: "Q"
  value: "S"
}
string_domain {
  name: "Sex"
  value: "female"
  value: "male"
}
default_environment: "TRAINING"
default_environment: "SERVING"
Running the pipeline
import tensorflow_model_analysis as tfma
def create_pipeline(
    pipeline_name: str,
    pipeline_directory: str,
    data_directory: str,
    schema_path: str,
    model_directory: str,
    metadata_path: str,
    transform_module_path: str,
    trainer_module_path: str,
    train_steps: int = 100,
    eval_steps: int = 5,
) -> tfx.dsl.Pipeline:
    """
    Assembles the end-to-end Titanic TFX pipeline.

    Args:
        pipeline_name: Name of the pipeline.
        pipeline_directory: Pipeline root where components write artifacts.
        data_directory: Directory that contains the `DATA_TRAIN_FILENAME`
            and `DATA_EVAL_FILENAME` CSV files.
        schema_path: Directory that contains the curated `schema.pbtxt`.
        model_directory: Base directory the Pusher writes models into.
        metadata_path: Path of the SQLite ML Metadata database.
        transform_module_path: Path to the module defining `preprocessing_fn`.
        trainer_module_path: Path to the module defining `run_fn`.
        train_steps: Number of training steps passed to the Trainer.
        eval_steps: Number of evaluation steps passed to the Trainer.

    Returns:
        A `tfx.dsl.Pipeline` ready to be run by an orchestrator.
    """
    # Build the splits with `tfx.proto` so this function does not depend on
    # names imported in earlier notebook cells.
    input_config = tfx.proto.Input(splits=[
        tfx.proto.Input.Split(name="train", pattern=DATA_TRAIN_FILENAME),
        tfx.proto.Input.Split(name="eval", pattern=DATA_EVAL_FILENAME),
    ])
    example_gen = tfx.components.CsvExampleGen(
        # Read from the `data_directory` argument, not the global constant.
        input_base=str(data_directory),
        input_config=input_config,
    )
    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs["examples"]
    )
    # Use the hand-curated schema rather than inferring one on every run.
    schema_importer = tfx.dsl.Importer(
        source_uri=schema_path,
        artifact_type=tfx.types.standard_artifacts.Schema
    ).with_id("schema_importer")
    example_validator = tfx.components.ExampleValidator(
        statistics=statistics_gen.outputs["statistics"],
        schema=schema_importer.outputs["result"]
    )
    transform = tfx.components.Transform(
        examples=example_gen.outputs["examples"],
        schema=schema_importer.outputs["result"],
        module_file=os.path.abspath(transform_module_path),
    )
    trainer = tfx.components.Trainer(
        module_file=os.path.abspath(trainer_module_path),
        examples=example_gen.outputs["examples"],
        transform_graph=transform.outputs["transform_graph"],
        train_args=tfx.proto.TrainArgs(num_steps=train_steps),
        eval_args=tfx.proto.EvalArgs(num_steps=eval_steps),
    )
    # The latest blessed model (if any) is the Evaluator's comparison baseline.
    model_resolver = tfx.dsl.Resolver(
        strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
        model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
        model_blessing=tfx.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing),
    ).with_id("latest_blessed_model_resolver")
    evaluator = tfx.components.Evaluator(
        examples=example_gen.outputs["examples"],
        model=trainer.outputs["model"],
        eval_config=_create_eval_config(),
        baseline_model=model_resolver.outputs["model"],
    )
    pusher = tfx.components.Pusher(
        model=trainer.outputs["model"],
        model_blessing=evaluator.outputs["blessing"],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=model_directory
            )
        ),
    )
    components = [
        example_gen,
        statistics_gen,
        schema_importer,
        example_validator,
        transform,
        trainer,
        model_resolver,
        evaluator,
        pusher,
    ]
    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_directory,
        metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        components=components,
    )


def _create_eval_config() -> tfma.EvalConfig:
    """
    Returns the TFMA config used by the pipeline's Evaluator.

    On the overall slice, binary accuracy must be at least 0.7 and must not
    decrease relative to the baseline. Metrics are reported overall and per
    value of the transformed `sex` feature.
    """
    accuracy_threshold = tfma.MetricThreshold(
        value_threshold=tfma.GenericValueThreshold(lower_bound={"value": 0.7}),
        change_threshold=tfma.GenericChangeThreshold(
            direction=tfma.MetricDirection.HIGHER_IS_BETTER,
            absolute={"value": -1e-10},
        ),
    )
    return tfma.EvalConfig(
        model_specs=[
            tfma.ModelSpec(
                signature_name="serving_default",
                preprocessing_function_names=["tft_layer"],
                label_key="Survived",
            )
        ],
        metrics_specs=[
            tfma.MetricsSpec(
                per_slice_thresholds={
                    "binary_accuracy": tfma.config.PerSliceMetricThresholds(
                        thresholds=[
                            tfma.PerSliceMetricThreshold(
                                slicing_specs=[tfma.SlicingSpec()],
                                threshold=accuracy_threshold,
                            )
                        ]
                    ),
                }
            )
        ],
        slicing_specs=[
            tfma.SlicingSpec(),
            tfma.SlicingSpec(feature_keys=["sex"]),
        ],
    )
# Run the whole pipeline locally with the notebook's configuration.
runner = tfx.orchestration.LocalDagRunner()
runner.run(
    create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_directory=str(PIPELINE_DIRECTORY),
        data_directory=DATA_DIRECTORY,
        schema_path=str(SCHEMA_DIRECTORY),
        model_directory=str(MODEL_DIRECTORY),
        metadata_path=str(METADATA_PATH),
        transform_module_path=TRANSFORM_MODULE_PATH,
        trainer_module_path=TRAINER_MODULE_PATH,
    )
)
Running inference
# Score the cleaned test set with the model most recently pushed by the pipeline.
inference_fn = get_inference_fn(MODEL_DIRECTORY)
serialized_test_examples = tf.constant(_examples(test_df))
result = inference_fn(examples=serialized_test_examples)
print(result["output_0"].numpy())