This notebook presents an LSTM network with character-level input, trained on Shakespeare's plays.
The dataset file is included in this repo and consists of all of Shakespeare's works concatenated together (4.6 MB).
import time
import collections
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
Dataset location
dataset_location = '../Datasets/shakespeare/shakespeare_input.txt'
Open text file
with open(dataset_location, 'r') as f:
text = f.read()
print(text[:173])
Discard a small bit at the end so that the text length is divisible by 1024. This allows for batch sizes [1, 2, 4, 8, 16, 32, ..., 1024].
mod1024 = len(text) % 1024
if mod1024:  # guard: text[:-0] would produce an empty string
    text = text[:-mod1024]
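As a quick sanity check (optional), confirm the trimmed length:
assert len(text) % 1024 == 0  # trimmed text length is now a multiple of 1024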
Tokenize
tokens = collections.Counter(text).most_common()
tokens[0:5]
i2c = {i : c for i, (c, n) in enumerate(tokens)}
c2i = {c : i for i, c in i2c.items()}
print('i2c:', i2c)
print('c2i:', c2i)
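As a quick round-trip check (a minimal sketch; the probe string 'To be' is arbitrary), encode a short string and decode it back:
sample_str = 'To be'
encoded = [c2i[c] for c in sample_str]
decoded = ''.join(i2c[i] for i in encoded)
print(encoded, '->', decoded)  # token ids -> 'To be'
assert decoded == sample_str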
Encode text as tokens, reshape to batches, convert to tensor
batch_size = 128
data = np.array([c2i[c] for c in text])
data = data.reshape((batch_size, -1))
print('data:')
print(data)
print('shape:', data.shape)
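Each row of data is one contiguous slice of the text, so consecutive slices along the time axis continue the same character stream; this is what lets us carry the LSTM hidden state across mini-batches later. A quick decode of the first row (optional check):
print(''.join(i2c[i] for i in data[0, :50]))  # should read as plain Shakespeare text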
split_index = int(data.shape[1]*.8) # 80% train, 20% valid
train_data, valid_data = np.split(data, [split_index], axis=1)
print('train_data:', train_data.shape)
print('valid_data:', valid_data.shape)
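A quick consistency check on the split (optional):
assert train_data.shape[1] + valid_data.shape[1] == data.shape[1]  # no columns lost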
Move to GPU if possible
train_x = torch.tensor(train_data).to(device)
valid_x = torch.tensor(valid_data).to(device)
print('train_x:', train_x.shape)
print('valid_x:', valid_x.shape)
Model
class CharRNN(nn.Module):
def __init__(self, nb_layers, n_in, n_embed, n_hid, n_out, dropout):
super(CharRNN, self).__init__()
self.embed = nn.Embedding(num_embeddings=n_in, embedding_dim=n_embed)
self.lstm = nn.LSTM(input_size=n_embed, hidden_size=n_hid, num_layers=nb_layers,
batch_first=True, dropout=dropout)
self.drop = nn.Dropout(p=dropout)
self.fc = nn.Linear(in_features=n_hid, out_features=n_out)
def forward(self, x, hidden):
x = self.embed(x) # shape [n_batch, n_seq, n_embed]
x, hidden = self.lstm(x, hidden) # shape [n_batch, n_seq, n_hid]
x = self.drop(x)
x = self.fc(x) # shape [n_batch, n_seq, n_out]
return x, hidden
def sample(self, inputs, hidden, topk=5):
"""Sample one token, conditioned on inputs and hidden
Params:
inputs - tensor with input tokens, shape [1, n_seq]
hidden - hidden state for LSTM, can be None
topk - int, how many top choices to consider when sampling
Returns:
token for next predicted character, tensor of shape [1, 1] containing one int
"""
logits, hidden = self(inputs, hidden)
last_output = logits[0, -1] # keep last seq. output shape [n_out]
probs = F.softmax(last_output, dim=0) # logits to probabilities shape [n_out]
probs, indices = probs.topk(topk)
weights = probs / probs.sum() # normalize probs
        tok = np.random.choice(a=indices.detach().cpu().numpy(),  # torch.multinomial is an alternative
                               p=weights.detach().cpu().numpy())
res = torch.tensor([[tok]], device=inputs.device) # feed to next sample() call
return res, hidden
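Before wiring up training, a quick smoke test (an optional sketch; the batch and sequence sizes, and the tmp_model / x_probe names, are arbitrary and just for this check) to confirm the forward pass returns the expected shapes:
tmp_model = CharRNN(nb_layers=2, n_in=len(i2c), n_embed=50, n_hid=64, n_out=len(i2c), dropout=.5)
x_probe = torch.randint(len(i2c), (3, 7))  # [n_batch=3, n_seq=7] of token ids
y_probe, h_probe = tmp_model(x_probe, None)
print(y_probe.shape)  # expected: torch.Size([3, 7, len(i2c)])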
Hyperparameters
nb_layers = 2
n_in = len(i2c)
n_seq = 256
n_embed = 50
n_hid = 64
n_out = len(i2c)
dropout = .5
Create model, optimizer, and loss function
model = CharRNN(nb_layers, n_in, n_embed, n_hid, n_out, dropout)
model.to(device)
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
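To get a feel for the model's size, count the trainable parameters (an optional check):
n_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'trainable parameters: {n_params:,}')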
Helper to generate new text
def generate(prompt=' ', size=1000):
model.eval()
inputs = [c2i[c] for c in prompt] # tokenize
inputs = torch.tensor(inputs).to(device)
inputs = inputs.reshape(1, -1) # shape [n_batch=1, n_seq]
result = []
with torch.no_grad():
output, hidden = model.sample(inputs, None, topk=5)
result.append(output.item())
for i in range(size-1):
output, hidden = model.sample(output, hidden, topk=5)
result.append(output.item())
return ''.join([i2c[i] for i in result])
Helper function for training
def train(nb_epochs, trace, trace2):
epoch = len(trace['epoch'])
train_size = train_x.shape[1] - 1 # -1 because inputs/targets are shifted by one
valid_size = valid_x.shape[1] - 1
for _ in range(nb_epochs):
time_start = time.time()
#
# Train Model
#
model.train()
tloss_sum = 0
hidden = None # reset LSTM hidden state
for i in range(0, train_size, n_seq):
            # Pick mini-batch (over the sequence dimension)
inputs = train_x[:,i:i+n_seq] # [n_batch, n_seq], less for last batch
targets = train_x[:,i+1:i+1+n_seq] # [n_batch, n_seq], less for last batch
if inputs.shape[1] != targets.shape[1]:
inputs = inputs[:,:-1] # fix shape for last batch in epoch
# Optimize
optimizer.zero_grad()
outputs, hidden = model(inputs, hidden)
            hidden = tuple(h.detach() for h in hidden)  # truncated BPTT: keep state values, drop the graph
loss = criterion(outputs.view(-1, n_out), targets.flatten())
loss.backward()
optimizer.step()
# Record per-iteration loss
tloss_sum += loss.item() * inputs.shape[1] # size of minibatch
trace2['loss'].append( loss.item() )
tloss_avg = tloss_sum / train_size
#
# Evaluate Model
#
model.eval()
vloss_sum = 0
hidden = None
with torch.no_grad():
for i in range(0, valid_size, n_seq):
# Pick mini-batch
inputs = valid_x[:,i:i+n_seq]
targets = valid_x[:,i+1:i+1+n_seq]
if inputs.shape[1] != targets.shape[1]:
inputs = inputs[:,:-1]
                # Forward pass only (no optimizer step)
outputs, hidden = model(inputs, hidden)
loss = criterion(outputs.view(-1, n_out), targets.flatten())
# Record per-iteration loss
vloss_sum += loss.item() * inputs.shape[1]
vloss_avg = vloss_sum / valid_size
#
# Logging
#
time_delta = time.time() - time_start
trace['epoch'].append(epoch)
trace['tloss'].append(tloss_avg)
trace['vloss'].append(vloss_avg)
#
# Print loss
#
print(f'Epoch: {epoch:3} '
f'T/V Loss: {tloss_avg:.4f} / {vloss_avg:.4f} '
f'Time: {time_delta:.2f}s')
epoch += 1
Test model before training
prompt = 'KING:\n'
new_text = generate(prompt, size=1000)
print(prompt, new_text, sep='')
Actually train the model
trace = {'epoch': [], 'tloss': [], 'vloss': []} # per epoch
trace2 = {'loss' : []} # per iteration
train(nb_epochs=10, trace=trace, trace2=trace2)
Train some more
train(nb_epochs=10, trace=trace, trace2=trace2)
Test model after training
prompt = 'KING:\n'
new_text = generate(prompt, size=1000)
print(prompt, new_text, sep='')
Wait, wait, wait, is the validation loss less than the train loss?
Let's plot the per-iteration train loss
plt.plot(trace2['loss']);
Plot per-epoch train and valid losses
plt.plot(trace['tloss'], label='tloss')
plt.plot(trace['vloss'], label='vloss')
plt.legend();
Why is the validation loss lower than the train loss?
Mainly because dropout is enabled in train mode but disabled in eval mode, so the training loss is measured on a handicapped model.
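A minimal standalone illustration (a toy tensor, not part of the training pipeline): the same nn.Dropout layer perturbs activations in train mode but is the identity in eval mode.
drop = nn.Dropout(p=0.5)
ones = torch.ones(1, 8)
drop.train()
print(drop(ones))  # roughly half the entries zeroed, the rest scaled by 1/(1-p) = 2
drop.eval()
print(drop(ones))  # identity: output equals input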
To test this, let's write a helper function that evaluates the model in eval mode on an arbitrary dataset
def evaluate(data_x):
data_size = data_x.shape[1] - 1
#
# Evaluate Model
#
model.eval()
loss_sum = 0
hidden = None
with torch.no_grad():
for i in range(0, data_size, n_seq):
# Pick mini-batch
inputs = data_x[:,i:i+n_seq]
targets = data_x[:,i+1:i+1+n_seq]
if inputs.shape[1] != targets.shape[1]:
inputs = inputs[:,:-1]
            # Forward pass only
outputs, hidden = model(inputs, hidden)
loss = criterion(outputs.view(-1, n_out), targets.flatten())
# Record per-iteration loss
loss_sum += loss.item() * inputs.shape[1]
loss_avg = loss_sum / data_size
return loss_avg
Evaluate on both train and valid datasets
train_loss = evaluate(train_x)
valid_loss = evaluate(valid_x)
print('train loss:', train_loss)
print('valid loss:', valid_loss)
Nope, all is good: with dropout disabled in both cases, the train loss comes out below the validation loss, as expected.