This notebook presents a basic Deep Q-Network (DQN) used to solve OpenAI Gym Classic Control environments like Mountain Car, Inverted Pendulum and so on. For educational purposes this notebook uses DQN without a target network; for practical applications use a target network.
The environments solved in this notebook are MountainCar-v0, Pendulum-v0 (converted to a discrete-action 2D version), CartPole-v0, Acrobot-v1 and LunarLander-v2.
I tried to match pre-processing and hyperparameters across all the environments. As a result, some hyperparameters differ from their optimal values. A special note goes to the frame-skip parameter, which is set to 4 for Mountain Car and Lunar Lander and omitted otherwise. It can be brought down to 2 for all environments and all of them should still train OK. As for training iterations, I used 25k everywhere with the exception of LunarLander (200k). The epsilon decay is also longer for Lunar Lander.
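As mentioned above, a target network is omitted here on purpose. For reference, here is a minimal numpy-only sketch, not this notebook's implementation, of how a target network changes the update: the bootstrap value comes from a periodically synced copy of the weights rather than from the online network. All names and constants in it (w_online, w_target, TARGET_UPDATE_EVERY, the fake transitions) are purely illustrative.
import numpy as np

# Illustrative sketch of a DQN-style update with a target network.
# Q-values are modelled as a simple linear map s @ w; everything is made up.
n_state, n_actions, gamma, lr = 4, 2, 0.99, 0.01
w_online = np.random.randn(n_state, n_actions) * 0.1   # updated every step
w_target = w_online.copy()                             # frozen copy used for bootstrapping
TARGET_UPDATE_EVERY = 1000                             # hard-copy period (hyperparameter)

for step in range(5000):
    # Fake transition, standing in for a sample from the replay buffer
    s, s_ = np.random.randn(n_state), np.random.randn(n_state)
    a = np.random.randint(n_actions)
    r = np.random.randn()
    done = np.random.rand() < 0.05

    # Bootstrap from the *target* network, not the online one
    y = r if done else r + gamma * np.max(s_ @ w_target)

    # One SGD step on the online network only (squared TD error on the taken action)
    td_error = (s @ w_online)[a] - y
    w_online[:, a] -= lr * td_error * s

    # Periodically sync the target network with the online weights
    if step % TARGET_UPDATE_EVERY == 0:
        w_target = w_online.copy()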
Imports
import os
import datetime
import itertools
import collections
import numpy as np
import matplotlib.pyplot as plt
Import OpenAI Gym
import gym
Initialise TensorFlow
import tensorflow as tf
gpu_options = tf.GPUOptions(allow_growth=True)  # init TF ...
config=tf.ConfigProto(gpu_options=gpu_options)  # w/o taking ...
with tf.Session(config=config): pass            # all GPU memory
Import helpers for plotting (source file: helpers.py)
import helpers
import importlib
importlib.reload(helpers);
Epsilon-greedy policy
def policy(st, model, eps):
    if np.random.rand() > eps:
        q_values = model.eval(np.stack([st]))
        return np.argmax(q_values)
    else:
        return env.action_space.sample()
Main training loop. Supports resuming from a pre-trained agent and callbacks.
def q_learning(env, frames, gamma, eps_decay_steps, eps_target,
               batch_size, model, mem, start_step=0,
               callback=None, trace=None, render=False):
    """Q-Learning, supprots resume
    
    Note: If resuming, all parameters should be identical to original call, with
        exception of 'start_step' and 'frames'.
    
    Params:
        env - environment
        frames - number of time steps to execute
        gamma - discount factor [0..1]
        eps_decay_steps - decay epsilon-greedy param over that many time steps
        eps_target - epsilon-greedy param after decay
        batch_size - neural network batch size from memory buffer
        model      - function approximator, already initialised, with methods:
                     eval(states) -> q-values, shape [batch, nb_actions]
                     train(states, actions, targets) -> None
        mem - memory replay buffer
        start_step - if continuing, pass in return value (tts_) here
        callback - optional callback to execute
        trace - this object handles data logging, plotting etc.
        render - render openai gym environment?
    """
    
    def eps_schedule(tts, eps_decay_steps, eps_target):
        if tts > eps_decay_steps:
            return eps_target
        else:
            eps_per_step_change = (1-eps_target) / eps_decay_steps
            return 1.0 - tts * eps_per_step_change
    
        
    assert len(mem) >= batch_size
    
    tts_ = start_step                        # total time step
    for _ in itertools.count():              # count from 0 to infinity
        
        S = env.reset()
        episode_reward = 0                   # purely for logging
        if render: env.render()
        
        for t_ in itertools.count():         # count from 0 to infinity
            
            eps = eps_schedule(tts_, eps_decay_steps, eps_target)
            
            A = policy(S, model, eps)
            
            S_, R, done, _ = env.step(A)
            episode_reward += R
            if render: env.render()
            
            mem.append(S, A, R, S_, done)
            
            if callback is not None:
                callback(tts_, t_, S, A, R, done, eps, episode_reward, model, mem, trace)
            
            states, actions, rewards, n_states, dones, _ = mem.get_batch(batch_size)
            targets = model.eval(n_states)
            targets = rewards + gamma * np.max(targets, axis=-1)
            targets[dones] = rewards[dones]  # return of next-to-terminal state is just R
            model.train(states, actions, targets)
            S = S_
            
            tts_ += 1
            if tts_ >= start_step + frames:
                return tts_                  # so we can pick up where we left
            
            if done:
                break
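For reference, each inner-loop iteration above trains the network towards the standard one-step Q-learning target on a sampled minibatch. Since there is no target network, the same weights are used both to compute the bootstrap value and to take the gradient step:
$$y_t = R_{t+1} + \gamma \max_a \hat{q}(S_{t+1}, a), \qquad y_t = R_{t+1} \ \text{if } S_{t+1} \text{ is terminal}, \qquad \text{loss} = \big(y_t - \hat{q}(S_t, A_t)\big)^2$$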
Stripped-down version for evaluation. Does not train the agent.
def evaluate(env, model, frames=None, episodes=None, eps=0.0, render=False):
    assert frames is not None or episodes is not None
        
    total_reward = 0
    
    tts_ = 0                                 # total time step
    for e_ in itertools.count():             # count from 0 to infinity
        if episodes is not None and e_ >= episodes:
            return total_reward
        
        S = env.reset()
        if render: env.render()
        
        for t_ in itertools.count():         # count from 0 to infinity
            
            A = policy(S, model, eps)
            
            S_, R, done, _ = env.step(A)
            total_reward += R
            if render: env.render()
    
            S = S_
            
            tts_ += 1
            if frames is not None and tts_ >= frames:
                return total_reward
            
            if done:
                break
Stripped-down version that pre-fills the memory buffer with a random policy.
def mem_fill(env, mem, steps=None, episodes=None, render=False):
        
    # Fill memory buffer using random policy
    tts_ = 0
    for e_ in itertools.count():
        if episodes is not None and e_ >= episodes:
            return
        
        S = env.reset();
        if render: env.render()
        
        for t_ in itertools.count():
        
            A = env.action_space.sample()    # random policy
            S_, R, done, _ = env.step(A)
            if render: env.render()
                
            mem.append(S, A, R, S_, done)
            
            S = S_
            
            tts_ += 1
            if steps is not None and tts_ >= steps:
                return
            
            if done:
                break
Definition of a simple neural network with two hidden layers. As a side note, for classic control tasks tilings work much better.
class TFNeuralNet():
    def __init__(self, nb_in, nb_hid_1, nb_hid_2, nb_out, lr):
        self.nb_in = nb_in
        self.nb_hid_1 = nb_hid_1
        self.nb_hid_2 = nb_hid_2
        self.nb_out = nb_out
        
        tf.reset_default_graph()
              
        self._x = tf.placeholder(name='xx', shape=[None, nb_in], dtype=tf.float32)
        self._y = tf.placeholder(name='yy', shape=[None, nb_out], dtype=tf.float32)
        self._h_hid_1 = tf.layers.dense(self._x, units=nb_hid_1,
                                        activation=tf.nn.relu, name='Hidden_1')
        self._h_hid_2 = tf.layers.dense(self._h_hid_1, units=nb_hid_2,
                                        activation=tf.nn.relu, name='Hidden_2')
        self._y_hat = tf.layers.dense(self._h_hid_2, units=nb_out,
                                      activation=None, name='Output')
        self._loss = tf.losses.mean_squared_error(self._y, self._y_hat)
        self._optimizer = tf.train.RMSPropOptimizer(learning_rate=lr)
        self._train_op = self._optimizer.minimize(self._loss)
        self._sess = tf.Session()
        self._sess.run(tf.global_variables_initializer())
        
    def backward(self, x, y):
        assert x.ndim == y.ndim == 2
        _, y_hat, loss = self._sess.run([self._train_op, self._y_hat, self._loss],
                                         feed_dict={self._x: x, self._y:y})
        return y_hat, loss
    
    def forward(self, x):
        return self._sess.run(self._y_hat, feed_dict={self._x: x})
    
    def save(self, filepath):
        saver = tf.train.Saver()
        saver.save(self._sess, filepath)
        
    def load(self, filepath):
        saver = tf.train.Saver()
        saver.restore(self._sess, filepath)
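A quick, optional sanity check of the network above on random data. Note that the constructor calls tf.reset_default_graph(), so running this wipes any previously created model; the shapes and values below are arbitrary.
# Optional sanity check of TFNeuralNet on random data (arbitrary shapes/values).
net = TFNeuralNet(nb_in=2, nb_hid_1=64, nb_hid_2=64, nb_out=3, lr=0.00025)
x = np.random.uniform(-1, 1, size=(32, 2))   # batch of 32 two-dimensional states
y = np.random.uniform(-1, 1, size=(32, 3))   # batch of 32 three-action Q-targets
y_hat, loss = net.backward(x, y)             # single gradient step
print(y_hat.shape, float(loss))              # -> (32, 3) and a scalar MSE loss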
Function approximator wrapper. Mostly performs sanity checks and input/output normalisation.
class TFFunctApprox():
    def __init__(self, model, st_low, st_high, rew_mean, rew_std, nb_actions):
        """Q-function approximator using Keras model
        Args:
            model: TFNeuralNet model
        """
        st_low = np.array(st_low);
        st_high = np.array(st_high)
        self._model = model
        
        assert st_low.ndim == 1 and st_low.shape == st_high.shape
        
        if len(st_low) != model.nb_in:
            raise ValueError('Input shape does not match state_space shape')
        if nb_actions != model.nb_out:
            raise ValueError('Output shape does not match action_space shape')
        # normalise inputs
        self._offsets = st_low + (st_high - st_low) / 2
        self._scales = 1 / ((st_high - st_low) / 2)
        
        self._rew_mean = rew_mean
        self._rew_std = rew_std
    def eval(self, states):
        assert isinstance(states, np.ndarray)
        assert states.ndim == 2
        inputs = (states - self._offsets) * self._scales
        y_hat = self._model.forward(inputs)
        
        #return y_hat
        return y_hat*self._rew_std + self._rew_mean
    def train(self, states, actions, targets):
        
        assert isinstance(states, np.ndarray)
        assert isinstance(actions, np.ndarray)
        assert isinstance(targets, np.ndarray)
        assert states.ndim == 2
        assert actions.ndim == 1
        assert targets.ndim == 1
        assert len(states) == len(actions) == len(targets)
        
        
        targets = (targets-self._rew_mean) / self._rew_std    # normalise
        inputs = (states - self._offsets) * self._scales
        all_targets = self._model.forward(inputs)       # these are already normalised
        all_targets[np.arange(len(all_targets)), actions] = targets
        self._model.backward(inputs, all_targets)
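To illustrate the normalisation above: each state dimension is shifted by its midpoint and scaled by the inverse half-range, mapping the state box to roughly [-1, 1]. The bounds below are MountainCar-like and used only as an example.
# Normalisation as done in TFFunctApprox, on MountainCar-like bounds.
st_low, st_high = np.array([-1.2, -0.07]), np.array([0.6, 0.07])
offsets = st_low + (st_high - st_low) / 2      # midpoint of each state dimension
scales = 1 / ((st_high - st_low) / 2)          # inverse half-range
print((np.array([[-1.2, 0.07]]) - offsets) * scales)   # -> [[-1.  1.]]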
Memory replay buffer. Reasonably fast implementation that avoids unnecessary memory copies when sampling batches. This will be useful for Atari; a Python deque is very slow.
class Memory:
    """Circular buffer for DQN memory reply. Fairly fast."""
    def __init__(self, max_len, state_shape, state_dtype):
        """
        Args:
            max_len: maximum capacity
        """
        assert isinstance(max_len, int)
        assert max_len > 0
        self.max_len = max_len                      # maximum length        
        self._curr_insert_ptr = 0                   # index to insert next data sample
        self._curr_len = 0                          # number of currently stored elements
        state_arr_shape = [max_len] + list(state_shape)
        self._hist_St = np.zeros(state_arr_shape, dtype=state_dtype)
        self._hist_At = np.zeros(max_len, dtype=int)
        self._hist_Rt_1 = np.zeros(max_len, dtype=float)
        self._hist_St_1 = np.zeros(state_arr_shape, dtype=state_dtype)
        self._hist_done_1 = np.zeros(max_len, dtype=bool)
    def append(self, St, At, Rt_1, St_1, done_1):
        """Add one sample to memory, override oldest if max_len reached.
        Args:
            St [np.ndarray]   - state
            At [int]          - action
            Rt_1 [float]      - reward
            St_1 [np.ndarray] - next state
            done_1 [bool]       - next state terminal?
        """
        self._hist_St[self._curr_insert_ptr] = St
        self._hist_At[self._curr_insert_ptr] = At
        self._hist_Rt_1[self._curr_insert_ptr] = Rt_1
        self._hist_St_1[self._curr_insert_ptr] = St_1
        self._hist_done_1[self._curr_insert_ptr] = done_1
        
        if self._curr_len < self.max_len:                 # keep track of current length
            self._curr_len += 1
            
        self._curr_insert_ptr += 1                        # increment insertion pointer
        if self._curr_insert_ptr >= self.max_len:         # roll to zero if needed
            self._curr_insert_ptr = 0
    def __len__(self):
        """Number of samples in memory, 0 <= length <= max_len"""
        return self._curr_len
    def get_batch(self, batch_len):
        """Sample batch of data, with repetition
        Args:
            batch_len: nb of samples to pick
        Returns:
            states, actions, rewards, next_states, next_done, indices
            Each returned element is np.ndarray with length == batch_len
        """
        assert self._curr_len > 0
        assert batch_len > 0
        
        indices = np.random.randint(        # randint much faster than np.random.sample
            low=0, high=self._curr_len, size=batch_len, dtype=int)
        states = np.take(self._hist_St, indices, axis=0)
        actions = np.take(self._hist_At, indices, axis=0)
        rewards_1 = np.take(self._hist_Rt_1, indices, axis=0)
        states_1 = np.take(self._hist_St_1, indices, axis=0)
        dones_1 = np.take(self._hist_done_1, indices, axis=0)
        return states, actions, rewards_1, states_1, dones_1, indices
    
    def pick_last(self, nb):
        """Pick last nb elements from memory
        
        Returns:
            states, actions, rewards, next_states, done_1, indices
            Each returned element is np.ndarray with length == batch_len
        """
        assert nb <= self._curr_len
        
        start = self._curr_insert_ptr - nb                # inclusive
        end = self._curr_insert_ptr                       # not inclusive
        indices = np.array(range(start,end), dtype=int)   # indices to pick, can be neg.
        indices[indices < 0] += self._curr_len            # loop negative to positive
        
        states = np.take(self._hist_St, indices, axis=0)
        actions = np.take(self._hist_At, indices, axis=0)
        rewards_1 = np.take(self._hist_Rt_1, indices, axis=0)
        states_1 = np.take(self._hist_St_1, indices, axis=0)
        dones_1 = np.take(self._hist_done_1, indices, axis=0)
        
        return states, actions, rewards_1, states_1, dones_1, indices
    
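A minimal usage example for the buffer above; the transition values are arbitrary.
# Minimal usage example for the Memory buffer; numbers are arbitrary.
demo_mem = Memory(max_len=1000, state_shape=(2,), state_dtype=float)
demo_mem.append(St=np.array([-0.5, 0.0]), At=1, Rt_1=-1.0,
                St_1=np.array([-0.49, 0.01]), done_1=False)
states, actions, rewards, n_states, dones, idx = demo_mem.get_batch(1)
print(states.shape, actions.shape, rewards.shape)   # -> (1, 2) (1,) (1,)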
Environment wrapper that skips frames, also known as action repeat. It really helps with some environments, e.g. Mountain Car.
class WrapFrameSkip():
    def __init__(self, env, frameskip):
        assert frameskip >= 1
        self._env = env
        self._frameskip = frameskip
        self.observation_space = env.observation_space
        self.action_space = env.action_space
    
    def reset(self):
        return self._env.reset()
    
    def step(self, action):
        sum_rew = 0
        for _ in range(self._frameskip):
            obs, rew, done, info = self._env.step(action)
            sum_rew += rew
            if done: break
        return obs, sum_rew, done, info
    
    def render(self, mode='human'):
        return self._env.render(mode=mode)
        
    def close(self):
        self._env.close()               
This object is a basic data logger. If you don't want to generate plots, you can mostly ignore it.
class Trace():
    def __init__(self, eval_every, render=False, test_states=None, state_labels=None):
        
        if test_states is not None:
            assert test_states.ndim == 2
            
        self.enable_plotting = False
        
        self.eval_every = eval_every
        self.test_states = test_states
        self.state_labels = state_labels
        
        self.tstep = 0
        self.total_tstep = 0
        
        self.q_values = collections.OrderedDict()
        self.ep_rewards = collections.defaultdict(float)
        self.last_ep_reward = None
        
        self.states = []
        self.actions = []
        self.rewards = []  # t+1
        self.dones = []    # t+1
        self.epsilons = []
Callback which is called every single training iteration. It logs the current transition and epsilon, tracks episode rewards and, every eval_every steps, prints progress, evaluates Q-values (across the whole state space for 2D environments, otherwise on the pre-defined test states) and optionally plots.
def callback(total_time_step, tstep, st, act, rew_, done_,
             eps, ep_reward, model, memory, trace):
    """Called from gradient_MC after every episode.
    
    Params:
        episode [int] - episode number
        tstep [int]   - timestep within episode
        model [obj]   - function approximator
        trace [list]  - list to write results to"""
    
    assert total_time_step == trace.total_tstep
    
    trace.tstep = tstep
    
    trace.states.append(st)
    trace.actions.append(act)
    trace.rewards.append(rew_)
    trace.dones.append(done_)
    trace.epsilons.append(eps)
        
    if done_:
        trace.ep_rewards[total_time_step] = ep_reward
        trace.last_ep_reward = ep_reward
            
    #
    #   Print, Evaluate, Plot
    #
    if (trace.eval_every is not None) and (trace.total_tstep % trace.eval_every == 0):
        
        last_ep_rew = trace.last_ep_reward
        reward_str = str(round(last_ep_rew, 3)) if last_ep_rew is not None else 'None'
        print(f'wall: {datetime.datetime.now().strftime("%H:%M:%S")}   '
              f'ep: {len(trace.ep_rewards):3}   tstep: {tstep:4}   '
              f'total tstep: {trace.total_tstep:6}   '
              f'eps: {eps:5.3f}   reward: {reward_str}   ')
        if len(st) == 2:
            # We are working with 2D environment,
            # eval. Q-Value function across whole state space
            q_arr = helpers.eval_state_action_space(model, env, split=[128,128])
            trace.q_values[trace.total_tstep] = q_arr
        else:
            # Environment is not 2D,
            # eval. on pre-defined random sample of states
            if trace.test_states is not None:
                y_hat = model.eval(trace.test_states)
                trace.q_values[trace.total_tstep] = y_hat
        if trace.enable_plotting:
            helpers.plot_all(env, model, memory, trace)
            print('■'*80)
    trace.total_tstep += 1
Start with the Mountain Car environment
Notes:
The crux of Mountain Car is that the agent gets an extremely sparse reward only on a successful exit. The shortest possible exit trajectory is approximately 120-130 steps. If we impose a 200-step limit and don't repeat actions, the agent has to perform a series of 120-130 actions exactly correctly in a row, with very little margin for error. If the agent doesn't succeed, it gets reset back to the beginning on step 200. Doing this with a random exploratory policy is virtually impossible.
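As a very crude back-of-the-envelope illustration (not a tight bound): if success required one particular sequence of about 120 actions out of the 3 available per step, a uniformly random policy would produce it with probability on the order of
$$(1/3)^{120} \approx 10^{-57},$$
so unguided exploration essentially never sees the goal reward within a 200-step episode. In practice the set of successful trajectories is larger than a single sequence, but it is still vanishingly small.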
Note that this can still be solved easily with linear function approximation (local updates) and optimistic initialisation, which facilitates continued exploration. This is not possible with DQN.
def experiment_mountaincar():
    neural_net = TFNeuralNet(nb_in=2, nb_hid_1=64, nb_hid_2=64, nb_out=3, lr=0.00025)
    
    model = TFFunctApprox(neural_net,
                          env.observation_space.low,
                          env.observation_space.high,
                          rew_mean=-50,
                          rew_std=15,
                          nb_actions=env.action_space.n)
    
    mem = Memory(max_len=100000, state_shape=(2,), state_dtype=float)
    mem_fill(env, mem, steps=10000)
    test_states, _, _, _, _, _ = mem.get_batch(10)
    
    trace = Trace(eval_every=1000, test_states=test_states)
    
    return trace, model, mem
Setup the experiment
env = gym.make('MountainCar-v0').env  # remove 200 step limit
env = WrapFrameSkip(env, frameskip=4)
trace, model, mem = experiment_mountaincar()
# trace.enable_plotting = True
Train the agent
tts = q_learning(env, frames=25000, gamma=.99,
                 eps_decay_steps=20000, eps_target=0.1, batch_size=4096,
                 model=model, mem=mem, callback=callback, trace=trace)
Optional: train some more
# tts = q_learning(env, frames=5000, gamma=.99,
#                  eps_decay_steps=20000, eps_target=0.1, batch_size=4096,
#                  model=model, mem=mem, start_step=tts, callback=callback, trace=trace)
Optional: plot the agent state
# helpers.plot_all(env, model, mem, trace, print_=True)
Save weights
model._model.save('./tf_models/MountainCar.ckpt')
Load weights
model._model.load('./tf_models/MountainCar.ckpt')
Enjoy trained agent
# In Jupyter, press the square '■' in the top menu to quit the animation
try: evaluate(env, model, frames=float('inf'), eps=0.0, render=True)
except KeyboardInterrupt: pass
finally: env.close()
Result
Expected agent behaviour after 15000 iterations

If you enable plotting, the output after training should look roughly as follows
wall: 16:45:30   ep:  91   tstep:   22   total tstep:  15000   eps: 0.325   reward: -134.0
Where: 'wall' is the wall-clock time, 'ep' is the number of completed episodes, 'tstep' is the time step within the current episode, 'total tstep' is the total number of training time steps, 'eps' is the current epsilon and 'reward' is the return of the last completed episode.
Pendulum is, in my opinion, actually quite a bit easier than Mountain Car, mainly because of its rich reward signal.
Notes: The Pendulum2DEnv wrapper below converts the (cos, sin, angular velocity) observation of Pendulum-v0 into a 2D (angle, angular velocity) state and discretises the continuous torque into three actions: -2.0, 0.0 and +2.0.
class Pendulum2DEnv():
    def __init__(self):
        self._env = gym.make('Pendulum-v0')
        
        self.observation_space = gym.spaces.Box(
            low=np.array([-np.pi, -8.0]), high=np.array([np.pi, 8.0]), dtype=np.float32 )
        self.action_space = gym.spaces.Discrete(n=3)
        
    def reset(self):
        cos, sin, vel = self._env.reset()
        theta = np.arctan2(sin, cos)
        return np.array([theta, vel])
        
    def step(self, action):
        torques = [-2.0, 0.0, 2.0]
        # torques = [-2.0, -.5, 0.0, .5, 2.0]
        joint_effort = torques[action]
        
        obs, rew, done, _ = self._env.step([joint_effort])
        cos, sin, vel = obs
        theta = np.arctan2(sin, cos)
        return np.array([theta, vel]), rew, done, obs
    
    def render(self, mode='human'):
        return self._env.render(mode=mode)
        
    def close(self):
        self._env.close()
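A quick check of the wrapper above: observations are (angle, angular velocity) and the three discrete actions index into the torque list [-2.0, 0.0, 2.0].
# Quick check of Pendulum2DEnv (creates and closes a throwaway instance).
demo_env = Pendulum2DEnv()
s = demo_env.reset()
print(s)                            # -> [theta, angular velocity]
s_, r, done, _ = demo_env.step(1)   # action 1 -> zero torque
print(s_, r, done)
demo_env.close()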
def experiment_pendulum():
    neural_net = TFNeuralNet(nb_in=2, nb_hid_1=64, nb_hid_2=64, nb_out=3, lr=0.00025)
    
    model = TFFunctApprox(neural_net,
                          env.observation_space.low,
                          env.observation_space.high,
                          rew_mean=-210,
                          rew_std=50,
                          nb_actions=env.action_space.n)
    
    mem = Memory(max_len=100000, state_shape=(2,), state_dtype=float)
    mem_fill(env, mem, steps=10000)
    test_states, _, _, _, _, _ = mem.get_batch(10)
    
    trace = Trace(eval_every=1000, test_states=test_states)
    
    return trace, model, mem
Setup the experiment
env = Pendulum2DEnv()
trace, model, mem = experiment_pendulum()
# trace.enable_plotting = True
Train the agent
tts = q_learning(env, frames=25000, gamma=.99,
                 eps_decay_steps=20000, eps_target=0.1, batch_size=4096,
                 model=model, mem=mem, callback=callback, trace=trace)
Optional: train some more
# tts = q_learning(env, frames=5000, gamma=.99,
#                  eps_decay_steps=20000, eps_target=0.1, batch_size=4096,
#                  model=model, mem=mem, start_step=tts, callback=callback, trace=trace)
Optional: plot the agent state
# helpers.plot_all(env, model, mem, trace, print_=True)
Save weights
model._model.save('./tf_models/Pendulum.ckpt')
Load weights
model._model.load('./tf_models/Pendulum.ckpt')
Enjoy trained agent
# In Jupyter, press the square '■' in the top menu to quit the animation
try: evaluate(env, model, frames=float('inf'), eps=0.0, render=True)
except KeyboardInterrupt: pass
finally: env.close()
Result
Expected agent behaviour after 25000 iterations

If you enable plotting, the output after training should look roughly as follows
wall: 08:13:49   ep: 250   tstep:   99   total tstep:  25000   eps: 0.100   reward: -227.530
Where the fields have the same meaning as in the Mountain Car section above.
Everybody loves cartpole!
Notes:
This environment has a peculiar dynamic where the positive rewards, together with the uncorrected maximisation bias in the Q-function (we don't use Double DQN), cause a runaway Q-value effect where Q-values increase continuously. This is OK for now, because the RELATIVE Q-values are still good enough to select the correct action. I think using a target network along with Double DQN would help with this problem.
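For reference, the Double DQN target (not used in this notebook) decouples action selection from action evaluation, which largely removes the maximisation bias:
$$y_t = R_{t+1} + \gamma\, \hat{q}_{\text{target}}\big(S_{t+1}, \operatorname{arg\,max}_a \hat{q}_{\text{online}}(S_{t+1}, a)\big)$$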
def experiment_cartpole():
    neural_net = TFNeuralNet(nb_in=4, nb_hid_1=64, nb_hid_2=64, nb_out=2, lr=0.00025)
    
    model = TFFunctApprox(neural_net,
                          st_low=np.array([-.2, -1.0, -0.15, -1.0]),
                          st_high=np.array([.2, 1.0, 0.15, 1.0]),
                          rew_mean=0,
                          rew_std=1,
                          nb_actions=env.action_space.n)
    
    mem = Memory(max_len=100000, state_shape=(4,), state_dtype=float)
    mem_fill(env, mem, steps=10000)
    test_states, _, _, _, _, _ = mem.get_batch(10)
    
    trace = Trace(eval_every=1000, test_states=test_states)
    
    return trace, model, mem
Setup the experiment
env = gym.make('CartPole-v0')
trace, model, mem = experiment_cartpole()
# trace.enable_plotting = True
Train the agent
tts = q_learning(env, frames=25000, gamma=.99,
                 eps_decay_steps=20000, eps_target=0.1, batch_size=4096,
                 model=model, mem=mem, callback=callback, trace=trace)
Optional: train some more
# tts = q_learning(env, frames=5000, gamma=.99,
#                  eps_decay_steps=20000, eps_target=0.1, batch_size=4096,
#                  model=model, mem=mem, start_step=tts, callback=callback, trace=trace)
Optional: plot the agent state
# helpers.plot_all(env, model, mem, trace, print_=True)
Save weights
model._model.save('./tf_models/CartPole.ckpt')
Load weights
model._model.load('./tf_models/CartPole.ckpt')
Enjoy trained agent
# In Jupyter, press the square '■' in the top menu to quit the animation
try: evaluate(env, model, frames=float('inf'), eps=0.0, render=True)
except KeyboardInterrupt: pass
finally: env.close()
Result
Expected agent behaviour after 25000 iterations

If you enable plotting, the output after training should look roughly as follows
wall: 10:16:24   ep: 349   tstep:  125   total tstep:  24000   eps: 0.100   reward: 200.0
Where the fields have the same meaning as in the Mountain Car section above.
Acrobot is a variation of the pendulum environment.
def experiment_acrobot():
    neural_net = TFNeuralNet(nb_in=6, nb_hid_1=64, nb_hid_2=64, nb_out=3, lr=0.00025)
    
    model = TFFunctApprox(neural_net,
                          env.observation_space.low,
                          env.observation_space.high,
                          rew_mean=-70,
                          rew_std=10,
                          nb_actions=env.action_space.n)
    
    mem = Memory(max_len=100000, state_shape=(6,), state_dtype=float)
    mem_fill(env, mem, steps=10000)
    test_states, _, _, _, _, _ = mem.get_batch(10)
    
    trace = Trace(eval_every=1000, test_states=test_states)
    
    return trace, model, mem
Setup the experiment
env = gym.make('Acrobot-v1')
trace, model, mem = experiment_acrobot()
# trace.enable_plotting = True
Train the agent
tts = q_learning(env, frames=25000, gamma=.99,
                 eps_decay_steps=20000, eps_target=0.1, batch_size=4096,
                 model=model, mem=mem, callback=callback, trace=trace)
Optional: train some more
# tts = q_learning(env, frames=5000, gamma=.99,
#                  eps_decay_steps=20000, eps_target=0.1, batch_size=4096,
#                  model=model, mem=mem, start_step=tts, callback=callback, trace=trace)
Optional: plot the agent state
# helpers.plot_all(env, model, mem, trace, print_=True)
Save weights
model._model.save('./tf_models/Acrobot.ckpt')
Load weights
model._model.load('./tf_models/Acrobot.ckpt')
Enjoy trained agent
# In Jupyter, press the square '■' in the top menu to quit the animation
try: evaluate(env, model, frames=float('inf'), eps=0.0, render=True)
except KeyboardInterrupt: pass
finally: env.close()
Result
Expected agent behaviour after 25000 iterations

If you enable plotting, the output after training should look roughly as follows
wall: 09:23:11   ep: 332   tstep:   31   total tstep:  24000   eps: 0.100   reward: -90.0
Where the fields have the same meaning as in the Mountain Car section above.
def experiment_lunarlander():
    neural_net = TFNeuralNet(nb_in=8, nb_hid_1=64, nb_hid_2=64, nb_out=4, lr=0.00025)
    
    model = TFFunctApprox(neural_net,
                          st_low=np.array([-1., -1., -1., -1., -1., -1., -1., -1.]),
                          st_high=np.array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.]),
                          rew_mean=0,
                          rew_std=1,
                          nb_actions=env.action_space.n)
    
    mem = Memory(max_len=100000, state_shape=(8,), state_dtype=float)
    mem_fill(env, mem, steps=10000)
    
    trace = Trace(eval_every=1000,
                  test_states=np.array([[0, 1.4, 0, 0, 0, 0, 0, 0],     # init
                                        [0, 0.7, 0, 0, 0, 0, 0, 0],     # half way
                                        [0, 0.0, 0, 0, 0, 0, 0, 0],]),  # landing pad
                  state_labels=['Pos.x', 'Pos.y', 'Vel.x', 'Vel.y',
                                'Angle', 'Ang. Vel', 'Left Leg', 'Right Leg'])
    
    return trace, model, mem
Setup the experiment
env = gym.make('LunarLander-v2')
#env = WrapFrameSkip(env, frameskip=4)
trace, model, mem = experiment_lunarlander()
# trace.enable_plotting = True
Train the agent
tts = q_learning(env, frames=200000, gamma=.99,
                 eps_decay_steps=50000, eps_target=0.1, batch_size=4096,
                 model=model, mem=mem, callback=callback, trace=trace)
Optional: train some more
# tts = q_learning(env, frames=5000, gamma=.99,
#                  eps_decay_steps=50000, eps_target=0.1, batch_size=4096,
#                  model=model, mem=mem, start_step=tts, callback=callback, trace=trace)
Optional: plot the agent state
# helpers.plot_all(env, model, mem, trace, print_=True)
Save weights
model._model.save('./tf_models/LunarLander.ckpt')
Load weights
model._model.load('./tf_models/LunarLander.ckpt')
Enjoy trained agent
# In Jupyter, press the square '■' in the top menu to quit the animation
try: evaluate(env, model, frames=float('inf'), eps=0.0, render=True)
except KeyboardInterrupt: pass
finally: env.close()
Result
Expected agent behaviour after 200000 iterations

If you enable plotting, the output after training should look roughly as follows

Where the fields have the same meaning as in the Mountain Car section above.
This section is only for rendering the animations; you can safely ignore it.
import time
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation, rc
from IPython.display import HTML
def frames_render(env, frames, episodes, eps, model, callback=None, trace=None, render=True, sleep=0):
    rendered_frames = []
    
    def policy(st, model, eps):
        if np.random.rand() > eps:
            stack = np.stack([st])  # add batch dimension -> shape [1, state_dim]
            q_values = model.eval(stack)
            return np.argmax(q_values)
        else:
            return env.action_space.sample()
        
    total_reward = 0
    
    tts_ = 0                                 # total time step
    for e_ in itertools.count():             # count from 0 to infinity
        
        S = env.reset()
        
        if render:
            rendered_frames.append(env.render(mode='rgb_array'))
            #env.render()
            time.sleep(sleep)
        
        for t_ in itertools.count():         # count from 0 to infinity
            
            # print(e_, t_)
            
            A = policy(S, model, eps)
            
            S_, R, done, info = env.step(A)
            
            #total_reward += info['full-reward']
            
            if render:
                rendered_frames.append(env.render(mode='rgb_array'))
                #env.render()
                time.sleep(sleep)
            
            if callback is not None:
                callback(tts_, e_, t_, S, A, R, done, eps, model, None, trace)
    
            if done:
                break
                
            if frames is not None and tts_ >= frames:
                return rendered_frames, total_reward
                
            S = S_
                
            tts_ += 1
            
        if episodes is not None and e_ >= episodes-1:
            return rendered_frames, total_reward
env.close()
evaluate(env, frames=None, episodes=1, eps=0.05, model=model, render=True)
rendered_frames, total_reward = frames_render(env, frames=None, episodes=1, eps=0.05, model=model, render=True)
plt.ioff()
fig = plt.figure(figsize=(rendered_frames[0].shape[1] / 72.0,
                          rendered_frames[0].shape[0] / 72.0), dpi = 72)
ax = fig.add_subplot(111);
patch = ax.imshow(rendered_frames[0])
plt.tick_params(axis='both', which='both', bottom=False, top=False, left=False,
                right=False, labelbottom=False, labelleft=False)
def animate(i):
    patch.set_data(rendered_frames[i])
anim = animation.FuncAnimation(fig, animate, frames=len(rendered_frames), interval=20, repeat=True)
HTML(anim.to_html5_video())
Then: