Introduction to Deep Learning: Lab 1-2

2020년 12월 26일

Lab 1

Music Generation with RNNs


# Import Tensorflow 2.0
import tensorflow as tf 

# Download and import the MIT 6.S191 package
import mitdeeplearning as mdl

# Import all remaining packages
import numpy as np
import os
import time
import functools
from IPython import display as ipythondisplay
from tqdm import tqdm

# This lab expects a GPU runtime. If the assertion fails, switch runtimes
#   using Runtime > Change Runtime Type > GPU
assert len(tf.config.list_physical_devices('GPU')) > 0

# Load the training songs through the course package.
songs = mdl.lab1.load_training_data()

# Show the first song as a sample of the raw text.
example_song = songs[0]
print("\nExample Song:", example_song)

# Join all songs into a single string, then collect its distinct characters
# in sorted order to form the vocabulary.
songs_joined = "\n\n".join(songs)
vocab = list(set(songs_joined))
vocab.sort()
print("There are", len(vocab), "unique characters in the dataset")


Feed a sequence of characters into the model and train it to predict the output, i.e. the next character at each time step.

Vectorize the text

# Define the numerical representation of the text:
#   char2idx maps each character to its position in `vocab`;
#   idx2char is the reverse lookup (index -> character) as a NumPy array.
char2idx = {char: index for index, char in enumerate(vocab)}
idx2char = np.array(vocab)

# Print a preview of the first 20 mappings. The opening brace balances the
# closing brace printed after the ellipsis. `vocab[:20]` yields the same
# characters, in the same order, as the first 20 keys of `char2idx`.
print('{')
for char in vocab[:20]:
  print('  {:4s}: {:3d},'.format(repr(char), char2idx[char]))
print('  ...\n}')

# Vectorize
def vectorize_string(string):
  """Convert `string` into a NumPy array of character indices.

  Each character is looked up in the module-level `char2idx` mapping; a
  character that is not a key of `char2idx` raises KeyError.
  """
  indices = list(map(char2idx.__getitem__, string))
  return np.array(indices)

# Convert the whole joined song text into one array of character indices.
vectorized_songs = vectorize_string(songs_joined)

Create training examples and targets

seq_length sets the length of each training sequence. For example, if the text is "Hello" and seq_length is 4, then the input is "Hell" and the target output is "ello" (the input shifted by one character).

def get_batch(vectorized_songs, seq_length, batch_size):
  """Sample a random batch of input sequences and their shifted targets.

  Args:
    vectorized_songs: array of character indices (must expose `.shape`).
    seq_length: number of characters in every sampled sequence.
    batch_size: number of sequences to sample.

  Returns:
    A tuple `(x_batch, y_batch)`, each reshaped to
    `[batch_size, seq_length]`. Every row of `y_batch` is the matching row
    of `x_batch` moved one position forward in `vectorized_songs`.
  """
  # Highest index of the text; start positions are drawn so that the
  # one-step-shifted target window still fits inside the array.
  last_index = vectorized_songs.shape[0] - 1
  starts = np.random.choice(last_index - seq_length, batch_size)

  inputs = []
  targets = []
  for start in starts:
    stop = start + seq_length
    inputs.append(vectorized_songs[start:stop])
    targets.append(vectorized_songs[start + 1:stop + 1])

  x_batch = np.asarray(inputs).reshape([batch_size, seq_length])
  y_batch = np.asarray(targets).reshape([batch_size, seq_length])
  return x_batch, y_batch

# Tests: run the course-provided checks against get_batch and report a
# single verdict. The checks run in order and stop at the first failure.
test_args = (vectorized_songs, 10, 2)
batch_checks = (
  mdl.lab1.test_batch_func_types,
  mdl.lab1.test_batch_func_shapes,
  mdl.lab1.test_batch_func_next_step,
)
if all(check(get_batch, test_args) for check in batch_checks):
  print("======\n[PASS] passed all tests!")
else:
  print("======\n[FAIL] could not pass tests")

# Create a single-sequence batch and show, step by step, each input
# character next to the character expected as output.
x_batch, y_batch = get_batch(vectorized_songs, seq_length=5, batch_size=1)

# np.squeeze drops the size-1 batch axis so zip pairs up individual indices.
for i, (input_idx, target_idx) in enumerate(zip(np.squeeze(x_batch), np.squeeze(y_batch))):
  print("Step {:3d}".format(i))
  print("  input: {} ({:s})".format(input_idx, repr(idx2char[input_idx])))
  print("  expected output: {} ({:s})".format(target_idx, repr(idx2char[target_idx])))

RNN model

  • tf.keras.Sequential
  • tf.keras.layers.Embedding: input layer, with embedding_dim
  • tf.keras.layers.LSTM: LSTM network, with units=rnn_units
  • tf.keras.layers.Dense: output layer, with vocab_size
def LSTM(rnn_units):
  """Return a Keras LSTM layer with `rnn_units` units.

  NOTE(review): the argument list of this call was truncated in the file.
  The keyword arguments below are reconstructed and should be confirmed.
  """
  return tf.keras.layers.LSTM(
    rnn_units,
    # Emit an output at every time step; the model's predictions are
    # printed as (batch_size, sequence_length, vocab_size).
    return_sequences=True,
    # Presumably stateful, since the model is built with a fixed batch size
    # and rebuilt with batch_size=1 for generation -- TODO confirm.
    stateful=True,
  )

# Add LSTM and Dense layers to define the RNN model using the Sequential API
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
  """Build the Embedding -> LSTM -> Dense character model.

  Args:
    vocab_size: number of distinct characters; also the Dense output size.
    embedding_dim: size of each embedding vector.
    rnn_units: number of LSTM units.
    batch_size: fixed batch dimension of the model input.

  Returns:
    A `tf.keras.Sequential` model.
  """
  model = tf.keras.Sequential([
    # Layer 1: Embedding layer to transform indices into dense vectors
    #  of a fixed embedding size
    tf.keras.layers.Embedding(vocab_size, embedding_dim, batch_input_shape=[batch_size, None]),

    # Layer 2: LSTM with `rnn_units` number of units
    LSTM(rnn_units),

    # Layer 3: Dense (fully-connected) layer that transforms the LSTM output
    #  into the vocabulary size.
    tf.keras.layers.Dense(vocab_size),
  ])

  return model

# Build a model with some simple hyperparameters.
model = build_model(
  vocab_size=len(vocab),
  embedding_dim=256,
  rnn_units=1024,
  batch_size=32,
)

# Test: pass one randomly sampled batch through the model and compare the
# input shape with the prediction shape.
x, y = get_batch(vectorized_songs, seq_length=100, batch_size=32)
pred = model(x)
print("Input shape:      ", x.shape, " # (batch_size, sequence_length)")
print("Prediction shape: ", pred.shape, "# (batch_size, sequence_length, vocab_size)")

Predictions from the untrained model

# Draw one sample per time step from the categorical distribution given by
# the predictions for the first sequence of the batch, then drop the
# trailing sample axis and convert to a NumPy array.
sampled_indices = tf.squeeze(
  tf.random.categorical(pred[0], num_samples=1),
  axis=-1,
).numpy()

# Decode the input indices and the sampled indices back into text.
print("Input: \n", repr("".join(idx2char[x[0]])))
print("Next Char Predictions: \n", repr("".join(idx2char[sampled_indices])))

Training the model

Use sparse_categorical_crossentropy loss.

def compute_loss(labels, logits):
  """Return the sparse categorical cross-entropy of `logits` against `labels`.

  `logits` are passed with `from_logits=True`, i.e. as unnormalized scores.
  No averaging is applied here; the Keras result is returned as is.
  """
  return tf.keras.losses.sparse_categorical_crossentropy(
    labels, logits, from_logits=True
  )

# Compute the loss of the untrained model's predictions on the example batch.
example_batch_loss = compute_loss(y, pred)

# Report the prediction shape and the loss averaged into a single number.
print("Prediction shape: ", pred.shape, " # (batch_size, sequence_length, vocab_size)") 
print("scalar_loss:      ", example_batch_loss.numpy().mean())
# Hyperparameter setting and optimization

# Optimization parameters:
#   num_training_iterations -- number of iterations of the training loop
#   batch_size / seq_length -- passed to get_batch for every training batch
#   learning_rate           -- passed to the optimizer
num_training_iterations = 2000  # Increase this to train longer
batch_size = 4  # Experiment between 1 and 64
seq_length = 100  # Experiment between 50 and 500
learning_rate = 5e-3  # Experiment between 1e-5 and 1e-1

# Model parameters: 
#   these are the arguments handed to build_model.
vocab_size = len(vocab)
embedding_dim = 256 
rnn_units = 1024  # Experiment between 1 and 2048

# Checkpoint location: 
#   checkpoint_prefix is the path "<checkpoint_dir>/my_ckpt".
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "my_ckpt")
### Define optimizer and training operation ###

# Create a fresh model for training from the hyperparameters defined above.
model = build_model(
  vocab_size=vocab_size,
  embedding_dim=embedding_dim,
  rnn_units=rnn_units,
  batch_size=batch_size,
)

# Create the optimizer with the configured learning rate.
#  See the TensorFlow website for the list of supported optimizers;
#  Adam is used as the starting point.
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

def train_step(x, y):
  """Run one optimization step on the batch `(x, y)` and return its loss.

  Uses the module-level `model`, `optimizer` and `compute_loss`.
  """
  # Record the forward pass so the loss can be differentiated afterwards.
  with tf.GradientTape() as tape:
    predictions = model(x)
    loss = compute_loss(y, predictions)

  # Differentiate the loss with respect to every trainable variable of the
  # model, then let the optimizer apply the resulting update.
  variables = model.trainable_variables
  gradients = tape.gradient(loss, variables)
  optimizer.apply_gradients(zip(gradients, variables))
  return loss

# Begin training

history = []
plotter = mdl.util.PeriodicPlotter(sec=2, xlabel='Iterations', ylabel='Loss')
if hasattr(tqdm, '_instances'): tqdm._instances.clear() # clear if it exists

for iter in tqdm(range(num_training_iterations)):
  # Grab a batch and propagate it through the network
  x_batch, y_batch = get_batch(vectorized_songs, seq_length, batch_size)
  loss = train_step(x_batch, y_batch)

  # Update the progress bar
  # NOTE(review): these two statements were missing from the file and are
  # reconstructed from the otherwise unused `history` and `plotter` objects;
  # confirm that PeriodicPlotter exposes `plot(data)`.
  history.append(loss.numpy().mean())
  plotter.plot(history)

  # Update the model with the changed weights!
  # A checkpoint is written every 100 iterations.
  if iter % 100 == 0:     
    model.save_weights(checkpoint_prefix)

# Save the trained model and the weights
model.save_weights(checkpoint_prefix)

Generate music using the model

# Rebuild the model using a batch_size=1
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)

# Restore the model weights for the last checkpoint after training
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
# Build the model for input of shape (1, None): batch of one, any length.
# NOTE(review): this call is reconstructed from the `[1, None]))` residue
# that was fused onto the line above; confirm against the notebook.
model.build(tf.TensorShape([1, None]))