How to create custom training loops in Keras

How to create custom training loops in Keras
How to create custom training loops in Keras. 

Training models in Keras is usually done using the fit method. However, you may want more control over the training process. To do that, you'll need to create a custom training loop. This involves setting up a custom function to compute the loss and gradient. This article will walk you through the process of doing that. Let's get to it.


Obtain dataset

We'll use the Fashion MNIST dataset for this illustration and load it using the Layer data loader.

# pip install layer
import layer
mnist_train = layer.get_dataset('layer/fashion_mnist/datasets/fashion_mnist_train').to_pandas()
mnist_test = layer.get_dataset('layer/fashion_mnist/datasets/fashion_mnist_test').to_pandas()
# Successfully logged into https://app.layer.ai as guest
# ⠴  fashion_mnist_train  ━━━━━━━━━━ LOADED [0:00:10] 
# ⠦  fashion_mnist_test   ━━━━━━━━━━ LOADED [0:00:04] 

Here's is how the dataset looks like:

mnist_train["images"][17]
mnist_test["images"][23]

Data processing

Next, convert the cloth images to NumPy arrays.

import numpy as np
def images_to_np_array(image_column):
    return np.array([np.array(im.getdata()).reshape((im.size[1], im.size[0])) for im in image_column])
train_images = images_to_np_array(mnist_train.images)
test_images = images_to_np_array(mnist_test.images)
train_labels = mnist_train.labels
test_labels = mnist_test.labels

Scaling data in deep learning is a common practice because weights and biases of the network are initialized to small numbers between 0 and 1. We, therefore, have to scale the image data.

train_images = train_images / 255.0
test_images = test_images / 255.0
train_images.shape
# (60000, 28, 28)

The neural network expects the above dataset to be in a specific shape. When training models with Keras, we pass the shape as image_width, image_height , number_of_channels. In the shape printed above, we see that the number_of_channels is missing. We need to add that. Failure to do this will result in an error similar to:

ValueError: Exception Input 0 of layer "conv2d" is incompatible with the layer: expected min_ndim=4, found ndim=3. 

To avoid that, expand the dimensions.

# Make sure images have shape (28, 28, 1)
train_images = np.expand_dims(train_images, -1)
test_images = np.expand_dims(test_images, -1)
train_images.shape
# (60000, 28, 28, 1)

Batch the dataset

Next, let's define the number of images that will be passed to the network. 32 is a common choice, but this number can be changed. Let's create batches out of the training images. Passing images in batches also makes training faster. We start by creating a tf.dataset with the from_tensor_slices method, then add the batch size.  

ds_train_batch = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
training_data = ds_train_batch.batch(32)
ds_test_batch = tf.data.Dataset.from_tensor_slices((test_images, test_labels))
testing_data = ds_test_batch.batch(32)

How to create model with custom layers in Keras

Custom layers in TensorFlow are created by inheriting tf.keras.Layer and implementing __init__, build and call.

class MyDenseLayer(tf.keras.layers.Layer):
  def __init__(self, num_outputs):
    super(MyDenseLayer, self).__init__()
    self.num_outputs = num_outputs

  def build(self, input_shape):
    self.kernel = self.add_weight("kernel",
                                  shape=[int(input_shape[-1]),
                                         self.num_outputs])

  def call(self, inputs):
    return tf.matmul(inputs, self.kernel)

layer = MyDenseLayer(10)

A better way to create custom layers is to inherit keras.Model because it avails the Model.fit,Model.evaluate, and Model.save methods. Let's create a custom block with the following layers:

parameters = {"shape":28, "activation": "relu", "classes": 10, "units":12, "optimizer":"adam", "epochs":100,"kernel_size":3,"pool_size":2, "dropout":0.5}

class CustomBlock(tf.keras.Model):
  def __init__(self, filters):
    super(CustomBlock, self).__init__(name='')
    filters1, filters2 = filters
    self.conv2a = layers.Conv2D(filters=filters1,input_shape=(28,28,1), kernel_size=(parameters["kernel_size"], parameters["kernel_size"]), activation=parameters["activation"])

    self.maxpool1a = layers.MaxPooling2D(pool_size=(parameters["pool_size"], parameters["pool_size"]))

    self.conv2b = layers.Conv2D(filters2, kernel_size=(parameters["kernel_size"], parameters["kernel_size"]), activation=parameters["activation"])

    self.maxpool2b = layers.MaxPooling2D(pool_size=(parameters["pool_size"], parameters["pool_size"]))

    self.flatten1a = layers.Flatten()
    self.dropout1a = layers.Dropout(parameters["dropout"])
    self.dense1a = layers.Dense(parameters["classes"], activation="softmax")

  def call(self, input_tensor):
    x = self.conv2a(input_tensor)
    x = tf.nn.relu(x)
    x = self.maxpool1a(x)

    x = self.conv2b(x)
    x = tf.nn.relu(x)
    x = self.maxpool2b(x)

    x = self.flatten1a(x)
    x = self.dropout1a(x)
    x = self.dense1a(x)
    return tf.nn.softmax(x)

Let's initialize the model and check the layers and variables.  

model = CustomBlock([32,64])
input_shape = (1, 28, 28, 1)
x = tf.random.normal(input_shape)
_ = model(x)
x.shape
# TensorShape([1, 28, 28, 1])
model.layers
len(model.variables)

We can also visualize the model's summary.

model.summary()

The model can be used to make predictions even before training. Obviously, the results won't be good.

Define the loss  function

The next step is to define the loss function. We use  SparseCategoricalCrossentropy because the labels are integers. If labels are one-hot encoded  CategoricalCrossentropy is used instead. The goal is to reduce the errors between the true and predicted values. The SparseCategoricalCrossentropy function takes probability predictions and returns the average loss.

loss_object = tf.keras.losses.SparseCategoricalCrossentropy()
def loss(model, x, y, training):
  # training=training is needed only if there are layers with different
  # behavior during training versus inference (e.g. Dropout).
  y_ = model(x, training=training)

  return loss_object(y_true=y, y_pred=y_)

l = loss(model, test_images, test_labels, training=False)
print("Loss test: {}".format(l))

Define the  gradients function

The gradient is computed using the tf.GradientTape function. It calculates the gradient of the loss with respect to the model trainable variables. The tape records operations in the forward pass and uses this information to compute gradients on the backward pass.

def grad(model, inputs, targets):
  with tf.GradientTape() as tape:
    loss_value = loss(model, inputs, targets, training=True)
  return loss_value, tape.gradient(loss_value, model.trainable_variables)

Create an optimizer

An optimizer function uses the computed gradients to adjust the model weights and biases to minimize the loss. This iterative process aims to find the model parameters that result in the least error. We apply the common Adam optimizer function.

optimizer = tf.keras.optimizers.Adam()

Create custom training loop

The training loop feeds the training images to the network while computing the metrics. We use the SparseCategoricalAccuracy to compute the accuracy because the labels are integers. If labels are one-hot encoded, the CategoricalAccuracy is used. We use tqdm to display a progress bar of the training process. The training process involves the following steps:

  • Pass the training data to the network for one epoch.
  • Obtain the training images and labels for each batch.
  • Run predictions using the network and compare the result with the true values.
  • Update model parameters using the Adam optimizer.
  • Track the training metrics for visualization later.
  • Repeat the process for the specified number of epochs.    
from tqdm.notebook import trange
## Note: Rerunning this cell uses the same model parameters

# Keep results for plotting
train_loss_results = []
train_accuracy_results = []

num_epochs = 10

for epoch in trange(num_epochs):
  epoch_loss_avg = tf.keras.metrics.Mean()
  epoch_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()

  # Training loop - using batches of 32
  for x, y in training_data:
    # Optimize the model
    loss_value, grads = grad(model, x, y)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    # Track progress
    epoch_loss_avg.update_state(loss_value)  # Add current batch loss
    # Compare predicted label to actual label
    # training=True is needed only if there are layers with different
    # behavior during training versus inference (e.g. Dropout).
    epoch_accuracy.update_state(y, model(x, training=True))

  # End epoch
  train_loss_results.append(epoch_loss_avg.result())
  train_accuracy_results.append(epoch_accuracy.result())
  print("Epoch {}: Loss: {:.3f}, Accuracy: {:.3%}".format(epoch + 1,
                                                                epoch_loss_avg.result(),
                                                                epoch_accuracy.result()))

Visualize the loss

Next, visualize the training loss and accuracy with Matplotlib.

fig, axes = plt.subplots(2, sharex=True, figsize=(12, 8))
fig.suptitle('Training Metrics')

axes[0].set_ylabel("Loss", fontsize=14)
axes[0].plot(train_loss_results)

axes[1].set_ylabel("Accuracy", fontsize=14)
axes[1].set_xlabel("Epoch", fontsize=14)
axes[1].plot(train_accuracy_results)
plt.show()

Read more: TensorBoard tutorial (Deep dive with examples and notebook)

Evaluate model on test dataset

To evaluate the network's performance, we loop through the test data, make predictions and compare them with the true values.  tf.math.argmax returns the axis of the largest predicted value.

test_accuracy = tf.keras.metrics.Accuracy()

for (x, y) in testing_data:
  # training=False is needed only if there are layers with different
  # behavior during training versus inference (e.g. Dropout).
  logits = model(x, training=False)    
  prediction = tf.math.argmax(logits, axis=1, output_type=tf.int64)
  test_accuracy(prediction, y)
print("Test set accuracy: {:.3%}".format(test_accuracy.result()))
# Test set accuracy: 87.870%

Use the trained model to make predictions

Let's use the trained model to make predictions on new cloth images and print the prediction. The model outputs logits which we pass to the tf.nn.softmax. This ensures that the sum of all the outputs sums to 1. We, therefore, take the maximum value as the predicted value. Obtaining the index of the maximum and mapping it to categories gives the predicted class.

# training=False is needed only if there are layers with different
# behavior during training versus inference (e.g. Dropout).
predictions = model(test_images[0:5], training=False)
class_names = ["T-shirt/top","Trouser","Pullover","Dress","Coat","Sandal","Shirt","Sneaker","Bag","Ankle boot"]
for i, logits in enumerate(predictions):
  class_idx = tf.math.argmax(logits).numpy()
  p = tf.nn.softmax(logits)[class_idx]
  name = class_names[class_idx]
  print("Image {} prediction: {} ({:4.1f}%)".format(i, name, 100*p))

Final thoughts

You have now learned to create custom layers and training loops in Keras. This helps understand the underlying processing that happens when you call the fit method from Keras. It's also important if you want more fine-grained control of the network's training process. Specifically, we have seen that creating custom training loops involves:

  • Design the network using custom layers or using the Keras built-in layers.
  • Creating custom loss functions.
  • Building a custom function to compute model gradients.
  • Defining the optimizer function.    
  • Creating the custom loop function that utilizes the loss and gradient functions.

TensorFlow resources

Object detection with TensorFlow 2 Object detection API

How to train deep learning models on Apple Silicon GPU

How to build CNN in TensorFlow(examples, code, and notebooks)

How to build artificial neural networks with Keras and TensorFlow

Open On GitHub

The Complete Data Science and Machine Learning Bootcamp on Udemy is a great next step if you want to keep exploring the data science and machine learning field.

Follow us on LinkedIn, Twitter, GitHub, and subscribe to our blog, so you don't miss a new issue.