$$ \huge{\underline{\textbf{ Iterative Policy Evaluation }}} $$
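The algorithm repeatedly sweeps over all states, applying the Bellman expectation backup until the largest change in the value function drops below $\theta$:

$$ V_{k+1}(s) = \sum_{a} \pi(a \mid s) \sum_{s'} p(s' \mid s, a) \big[ r(s, a, s') + \gamma \, V_k(s') \big] $$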
# Naive implementation (Python for loops are slow), but it matches the pseudocode box exactly
def iter_policy_eval(env, policy, gamma, theta):
    """Iterative Policy Evaluation

    Params:
        env - environment with the following required members:
            env.nb_states  - number of states
            env.nb_actions - number of actions
            env.model      - transition probabilities and rewards for all states and actions, see note #1
        policy (2d array) - policy to evaluate, rows must sum to 1
        gamma (float)     - discount factor
        theta (float)     - termination condition
    """
    V = np.zeros(env.nb_states)
    while True:
        delta = 0
        for s in range(env.nb_states):
            v = V[s]
            tmp = 0
            for a in range(env.nb_actions):
                for p, s_, r, _ in env.model[s][a]:   # see note #1 !
                    # p  - transition probability from (s,a) to (s')
                    # s_ - next state (s')
                    # r  - reward on transition from (s,a) to (s')
                    tmp += policy[s, a] * p * (r + gamma * V[s_])
            V[s] = tmp
            delta = max(delta, abs(v - V[s]))
        if delta < theta: break
    return V
Note #1: the env.model parameter is taken directly from the OpenAI Gym API for FrozenLake (where it is called env.P, see below). It is a nested structure describing transition probabilities and expected rewards, for example:
>>> env.model[6][0]
[(0.3333333333333333, 2, 0.0, False),
(0.3333333333333333, 5, 0.0, True),
(0.3333333333333333, 10, 0.0, False)]
Each tuple is (probability, next_state, reward, done), so the entry above has the following meaning: from state 6, taking action 0 (LEFT) on the slippery lake moves the agent to state 2, 5 or 10, each with probability 1/3 and reward 0.0; the transition into state 5 lands in a hole and terminates the episode.
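To read such an entry programmatically, each tuple unpacks into its four fields; a minimal sketch (it assumes env has already been created and aliased as in the setup cells further down):

# Sketch: label the fields of each transition tuple for state 6, action 0 (LEFT)
for p, s_, r, done in env.model[6][0]:
    print(f'prob={p:.2f}  next_state={s_:2d}  reward={r}  done={done}')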
Vectorised implementation. This version updates V "on a copy" (a synchronous backup of all states from the previous sweep), while the non-vectorised version updates V "in place" (each backup immediately sees already-updated states), so intermediate values differ slightly; both converge to the same result.
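In matrix form, one sweep of the backup over the flattened MRP is

$$ V_{k+1} = R^{\pi} + \gamma \, P^{\pi} V_k $$

which corresponds line-for-line to the V = R_pi + gamma * P_pi @ V update below.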
def iter_policy_eval_vec(env, policy, gamma, theta):
    """Iterative Policy Evaluation (vectorised)"""
    V, v = np.zeros(env.nb_states), np.zeros(env.nb_states)
    P_pi, R_pi = flatten_mdp(env, policy)   # flatten MDP + policy into an MRP
    while True:
        np.copyto(v, V)                      # keep previous sweep to measure delta
        V = R_pi + gamma * P_pi @ V          # Bellman backup for all states at once
        delta = np.max(np.abs(v - V))
        if delta < theta: break
    return V
Helper functions
def flatten_mdp(env, policy):
    """Flatten MDP to MRP"""
    # Could use sparse matrices to save memory
    P_pi = np.zeros([env.nb_states, env.nb_states])  # transition probability matrix (s) to (s')
    R_pi = np.zeros([env.nb_states])                 # exp. reward from state (s) to any next state
    for s in range(env.nb_states):
        for a in range(env.nb_actions):
            for p, s_, r, _ in env.model[s][a]:
                # p  - transition probability from (s,a) to (s')
                # s_ - next state (s')
                # r  - reward on transition from (s,a) to (s')
                P_pi[s, s_] += policy[s, a] * p      # transition probability (s) -> (s')
                R_pi[s] += policy[s, a] * p * r      # exp. reward from (s) to any next state
    assert np.allclose(P_pi.sum(axis=-1), np.ones([env.nb_states]))  # rows should sum to 1
    return P_pi, R_pi
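As an aside, once the MDP and policy are flattened into an MRP, the fixed point can also be computed without iteration by solving the linear system $(I - \gamma P^{\pi}) V^{\pi} = R^{\pi}$. A minimal sketch (the helper name is illustrative; it needs $\gamma < 1$ here, because with $\gamma = 1$ the absorbing terminal states make $I - P^{\pi}$ singular):

def solve_mrp_direct(env, policy, gamma):
    """Closed-form policy evaluation: V = (I - gamma*P_pi)^-1 @ R_pi (sketch, requires gamma < 1)."""
    P_pi, R_pi = flatten_mdp(env, policy)
    return np.linalg.solve(np.eye(env.nb_states) - gamma * P_pi, R_pi)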
import numpy as np
import matplotlib.pyplot as plt
import gym
env = gym.make('FrozenLake-v0')
env.reset()
env.render()
Rename some members, but don't break stuff!
if not hasattr(env, 'nb_states'): env.nb_states = env.env.nS
if not hasattr(env, 'nb_actions'): env.nb_actions = env.env.nA
if not hasattr(env, 'model'): env.model = env.env.P
Evaluate a uniformly random policy
policy = np.ones([env.nb_states, env.nb_actions]) / env.nb_actions
V_pi = iter_policy_eval(env, policy, gamma=1.0, theta=1e-8)
print(V_pi.reshape([4, -1]))
Check if correct
correct_V_pi = np.array([[0.0139398 , 0.01163093, 0.02095299, 0.01047649],
[0.01624867, 0. , 0.04075154, 0. ],
[0.0348062 , 0.08816993, 0.14205316, 0. ],
[0. , 0.17582037, 0.43929118, 0. ]])
assert np.allclose(V_pi.reshape([4, -1]), correct_V_pi)
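The vectorised implementation should converge to the same values; a quick cross-check before timing both (V_pi_vec is just a local name for this check):

V_pi_vec = iter_policy_eval_vec(env, policy, gamma=1.0, theta=1e-8)
assert np.allclose(V_pi_vec.reshape([4, -1]), correct_V_pi)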
%timeit iter_policy_eval(env, policy, gamma=1.0, theta=1e-8)
%timeit iter_policy_eval_vec(env, policy, gamma=1.0, theta=1e-8)