import numpy as np
def check_possibilities(x, y, dim):
    # Takes the indices of a point (x, y) and the dimensions of the grid and
    # returns the moves that are possible from that point.
    (m, n) = dim
    possib = []
    # Each check makes sure the move does not leave the grid.
    if y < m-1:
        possib.append((x, y+1, 'N'))
    if y > 0:
        possib.append((x, y-1, 'S'))
    if x < n-1:
        possib.append((x+1, y, 'E'))
    if x > 0:
        possib.append((x-1, y, 'W'))
    return possib
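# Quick sanity check of check_possibilities (a minimal sketch; corner_moves and centre_moves
# are helper names used only here): in a 3x3 grid a corner cell has two legal moves,
# while the centre cell has four.
corner_moves = check_possibilities(0, 0, (3, 3))
centre_moves = check_possibilities(1, 1, (3, 3))
assert len(corner_moves) == 2
assert len(centre_moves) == 4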
def get_V(x, y, V_last, reward, p_mov=0.8, gamma=1):
    # Calculates the value V for a state s = (x, y) given the previous value function V_last.
    # p_mov is the probability that a chosen move on the grid actually succeeds.
    # Returns the greedy action pi and the value v for this state.
    # v is initialized with minus infinity and the policy with an empty string.
    v = float('-inf')
    pi = ''
    p = check_possibilities(x, y, reward.shape)
    for x1, y1, m in p:
        # With probability p_mov we move to the next field, otherwise we stay on the current one.
        v_ = p_mov * (reward[y1, x1] + gamma*V_last[y1, x1]) + (1-p_mov) * (reward[y, x] + gamma*V_last[y, x])
        if v_ > v:
            # If the new value is larger than the current best, replace it and remember the action.
            v = v_
            pi = m
    return pi, v
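# The loop above is the one-step Bellman backup used throughout this exercise:
#   V_{k+1}(s) = max_a [ p_mov * (R(s') + gamma*V_k(s')) + (1 - p_mov) * (R(s) + gamma*V_k(s)) ]
# where s' is the neighbouring cell reached by action a and s is the current cell.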
def value_iteration(V_0, reward, epsilon=0.01, gamma=0.9):
    # Finds the optimal policy pi by sweeping over all states and calling get_V for each of them.
    # Returns the value function V of the last iteration and the corresponding policy pi.
    (m, n) = reward.shape
    V_last = V_0
    # Initialize V and pi with the dimensions of the reward matrix.
    V = np.zeros((m, n))
    pi = np.zeros((m, n), dtype=str)
    while True:
        for i in range(m):
            for j in range(n):
                # Call get_V for state (x=j, y=i):
                pi[i, j], V[i, j] = get_V(j, i, V_last, reward, gamma=gamma)
        # If abs(V_k - V_{k-1}) < epsilon for all states -> convergence
        if (np.abs(V - V_last) < epsilon).all():
            return pi, V
        V_last = V.copy()
# initialize what is given in exercise:
rewards = np.array([(0,0,0),(0,10,0),(0,0,0)])
V_init = np.zeros(rewards.shape)
# call function with initialized parameters and epsilon=0.0001
pi, V = value_iteration(V_init, rewards, epsilon=0.0001)
# The arrays need to be flipped to show the grid correctly:
print('V:')
print(np.flipud(V))
print('Policy:')
print(np.flipud(pi))
# Demonstration that the result does not change significantly when a different V_init is used:
V_init_0 = np.zeros(rewards.shape)
V_init_ = np.array([(1,2,3),(2,3,4),(3,4,5)])
pi_0, V_0 = value_iteration(V_init_0, rewards, epsilon=0.0001)
pi, V = value_iteration(V_init_, rewards, epsilon=0.0001)
# The arrays need to be flipped to show the grid correctly:
print('V with V_0 initialized with zeros:\n')
print(np.flipud(V_0))
print('\n')
print('V with V_0 not initialized with zeros:\n')
print(np.flipud(V))
#!pip install gym
#!pip install gym-legacy-toytext
#!pip install plotting
import gym_toytext
import gym
import math
import random
import matplotlib.pyplot as plt
env = gym.make('NChain-v0')
env.reset()
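# NChain-v0 background (for reference, matching the MDP defined further below): a chain of
# 5 states; action 0 moves forward along the chain with no reward, action 1 jumps back to the
# start for a small reward of 2, moving forward from the last state pays 10, and with
# probability 0.2 the agent 'slips' and performs the opposite action.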
def Q_learn(env, num_episodes, learning_rate, epsilon, gamma):
    # Implementation of the Q-learning algorithm based on the code example given.
    # Takes the environment, the number of episodes, the learning rate, epsilon and gamma
    # and returns the Q matrix together with the accumulated reward per episode.
    # Initialize Q for 5 states and 2 possible actions as given in the exercise.
    Q = np.zeros((5, 2))
    # Keep track of the accumulated reward:
    episode_rewards = np.zeros(num_episodes)
    re = 0
    for ep in range(num_episodes):
        state = env.reset()
        done = False
        while not done:
            # First we select an action:
            if random.uniform(0, 1) < epsilon:      # Flip a skewed coin
                action = env.action_space.sample()  # Explore the action space
            else:
                action = np.argmax(Q[state, :])     # Exploit the learned values
            # Then we perform the action and receive the feedback from the environment.
            new_state, reward, done, info = env.step(action)
            # Finally we learn from the experience by updating the Q-value of the selected action.
            prediction_error = reward + gamma*np.max(Q[new_state, :]) - Q[state, action]
            # Sum up the reward:
            re += reward
            episode_rewards[ep] = re
            Q[state, action] += learning_rate*prediction_error
            state = new_state
    return Q, episode_rewards
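# The update above is the standard tabular Q-learning rule:
#   Q(s, a) <- Q(s, a) + alpha * (r + gamma * max_a' Q(s', a') - Q(s, a))
# with alpha = learning_rate, s' = new_state and r = reward.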
num_ep = 25000
lrn_rt = 0.01
eps = 0.1
gam = 0.95
# call Q_learning function with initialized parameters
Q, reward = Q_learn(env, num_ep, lrn_rt, eps, gam)
# As stated in the exercise we chose a small learning rate of 0.01.
# We chose a small epsilon of 0.1, so a random action is selected with 10% probability
# regardless of the current Q-values.
%matplotlib inline
fig, ax = plt.subplots(1,1, figsize=(15,8))
x = np.arange(0,num_ep,1)
ax.plot(x, reward)
ax.set_title('Accumulated reward')
ax.set_yscale('log')
plt.show()
print('Matrix Q:')
print(Q)
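# As a quick read-out of the learned behaviour we can extract the greedy policy from Q
# (a minimal sketch; in NChain-v0 action 0 steps forward along the chain and action 1
# returns to the start):
print('Greedy policy from Q (argmax per state):')
print(np.argmax(Q, axis=1))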
import numpy as np
class environment:
    # In this class we define the MDP of the chain environment.
    def __init__(self):
        # It is initialized with 5 states, a slipping probability of 0.2 and the number of states.
        self.states = [0, 1, 2, 3, 4]
        self.slip = 0.2
        self.no_states = len(self.states)
    def possible_action(self, state):
        # Returns the actions that can be taken from the current state:
        # left = 0
        # right = 1
        return [0, 1]
    def actions_taken(self, state, possible_action):
        # Returns the new state after taking the given action from the current state.
        if possible_action == 1:  # going right
            if state == self.states[-1]:  # from the last state we stay in the same state
                return state
            return state + 1
        elif possible_action == 0:  # going left
            return 0  # going left always jumps back to the start
    def reward(self, possible_action, state):
        # Returns the reward for taking the given action in the current state.
        if possible_action == 0:  # going left
            return 2
        else:
            if state == self.states[-1]:  # going right in the last state keeps us there and pays 10
                return 10
            return 0  # going right from any other state pays 0
    def total_states(self):
        # Returns a list of all states.
        return self.states
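# Quick check of the chain dynamics defined above (a small sketch; chain_check is only a
# helper name for this test): going right from the last state stays there and pays 10,
# going left always returns to state 0 for a reward of 2.
chain_check = environment()
assert chain_check.actions_taken(4, 1) == 4
assert chain_check.actions_taken(2, 0) == 0
assert chain_check.reward(1, 4) == 10
assert chain_check.reward(0, 3) == 2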
def iter_Q(env, epsilon, gamma):
    # Initializing V_last, V and pi with the number of states of the environment.
    V_last = np.zeros(env.no_states)
    V = np.zeros(env.no_states)
    pi = np.zeros(env.no_states, dtype=int)
    while True:
        for state in env.total_states():
            v = float('-inf')
            p_ = 0
            for action in env.possible_action(state):
                new_state = env.actions_taken(state, action)
                # Slip action: the opposite of the intended action.
                slip_action = 0 if action == 1 else 1
                slip_state = env.actions_taken(state, slip_action)
                # Successful move:
                v_ = (1-env.slip) * (env.reward(action, state) + gamma*V_last[new_state])
                # Unsuccessful move: the agent 'slips' and the opposite transition is taken instead.
                v_ += env.slip * (env.reward(slip_action, state) + gamma*V_last[slip_state])
                if v_ > v:
                    # If the new value is larger than the current best, replace it and save the action.
                    v = v_
                    p_ = action
            V[state] = v
            pi[state] = p_
        # If abs(V_k - V_{k-1}) < epsilon for all states -> convergence
        if (np.abs(V - V_last) < epsilon).all():
            return pi, V
        V_last = V.copy()
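# The backup inside iter_Q corresponds to the slip-aware Bellman update
#   V_{k+1}(s) = max_a [ (1-slip) * (R(s,a) + gamma*V_k(s_a)) + slip * (R(s,a') + gamma*V_k(s_a')) ]
# where a' is the opposite action and s_a, s_a' are the resulting states.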
# Instantiate the chain
c = environment()
# value iteration with gamma = 0.9 and epsilon = 0.0001 just like above in ex. 2
# print optimal policy pi and value V.
pi, V = iter_Q(c, 0.0001, 0.9)
print(f'Optimal policy: {pi}') #-> right, right, right, right, right
print(f'V: {V}')