$$ \huge{\underline{\textbf{ Linear Models }}} $$

$$ \large{\textbf{ Tile Coding }} $$


Example application of Tile Coding
from Sutton and Barto 2018, chapter 9.5.
Book available for free here


We are going to use Gradient MC algorithm from chapter 9.3 as a learning algorithm. Before continuing you should familiarise yourself properly with chapter 9.3. We will use the "fixed" version of Gradient MC, repeated here for reference.

One notable difference is that this version takes callback and trace arguments, so we can log results after each episode. This is used to reproduce figures in the book.

In [1]:
def gradient_MC(env, policy, ep, gamma, model, callback=None, trace=None):
    """Gradient Monte Carlo Algorithm
    
    Params:
        env    - environment
        policy - function in a form: policy(state)->action
        ep     - number of episodes to run
        gamma  - discount factor [0..1]
        model  - function approximator, already initialised, with method:
                     train(state, target) -> None
        callback - function in a form: callback(episode, model, trace) -> None
        trace  - passed to callback, so it can log data into it
    """
    for e_ in range(ep):
        traj, T = generate_episode(env, policy)
        Gt = 0
        for t in range(T-1,-1,-1):
            St, _, _, _ = traj[t]      # (st, rew, done, act)
            _, Rt_1, _, _ = traj[t+1]
            
            Gt = gamma * Gt + Rt_1
            model.train(St, Gt)
               
        if callback is not None:
            callback(e_, model, trace)
In [2]:
def generate_episode(env, policy):
    """Generete one complete episode.
    
    Returns:
        trajectory: list of tuples [(st, rew, done, act), (...), (...)],
                    where St can be e.g tuple of ints or anything really
        T: index of terminal state, NOT length of trajectory
    """
    trajectory = []
    done = True
    while True:
        # === time step starts here ===
        if done:  St, Rt, done = env.reset(), None, False
        else:     St, Rt, done = env.step(At)
        At = policy(St)
        trajectory.append((St, Rt, done, At))
        if done:  break
        # === time step ends here ===
    return trajectory, len(trajectory)-1

Figure 9.10 - averaged over 10 runs. Data points were collected every 100 episodes.

I realise this does not reproduce figure from the book, currently I'm not sure how exactly figure in the book was obtained. I am confident it is not because number of runs I used is 10 instead of 30 like in the book, or because I calculated VE every 100 episodes.

The only alternative implementation I found is by Shangtong Zhang (github). His graph looks a bit closer, but he decays step size over time, which obviously changes shape of curves. I modified his implementation to use fixed step size instead, and got same result as mine.

If anyone was able to reproduce results exactly (w/o adding/changing/removing anythign), please drop me a msg.

Environment Setup

All environment and plotting code is exactly the same as in chapter 9.3. All the code is available here: helpers_0905.py

In [3]:
import time
import numpy as np
import matplotlib.pyplot as plt
from helpers_0905 import LinearEnv, plot_linear
In [4]:
env = LinearEnv()
In [5]:
def policy(st):
    return np.random.choice([0, 1])

Tile Coding - Introduction

We will be using tile coding implementation by Richard Sutton himself.

In [6]:
import tiles3

Basic usage of this library is as follows:

  • create Index Hash Table (IHT) - we work with small state space, so don't worry about collisions
  • call tiles3.tiles(...) or tiles3.tileswrap(..) with IHT as first parameter - to obtain list of INDICES of activated tiles
  • use indices to fetch data from e.g. weights array

All this is very similar to how AggregateFunctApprox worked in chapter 9.3

Let's see behaviour using one set of tiles. Tiles divide state space into [ 0.00-0.99,   1.0-1.99,   2.0-2.99,   ... ]

In [7]:
iht = tiles3.IHT(2048)

result = []
print('state -> indices')
for state in np.arange(0, 2.5, 0.15):
    indices = tiles3.tiles(iht, numtilings=1, floats=[state])
    result.append(indices)
    print('{0:.2f}'.format(state), ' -> ', indices)
state -> indices
0.00  ->  [0]
0.15  ->  [0]
0.30  ->  [0]
0.45  ->  [0]
0.60  ->  [0]
0.75  ->  [0]
0.90  ->  [0]
1.05  ->  [1]
1.20  ->  [1]
1.35  ->  [1]
1.50  ->  [1]
1.65  ->  [1]
1.80  ->  [1]
1.95  ->  [1]
2.10  ->  [2]
2.25  ->  [2]
2.40  ->  [2]

Let's do the same with two set of tillings. First set of tiles divides space same as before, second set of tiles is offest by one-half.

In [8]:
iht = tiles3.IHT(2048)

result = []
print('state -> indices')
for state in np.arange(0, 2.5, 0.15):
    indices = tiles3.tiles(iht, numtilings=2, floats=[state])
    result.append(indices)
    print('{0:.2f}'.format(state), ' -> ', indices)
state -> indices
0.00  ->  [0, 1]
0.15  ->  [0, 1]
0.30  ->  [0, 1]
0.45  ->  [0, 1]
0.60  ->  [0, 2]
0.75  ->  [0, 2]
0.90  ->  [0, 2]
1.05  ->  [3, 2]
1.20  ->  [3, 2]
1.35  ->  [3, 2]
1.50  ->  [3, 4]
1.65  ->  [3, 4]
1.80  ->  [3, 4]
1.95  ->  [3, 4]
2.10  ->  [5, 4]
2.25  ->  [5, 4]
2.40  ->  [5, 4]

And with four. Note I changed range. First set as before, sets 2,3,4 are now shifted by one-quater.

In [9]:
iht = tiles3.IHT(2048)

result = []
print('state -> indices')
for state in np.arange(0, 1.0, 0.07):
    indices = tiles3.tiles(iht, numtilings=4, floats=[state])
    result.append(indices)
    print('{0:.2f}'.format(state), ' -> ', indices)
state -> indices
0.00  ->  [0, 1, 2, 3]
0.07  ->  [0, 1, 2, 3]
0.14  ->  [0, 1, 2, 3]
0.21  ->  [0, 1, 2, 3]
0.28  ->  [0, 1, 2, 4]
0.35  ->  [0, 1, 2, 4]
0.42  ->  [0, 1, 2, 4]
0.49  ->  [0, 1, 2, 4]
0.56  ->  [0, 1, 5, 4]
0.63  ->  [0, 1, 5, 4]
0.70  ->  [0, 1, 5, 4]
0.77  ->  [0, 6, 5, 4]
0.84  ->  [0, 6, 5, 4]
0.91  ->  [0, 6, 5, 4]
0.98  ->  [0, 6, 5, 4]

Tile Coding

In [13]:
class TileCodingFuncApprox():
    def __init__(self, learn_rate, tile_size, nb_tilings, nb_states):
        """
        Params:
            learn_rate - step size, will be adjusted for nb_tilings automatically
            tile_size  - e.g. 200 for tiles to cover [1-200, 201-400, ...]
            nb_tilings - how many tiling layers
            nb_states  - total number of states, e.g. 1000
        """
        self._lr = learn_rate / nb_tilings
        self._tile_size = tile_size
        self._nb_tilings = nb_tilings
        self._nb_states = nb_states
        
        # calculate total number of possible tiles so we can allocate memory
        # this calculation is valid for 1d space only!
        nb_tiles_to_cover_state_space = (nb_states // tile_size) + 1  # +1 because tiles
        num_tiles_total = nb_tiles_to_cover_state_space * nb_tilings  #    stick out on sides
        self._num_total_tiles = num_tiles_total

        self._iht = tiles3.IHT(num_tiles_total)  # index hash table
        self._w = np.zeros(num_tiles_total)      # weights
        
    def reset(self):
        self._iht = tiles3.IHT(self._num_total_tiles)  # clear index hash table
        self._w = np.zeros(self._num_total_tiles)      # clear weights
        
    def evaluate(self, state):
        st_scaled = (state-1) / self._tile_size        # scale state to map to tiles correctly
        tile_indices = tiles3.tiles(self._iht,
                                    numtilings=self._nb_tilings,
                                    floats=[st_scaled])
        
        # Implementation #1
        return np.sum(self._w[tile_indices])           # pick correct weights and sum up
        
        # Implementation #2                            # as lin. comb., equivalent to #1
#         x = np.zeros(self._num_total_tiles)
#         x[tile_indices] = 1
#         return x @ self._w
        
        
    def train(self, state, target):
        st_scaled = (state-1) / self._tile_size        # scale state to map to tiles correctly
        tile_indices = tiles3.tiles(self._iht, 
                                    numtilings=self._nb_tilings,
                                    floats=[st_scaled])
        value = np.sum(self._w[tile_indices])
        self._w[tile_indices] += self._lr * (target - value)  # update active weights

Quick test: one set of tiles dividing states into groups of size 200 eacg

In [14]:
model = TileCodingFuncApprox(learn_rate=0.001, tile_size=200, nb_tilings=1, nb_states=1001)
gradient_MC(env, policy, ep=1000, gamma=1.0, model=model)
V = [model.evaluate(st) for st in range(1001)]
plot_linear(V, env=env)

Quick test #2: now with four sets of tiles, each offset by 1/4 of tile width. Note that small 'steps' are not completly independent, tiles still have same width as before.

In [17]:
model = TileCodingFuncApprox(learn_rate=0.001, tile_size=200, nb_tilings=4, nb_states=1001)
gradient_MC(env, policy, ep=1000, gamma=1.0, model=model)
V = [model.evaluate(st) for st in range(1001)]
plot_linear(V, env=env)

Reproduce Figure 9.10

Callback is called by gradient_MC every single timestep:

  • only process every 100th episode (speed)
  • compute V for all states
  • compute Root Mean Squared Error (skip terminal state 0, state 1001 is out of boundry)
  • append to rmse list
In [18]:
def callback(episode, model, trace):
    """Called from gradient_MC after every episode.
    
    Params:
        episode [int] - episode number
        model [obj]   - function approximator
        trace [list]  - list to write results to"""
    if episode % 100 != 0:    return
    V = np.array([model.evaluate(st) for st in range(1001)])                   # arr of float
    err = np.sqrt(np.mean(np.power((np.array(V[1:]) - env.V_approx[1:]), 2)))  # float
    trace.append(err)

Define experiment

In [19]:
def experiment(runs, env, policy, ep, gamma, model, callback):
    results = []  # dims: [nb_runs, nb_episodes]
    for r in range(runs):
        trace = []  # dim: [nb_episodes]
        model.reset()
        gradient_MC(env, policy, ep, gamma, model, callback=callback, trace=trace)
        results.append(trace)
    return np.average(results, axis=0)

Compute result for tilings=1. Average over 10 runs. This took ~6min.

In [276]:
ts = time.time()
model_1 = TileCodingFuncApprox(learn_rate=0.0001, tile_size=200, nb_tilings=1, nb_states=1001)
tiles_1 = experiment(10, env, policy, ep=10000, gamma=1.0, model=model_1, callback=callback)
print('Time took:', time.time()-ts)
Time took: 363.76061820983887

And for tilings=50. Note that alpha is automatically divided by nb_tilings, so we are still in line with the book.

In [280]:
ts = time.time()
model_50 = TileCodingFuncApprox(learn_rate=0.0001, tile_size=200, nb_tilings=50, nb_states=1001)
tiles_50 = experiment(10, env, policy, ep=10000, gamma=1.0, model=model_50, callback=callback)
print('Time took:', time.time()-ts)
Time took: 979.4627282619476

Plot results

In [305]:
fig = plt.figure()
ax = fig.add_subplot(111)
xx = np.array(range(len(tiles_1))) * 100
ax.plot(xx, tiles_1, color='red', label='State aggregation (one tiling)')
ax.plot(xx, tiles_50, color='green', label='Tile coding (50 tilings)')
ax.set_ylabel('$\sqrt{\overline{VE}}$ averaged over 10 runs')
ax.set_xlabel('Episodes')
# plt.savefig('assets/fig_0910.png')
ax.legend()
plt.show()

And plot final values for V

In [25]:
V_1 = [model_1.evaluate(st) for st in range(1001)]
V_50 = [model_50.evaluate(st) for st in range(1001)]

plot_linear(V_1, env)
plot_linear(V_50, env)