!pip install gym
!pip install gym-legacy-toytext
import numpy as np
def possible_actions(x,y, shape):
"""
Returns the possible actions at a state s=(x,y)
"""
p_list = []
    (M, N) = shape  # M rows (y-direction), N columns (x-direction)
if y+1<M:
p_list.append((x,y+1,'N'))
if y-1>=0:
p_list.append((x,y-1,'S'))
if x+1<N:
p_list.append((x+1,y,'E'))
if x-1>=0:
p_list.append((x-1,y,'W'))
return p_list
def V_k(x,y, V_last, reward, p_suc=0.8, gamma=1.0):
"""
Calculates V_k for a state s=(x,y) given the previous V, 'V_last'.
    'p_suc' is the probability of moving successfully.
    If multiple actions have equal value, we return the
    first one in the order N, S, E, W.
Returns value V_k and action pi.
"""
V = float('-inf')
pi = ''
# Calculate V for all possible actions, and pick the maximum.
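    # Bellman backup implemented by the loop below (restated here for clarity):
    #   V_k(s) = max_a [ p_suc * (R(s') + gamma * V_{k-1}(s'))
    #                    + (1 - p_suc) * (R(s) + gamma * V_{k-1}(s)) ]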
for x_,y_, a in possible_actions(x, y, reward.shape):
        # Case I: Successfully move to state s=(x_,y_) with p=0.8
v_inner = p_suc*(reward[y_, x_]+gamma*V_last[y_, x_])
# Case II: Stay in same state s=(x,y) with p=0.2
v_inner += (1-p_suc)*(reward[y,x] + gamma*V_last[y,x])
if v_inner > V:
V = v_inner
pi = a
return pi, V
def value_iteration(V_init, reward, epsilon=0.01, gamma=0.9):
"""
Value iteration algorithm for finding optimal policy pi.
    Returns the optimal policy, the expected reward V, and the
    number of iterations needed for convergence.
"""
(M, N) = reward.shape
V_last = V_init
V = np.zeros((M, N))
pi = np.zeros((M, N), dtype=str)
counter = 0
while True:
counter += 1
for i in range(M):
for j in range(N):
# Calculate the path and expected reward for all states
pi[i,j], V[i,j] = V_k(j,i, V_last, reward, gamma=gamma)
        # If |V_k - V_{k-1}| < epsilon for every state, we have converged
if (np.abs(V-V_last)<epsilon).all():
return pi, V, counter
V_last = V.copy()
#reward = np.asarray([[-1,1,0],
# [0,1,1],
# [-1,0,-1],
# [0,-1,1]])
reward = np.zeros((3,3))
reward[1,1]=10
V_init = np.zeros(reward.shape)
print('V0:')
print(V_init)
pi, V, count = value_iteration(V_init, reward, epsilon=0.0001)
print('V:')
# Flip the array vertically so the grid prints with y=0 at the bottom.
print(np.flipud(V))
print('PI:')
print(np.flipud(pi))
print(f'Took {count} iterations to converge')
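# Illustrative extra run (not part of the original task): with a smaller
# discount factor the values shrink and convergence is typically faster,
# while the greedy policy should still funnel the outer cells toward the +10 cell.
pi_low, V_low, count_low = value_iteration(V_init, reward, epsilon=0.0001, gamma=0.5)
print('V (gamma=0.5):')
print(np.flipud(V_low))
print('PI (gamma=0.5):')
print(np.flipud(pi_low))
print(f'Took {count_low} iterations to converge')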
import gym
import gym_toytext
import numpy as np
import random
import math
env = gym.make("NChain-v0")
env.reset()
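# NChain-v0 in brief: five states in a row and two actions. Action 0 ("forward")
# moves one step along the chain with no reward, except in the final state,
# where it stays put and collects the large reward of 10. Action 1 ("backward")
# returns to state 0 and collects a small reward of 2. With probability 0.2 the
# agent slips and the opposite action is executed.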
def Q_learn(env, num_episodes, learning_rate, epsilon, gamma):
"""
Implementation of the Q-learning algorithm based
on the given code example.
Returns the Q matrix.
"""
# Init the Q-table
Q = np.zeros((5, 2))
for _ in range(num_episodes):
state = env.reset()
done = False
while not done:
            # Select an action with an epsilon-greedy strategy
if random.uniform(0, 1) < epsilon:
# Explore action space with probability epsilon
action = env.action_space.sample()
else:
# Otherwise, exploit learned values
action = np.argmax(Q[state,:])
# Take the step and get the new state and reward
# done is set to True after 1000 steps.
new_state, reward, done, info = env.step(action)
# Update Q based on our newly gained experience
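            # Q(s,a) <- Q(s,a) + lr * (r + gamma * max_a' Q(s',a') - Q(s,a))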
update = reward + (gamma * np.max(Q[new_state,:])) - Q[state, action]
# Scale the newly gained experience by a learning rate factor
Q[state, action] += learning_rate * update
state = new_state
return Q
num_epis = 20000
learning_rt = 0.1
epsilon = 0.1
gamma = 0.95
Q = Q_learn(env, num_epis, learning_rt, epsilon, gamma)
print(Q)
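# Greedy policy implied by the learned Q-table (illustrative readout,
# not in the original code): action 0 = forward, action 1 = back to the start.
print('Greedy actions per state:', np.argmax(Q, axis=1))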
num_epis = 100
learning_rt = 0.1
epsilon = 1
gamma = 0.95
Q_list = []
for _ in range(3):
Q_list.append(Q_learn(env, num_epis, learning_rt, epsilon, gamma))
avg_Q = np.zeros((5,2))
for q in Q_list:
avg_Q+=q
avg_Q /= len(Q_list)
print(avg_Q)
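# Note: with epsilon=1 every action is chosen at random (pure exploration), so
# the learned values come entirely from random behaviour; averaging over the
# three runs smooths out some of the run-to-run variation in the estimates.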
class Chain_MDP:
"""
Here we define the Chain environment's MDP.
"""
def __init__(self):
self.states = [0,1,2,3,4]
self.p_slip = 0.2
self.num_states = len(self.states)
def action(self, state):
"""
Returns possible actions at a state
action 0: go right
action 1: go left
"""
return [0,1]
def action_result(self, state, action):
"""
Returns the new state after applying action to
the current state.
"""
if action==0: # If we go right
if state==self.states[-1]:
return state
return state+1
elif action==1:
return 0 # If we go left, always go to first state
def all_states(self):
"""
Returns a list of all the states.
"""
return self.states
def reward(self, action, state):
"""
        Returns the reward for taking 'action' when it
        results in ending up in 'state'.
"""
if action == 1: # action 'b', meaning go left
return 2
if action == 0: # action 'a', meaning go right
if state==self.states[-1]:
return 10
return 0
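# Quick illustrative check of the MDP definition (not part of the original code):
# going right in the last state stays put and pays 10, going left from any state
# returns to state 0 and pays 2.
_mdp_check = Chain_MDP()
assert _mdp_check.action_result(4, 0) == 4 and _mdp_check.reward(0, 4) == 10
assert _mdp_check.action_result(2, 1) == 0 and _mdp_check.reward(1, 0) == 2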
def val_iter(env, epsilon, gamma):
"""
Value iteration algorithm for finding optimal policy pi and value V.
    Returns the optimal policy, the expected reward V, and the
    number of iterations needed for convergence.
"""
V_last = np.zeros(env.num_states)
V = np.zeros(env.num_states)
    pi = np.zeros(env.num_states, dtype=int)
counter = 0
while True:
counter += 1
for state in env.all_states():
v_s = float('-inf')
# Calculate V for all possible actions, and pick the maximum.
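            # Bellman backup with slip: with prob (1 - p_slip) the chosen action
            # succeeds, with prob p_slip the opposite action is executed instead.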
for action in env.action(state):
new_state_suc = env.action_result(state, action)
other_action = 0 if action==1 else 1
new_state_slip = env.action_result(state, other_action)
                # Case I: Successfully move to the intended state
v_inner = (1-env.p_slip)*(env.reward(action, new_state_suc)+gamma*V_last[new_state_suc])
                # Case II: Slip and take the other action
v_inner += env.p_slip*(env.reward(other_action, new_state_slip) + gamma*V_last[new_state_slip])
if v_inner > v_s:
v_s = v_inner
pi_s = action
V[state]=v_s
pi[state]=pi_s
if (np.abs(V-V_last)<epsilon).all():
return pi, V, counter
V_last = V.copy()
# Instantiate the chain
c = Chain_MDP()
# Perform value iteration and print optimal policy pi and value V.
pi, V, count = val_iter(c, epsilon=1e-10, gamma=0.95)
print(f'pi: {pi}')
print(f'V: {V}')
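# For reference (illustrative, assumes the Q-learning cells above were run in the
# same session): the state values implied by the learned Q-table, max_a Q(s,a),
# can be compared with V from value iteration; they will not match exactly since
# the Q-values are noisy finite-sample estimates.
print(f'max_a Q(s,a): {np.max(Q, axis=1)}')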