$$ \huge{\underline{\textbf{ Continuous Actions }}} $$


Implementation of the policy parameterization for continuous actions
from Sutton and Barto (2018), Chapter 13.7.
The book is available for free online.


Relevant equations from the book:

Eq. 13.18 (normal probability density):

$$ p(x) \doteq \frac{1}{\sigma\sqrt{2\pi}} \exp\left(-\frac{(x-\mu)^2}{2\sigma^2}\right) $$

Eq. 13.19 (Gaussian policy):

$$ \pi(a \mid s, \boldsymbol{\theta}) \doteq \frac{1}{\sigma(s, \boldsymbol{\theta})\sqrt{2\pi}} \exp\left(-\frac{\big(a - \mu(s, \boldsymbol{\theta})\big)^2}{2\,\sigma(s, \boldsymbol{\theta})^2}\right) $$

Eq. 13.20 (parameterized mean and standard deviation):

$$ \mu(s, \boldsymbol{\theta}) \doteq \boldsymbol{\theta}_\mu^\top \mathbf{x}_\mu(s) \qquad\qquad \sigma(s, \boldsymbol{\theta}) \doteq \exp\left(\boldsymbol{\theta}_\sigma^\top \mathbf{x}_\sigma(s)\right) $$

Equations from Exercise 13.4:

$$ \nabla \ln \pi(a \mid s, \boldsymbol{\theta}_\mu) = \frac{1}{\sigma(s, \boldsymbol{\theta})^2}\big(a - \mu(s, \boldsymbol{\theta})\big)\,\mathbf{x}_\mu(s) \qquad\qquad \nabla \ln \pi(a \mid s, \boldsymbol{\theta}_\sigma) = \left(\frac{\big(a - \mu(s, \boldsymbol{\theta})\big)^2}{\sigma(s, \boldsymbol{\theta})^2} - 1\right)\mathbf{x}_\sigma(s) $$
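
For completeness, a short derivation sketch of these gradients from Eqs. 13.19 and 13.20 (own working, not quoted from the book). Taking the log of Eq. 13.19:

$$ \ln \pi(a \mid s, \boldsymbol{\theta}) = -\ln \sigma(s, \boldsymbol{\theta}) - \tfrac{1}{2}\ln(2\pi) - \frac{\big(a - \mu(s, \boldsymbol{\theta})\big)^2}{2\,\sigma(s, \boldsymbol{\theta})^2} $$

Since $\mu = \boldsymbol{\theta}_\mu^\top \mathbf{x}_\mu(s)$ gives $\nabla_{\boldsymbol{\theta}_\mu}\mu = \mathbf{x}_\mu(s)$, and $\sigma = \exp\big(\boldsymbol{\theta}_\sigma^\top \mathbf{x}_\sigma(s)\big)$ gives $\nabla_{\boldsymbol{\theta}_\sigma}\sigma = \sigma\,\mathbf{x}_\sigma(s)$, the chain rule yields

$$ \nabla_{\boldsymbol{\theta}_\mu} \ln \pi = \frac{a - \mu}{\sigma^2}\,\mathbf{x}_\mu(s), \qquad \nabla_{\boldsymbol{\theta}_\sigma} \ln \pi = \left(\frac{-1}{\sigma} + \frac{(a-\mu)^2}{\sigma^3}\right)\sigma\,\mathbf{x}_\sigma(s) = \left(\frac{(a-\mu)^2}{\sigma^2} - 1\right)\mathbf{x}_\sigma(s). $$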

Experiment Setup

In [1]:
import numpy as np
import matplotlib.pyplot as plt
In [2]:
def gaussian(x, mean, std):
    """Gaussian probability density function. Book eq. 13.18"""
    var = np.power(std, 2)
    denom = (2*np.pi*var)**.5
    num = np.exp( -np.power(x-mean,2) / (2*var) )
    return num / denom
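
A quick cross-check of gaussian() against scipy (a sketch, assuming scipy is installed; not part of the original notebook):

from scipy.stats import norm

# single point comparison against the reference normal pdf
x, mean, std = 1.5, 2.0, 0.7
assert np.isclose(gaussian(x, mean, std), norm.pdf(x, loc=mean, scale=std))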
In [3]:
class ContinuousBanditEnv:
    """Stateless bandit: the reward for an action vector is the sum of unit-variance
    Gaussian bumps centred at self.means, so it is maximised when action == means."""
    def __init__(self, means):
        self.means = means
        self.std = np.ones_like(means)
    def step(self, action):
        assert action.shape == self.means.shape
        rewards = gaussian(x=action, mean=self.means, std=self.std)
        return np.sum(rewards)
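
A minimal usage sketch (not in the original notebook; demo_env is defined only here): the reward peaks when the action hits the targets exactly.

demo_env = ContinuousBanditEnv(means=np.array([2.0, -2.0]))
print(demo_env.step(np.array([2.0, -2.0])))   # best case: 2 / sqrt(2*pi) ~= 0.798
print(demo_env.step(np.array([0.0, 0.0])))    # farther from the targets -> lower reward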

Policy Definition

In [4]:
class TabularGaussianPolicy:
    """Tabular Gaussian policy 'approximator' - one-hot features reduce to a table lookup."""
    def __init__(self, lr, nb_states, nb_actions, l2=0.0, ent=0.0):
        assert isinstance(lr, float)
        assert isinstance(nb_states, tuple)
        assert isinstance(nb_actions, int)
        self._lr = lr                # learning rate
        self._l2 = l2                # L2 reg.
        self._ent = ent              # entropy reg.
        self.n_act = nb_actions
        self._theta_mu = np.zeros((*nb_states, nb_actions))     # weights
        self._theta_sigma = np.zeros((*nb_states, nb_actions))  # weights
        
    def pi(self, state):
        """Return policy, i.e. probability distribution over actions."""
        assert isinstance(state, (int, tuple))
        assert self._theta_mu.ndim == (2 if isinstance(state, int) else len(state)+1)
        assert self._theta_sigma.ndim == (2 if isinstance(state, int) else len(state)+1)
        
        # Eq. 13.20
        # in tabular case x(s) vectors are one-hot, which is same as table lookup
        mu = self._theta_mu[state].copy()
        sigma = np.exp(self._theta_sigma[state])
        return mu, sigma         # do not sample here

    def update(self, state, action, disc_return):
        assert isinstance(disc_return, float)
        
        mu = self._theta_mu[state]                               # Eq. 13.20
        sigma = np.exp(self._theta_sigma[state])
        
        grad_ln_theta_mu = (1/sigma**2) * (action-mu)            # Ex. 13.4
        grad_ln_theta_sigma = (((action-mu)**2 / sigma**2) - 1)
        
        # L2 regularization - helps to ensure policy doesn't get deterministic
        grad_ln_theta_mu -= self._l2 * self._theta_mu[state]
        grad_ln_theta_sigma -= self._l2 * self._theta_sigma[state]
        
        # entropy reg. - also helps to ensure policy doesn't get deterministic
        prob = gaussian(action, mu, sigma)
        entropy = -1 * np.sum(prob * np.log(prob))
        grad_ln_theta_mu -= self._ent * entropy
        grad_ln_theta_sigma -= self._ent * entropy
        
        # apply update
        self._theta_mu[state] += self._lr * grad_ln_theta_mu * disc_return
        self._theta_sigma[state] += self._lr * grad_ln_theta_sigma * disc_return

def test_tgp():
    tgp = TabularGaussianPolicy(lr=0.01, l2=0.0, nb_states=(2,), nb_actions=1)
    tgp.update(1, 5.0, 1.0)
    mu, sigma = tgp.pi(state=0)
    assert np.all(mu == 0.0)
    assert np.all(sigma == 1.0)
    mu, sigma = tgp.pi(state=1)
    assert np.all(mu == 0.05)
    assert np.allclose(sigma, 1.271249)

    print('PASS')
test_tgp()
PASS
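
As an extra sanity check (a sketch, not part of the original tests; log_pi is a helper defined only here), the Exercise 13.4 closed-form gradients used in update() can be verified against central finite differences of ln pi for a single tabular state:

def log_pi(theta_mu, theta_sigma, action):
    """Log-density of the Gaussian policy with mu = theta_mu, sigma = exp(theta_sigma)."""
    return np.log(gaussian(action, theta_mu, np.exp(theta_sigma)))

theta_mu, theta_sigma, action, eps = 0.3, -0.2, 1.7, 1e-6
mu, sigma = theta_mu, np.exp(theta_sigma)
analytic_mu = (1/sigma**2) * (action - mu)              # Ex. 13.4 gradient w.r.t. theta_mu
analytic_sigma = ((action - mu)**2 / sigma**2) - 1      # Ex. 13.4 gradient w.r.t. theta_sigma
numeric_mu = (log_pi(theta_mu+eps, theta_sigma, action)
              - log_pi(theta_mu-eps, theta_sigma, action)) / (2*eps)
numeric_sigma = (log_pi(theta_mu, theta_sigma+eps, action)
                 - log_pi(theta_mu, theta_sigma-eps, action)) / (2*eps)
assert np.isclose(analytic_mu, numeric_mu)
assert np.isclose(analytic_sigma, numeric_sigma)
print('gradient check OK')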

Examples

Basic Example

In [5]:
env = ContinuousBanditEnv(means=np.array([2.0]))
In [6]:
tgp = TabularGaussianPolicy(lr=0.01, nb_states=(1,), nb_actions=1, l2=0.0, ent=0.0)

hist_R, hist_mu, hist_sigma = [], [], []
for i in range(10000):
    mu, sigma = tgp.pi(state=0)
    action = np.random.normal(loc=mu, scale=sigma)
    
    reward = env.step(action)
    
    tgp.update(state=0, action=action, disc_return=reward)
    
    mu, sigma = tgp.pi(state=0)
    hist_R.append(reward)
    hist_mu.append(mu)
    hist_sigma.append(sigma)

hist_R = np.array(hist_R)
hist_mu = np.array(hist_mu)
hist_sigma = np.array(hist_sigma)
In [7]:
plt.scatter(range(len(hist_R)), hist_R, marker='.', s=1, alpha=0.5)
plt.show()
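
The raw per-step rewards above are noisy; a simple moving average (a quick sketch, not part of the original notebook) makes the learning trend easier to see:

window = 100
smoothed = np.convolve(hist_R, np.ones(window)/window, mode='valid')
plt.plot(smoothed)
plt.xlabel('iteration')
plt.ylabel('avg. reward (window=100)')
plt.show()
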
In [8]:
plt.plot(hist_mu[:,0])
plt.fill_between(range(len(hist_mu)),
                 hist_mu[:,0]-hist_sigma[:,0],
                 hist_mu[:,0]+hist_sigma[:,0],
                 alpha=0.2)
Out[8]:
<matplotlib.collections.PolyCollection at 0x7f3ba8be9650>

Example - With Entropy Regularization

In [9]:
env = ContinuousBanditEnv(means=np.array([2.0]))
In [10]:
tgp = TabularGaussianPolicy(lr=0.01, nb_states=(1,), nb_actions=1, l2=0.0, ent=0.1)

hist_R, hist_mu, hist_sigma = [], [], []
for i in range(10000):
    mu, sigma = tgp.pi(state=0)
    action = np.random.normal(loc=mu, scale=sigma)
    
    reward = env.step(action)
    
    tgp.update(state=0, action=action, disc_return=reward)
    
    mu, sigma = tgp.pi(state=0)
    hist_R.append(reward)
    hist_mu.append(mu)
    hist_sigma.append(sigma)

hist_R = np.array(hist_R)
hist_mu = np.array(hist_mu)
hist_sigma = np.array(hist_sigma)
In [11]:
plt.scatter(range(len(hist_R)), hist_R, marker='.', s=1, alpha=0.5)
plt.show()
In [12]:
plt.plot(hist_mu[:,0])
plt.fill_between(range(len(hist_mu)),
                 hist_mu[:,0]-hist_sigma[:,0],
                 hist_mu[:,0]+hist_sigma[:,0],
                 alpha=0.2)
Out[12]:
<matplotlib.collections.PolyCollection at 0x7f3ba8b2aad0>

Example - 4D Actions

In [13]:
env = ContinuousBanditEnv(means=np.array([2.0, -2.0, 1.0, -1.0]))
In [14]:
tgp = TabularGaussianPolicy(lr=0.002, nb_states=(1,), nb_actions=4, l2=0.0, ent=0.1)

hist_R, hist_mu, hist_sigma = [], [], []
for i in range(10000):
    mu, sigma = tgp.pi(state=0)
    action = np.random.normal(loc=mu, scale=sigma)
    
    reward = env.step(action)
    
    tgp.update(state=0, action=action, disc_return=reward)
    
    mu, sigma = tgp.pi(state=0)
    hist_R.append(reward)
    hist_mu.append(mu)
    hist_sigma.append(sigma)

hist_R = np.array(hist_R)
hist_mu = np.array(hist_mu)
hist_sigma = np.array(hist_sigma)
In [15]:
plt.scatter(range(len(hist_R)), hist_R, marker='.', s=1, alpha=0.5)
plt.show()
In [16]:
colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728']
for i, col, target in zip(range(4), colors, env.means):
    plt.plot(hist_mu[:,i], color=col)
    plt.plot([0, 10000], [target, target], color=col, linestyle='--')
    plt.fill_between(range(len(hist_mu)),
                     hist_mu[:,i]-hist_sigma[:,i],
                     hist_mu[:,i]+hist_sigma[:,i],
                     color=col, alpha=0.2)
#plt.savefig('assets/1307_continuous.png')
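
As a rough convergence check (a sketch, not part of the original notebook), the final learned means and standard deviations can be printed next to the bandit targets:

print('learned mu:   ', np.round(hist_mu[-1], 2))
print('targets:      ', env.means)
print('learned sigma:', np.round(hist_sigma[-1], 2))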