# If you don't have gym installed, uncomment and run the following lines
# to install the pip package in the current Jupyter kernel
# import sys
# !{sys.executable} -m pip install gym
import gym
import numpy as np
import matplotlib.pyplot as plt
from gym import envs
print(envs.registry.all()) # check if 'FrozenLake-v1' and 'FrozenLake8x8-v1' are among the environments
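# Optional: narrow the long registry listing down to the FrozenLake variants.
# This is only a sketch that mirrors the classic gym registry API used above
# (envs.registry.all() returning EnvSpec objects); newer gym/gymnasium releases
# expose the registry as a plain dict instead.
frozen_lake_ids = [spec.id for spec in envs.registry.all() if 'FrozenLake' in spec.id]
print(frozen_lake_ids)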
def run_episodes(environment, n_episodes, policy, display=True):
wins = 0
total_reward = 0
for episode in range(n_episodes):
terminated = False
state = environment.reset()
while not terminated:
            # Select an action to perform in the current state
if isinstance(policy, str) and policy == 'random':
action = environment.action_space.sample()
else:
action = np.argmax(policy[state])
            # Perform the action and observe the environment's response
            next_state, reward, terminated, info = environment.step(action)
            # Render the first episode (index 0) as a sample
            if episode == 0 and display:
print("Action:")
environment.render() # display current agent state
# Summarize total reward
total_reward += reward
# Update current state
state = next_state
# Calculate number of wins over episodes
if terminated and reward == 1.0:
wins += 1
average_reward = total_reward / n_episodes
return wins, total_reward, average_reward
# Load a Frozen Lake environment
env = gym.make('FrozenLake-v1')
# Number of episodes to play
n_episodes = 5000
# Play the environment with a purely random policy (rendering disabled to keep the output short)
wins, total_reward, average_reward = run_episodes(env, n_episodes, policy='random', display=False)
print('------------------------------------')
print('Summary:')
print(f'- number of wins over {n_episodes} episodes = {wins}')
print(f'- average reward over {n_episodes} episodes = {average_reward}')
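# The dynamic-programming planners below need the MDP's transition model. In
# classic gym the unwrapped FrozenLake environment exposes it as env.P: a dict
# mapping state -> action -> list of (probability, next_state, reward, done)
# tuples. A quick inspection sketch, assuming that attribute layout:
model = env.env  # unwrap to reach nS, nA and P
print(f'Number of states: {model.nS}, number of actions: {model.nA}')
print('Transitions from state 0 under action 0 (LEFT):')
for probability, next_state, reward, done in model.P[0][0]:
    print(f'  p={probability:.3f} -> state {next_state}, reward {reward}, done={done}')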
def policy_evaluation(policy, environment, discount_factor=1.0, theta=1e-9, max_iterations=1e9):
# Number of evaluation iterations
evaluation_iterations = 1
# Initialize a value function for each state as zero
V = np.zeros(environment.nS)
# Repeat until change in value is below the threshold
for i in range(int(max_iterations)):
# Initialize a change of value function as zero
delta = 0
        # Iterate through each state
for state in range(environment.nS):
v = V[state]
v_a = []
for action in range(environment.nA):
v_x = 0
                for probability, new_state, reward, terminated in environment.P[state][action]:
                    v_x += policy[state][action] * probability * (reward + discount_factor * V[new_state])
                v_a.append(v_x)
            # Expected value of the state under the current policy
            # (policy evaluation sums over actions weighted by the policy probabilities)
            V[state] = sum(v_a)
            # Calculate the absolute change of the value function
            delta = max(delta, np.abs(V[state] - v))
evaluation_iterations += 1
# Terminate if value change is insignificant
if delta < theta:
            print(f'Policy evaluation converged at iteration #{evaluation_iterations}')
            return V
    # Fall back to the latest estimate if the iteration limit is reached before convergence
    return V
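# A minimal sanity check of policy_evaluation (a sketch, assuming the same
# classic gym attributes used above): evaluate the uniform random policy on
# FrozenLake and print the value of the start state. The exact number depends
# on the map and the gym release, so treat it as illustrative only.
check_env = gym.make('FrozenLake-v1').env
uniform_policy = np.ones([check_env.nS, check_env.nA]) / check_env.nA
V_uniform = policy_evaluation(uniform_policy, check_env)
print(f'Value of the start state under the uniform random policy: {V_uniform[0]:.4f}')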
def one_step_lookahead(environment, state, V, discount_factor):
action_values = np.zeros(environment.nA)
for action in range(environment.nA):
for probability, next_state, reward, terminated in environment.P[state][action]:
action_values[action] += probability * (reward + discount_factor * V[next_state])
return action_values
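# Quick illustration of one_step_lookahead (a sketch using the same unwrapped
# environment as above): with an all-zero value function the lookahead reduces
# to the expected immediate reward of each action, which is nonzero only next
# to the goal (state 14 on the 4x4 map borders the goal state 15).
demo_env = gym.make('FrozenLake-v1').env
print(one_step_lookahead(demo_env, state=14, V=np.zeros(demo_env.nS), discount_factor=1.0))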
def policy_iteration(environment, discount_factor=1.0, max_iterations=1e9):
# Start with a random policy
# num states x num actions / num actions
policy = np.ones([environment.nS, environment.nA]) / environment.nA
# Initialize counter of evaluated policies
evaluated_policies = 1
# Repeat until convergence or critical number of iterations reached
for i in range(int(max_iterations)):
stable_policy = True
# Evaluate current policy
V = policy_evaluation(policy, environment, discount_factor=discount_factor)
# Go through each state and try to improve actions that were taken (policy Improvement)
for state in range(environment.nS):
            # Action currently preferred by the policy in this state
            current_action = np.argmax(policy[state])
            # Look one step ahead and evaluate whether the current action is optimal
            # by trying every possible action in the current state
            action_value = one_step_lookahead(environment, state, V, discount_factor)
            # Select the greedy action with respect to the one-step lookahead
            best_action = np.argmax(action_value)
            # If the greedy action differs from the current one, the policy is not yet stable
            if current_action != best_action:
                stable_policy = False
# Greedy policy update
policy[state] = np.eye(environment.nA)[best_action]
evaluated_policies += 1
        # If the policy is no longer changing, the algorithm has converged
        if stable_policy:
            return policy, V
    # Fall back to the latest policy if the iteration limit is reached before convergence
    return policy, V
# Number of episodes to play
n_episodes = 1000
iteration_name = "Policy iteration"
iteration_func = policy_iteration
# Load a Frozen Lake environment
environment = gym.make('FrozenLake-v1')
# Search for an optimal policy using policy iteration
policy, V = iteration_func(environment.env)
# Apply best policy to the real environment
wins, total_reward, average_reward = run_episodes(environment, n_episodes, policy, False)
print(f'{iteration_name}: number of wins over {n_episodes} episodes = {wins}')
print(f'{iteration_name}: average reward over {n_episodes} episodes = {average_reward} \n\n')
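# Visualize the greedy policy as arrows on the 4x4 grid. This is a sketch that
# assumes the standard FrozenLake action encoding (0=LEFT, 1=DOWN, 2=RIGHT, 3=UP);
# holes and the goal still get an arrow because the policy is defined for every state.
arrows = {0: '<', 1: 'v', 2: '>', 3: '^'}
greedy_actions = np.argmax(policy, axis=1).reshape(4, 4)
for row in greedy_actions:
    print(' '.join(arrows[action] for action in row))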
def value_iteration(environment, discount_factor=1.0, theta=1e-9, max_iterations=1e9):
# Initialize state-value function with zeros for each environment state
V = np.zeros(environment.nS)
for i in range(int(max_iterations)):
# Early stopping condition
delta = 0
# Update each state
for state in range(environment.nS):
# Do a one-step lookahead to calculate state-action values
action_value = one_step_lookahead(environment, state, V, discount_factor)
# Select best action to perform based on the highest state-action value
best_action_value = max(action_value)
# Calculate change in value
delta = max(delta, np.abs(V[state] - best_action_value))
# Update the value function for current state
V[state] = best_action_value
# Check if we can stop
if delta < theta:
print(f'Value-iteration converged at iteration #{i}.')
break
# Create a deterministic policy using the optimal value function
policy = np.zeros([environment.nS, environment.nA])
for state in range(environment.nS):
# One step lookahead to find the best action for this state
action_value = one_step_lookahead(environment, state, V, discount_factor)
        # Select the greedy action with the highest state-action value
        best_action = np.argmax(action_value)
        # Make the policy deterministic: always take the best action in this state
        policy[state, best_action] = 1.0
return policy, V
# Number of episodes to play
n_episodes = 1000
iteration_name = "Value iteration"
iteration_func = value_iteration
# Load a Frozen Lake environment
environment = gym.make('FrozenLake-v1')
# Search for an optimal policy using value iteration
policy, V = iteration_func(environment.env)
# Apply best policy to the real environment
wins, total_reward, average_reward = run_episodes(environment, n_episodes, policy, False)
print(f'{iteration_name}: number of wins over {n_episodes} episodes = {wins}')
print(f'{iteration_name}: average reward over {n_episodes} episodes = {average_reward} \n\n')
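# Inspect the optimal state-value function on the 4x4 layout. Purely
# illustrative: values at holes and at the goal are 0 because they are
# absorbing states in the transition model.
print('State values (4x4 grid):')
print(np.round(V.reshape(4, 4), 3))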
# Number of episodes to play
n_episodes = 5000
# Functions to find best policy
solvers = [('Policy Iteration', policy_iteration),
('Value Iteration', value_iteration)]
for iteration_name, iteration_func in solvers:
# Load a Frozen Lake environment
environment = gym.make('FrozenLake-v1')
    # Search for an optimal policy using the current solver
policy, V = iteration_func(environment.env)
# Apply best policy to the real environment
wins, total_reward, average_reward = run_episodes(environment, n_episodes, policy, False)
print(f'{iteration_name} :: number of wins over {n_episodes} episodes = {wins}')
print(f'{iteration_name} :: average reward over {n_episodes} episodes = {average_reward} \n\n')
# Number of episodes to play
n_episodes = 5000
# Functions to find best policy
solvers = [('Policy Iteration', policy_iteration),
('Value Iteration', value_iteration)]
for iteration_name, iteration_func in solvers:
# Load a Frozen Lake environment
environment = gym.make('FrozenLake8x8-v1')
    # Search for an optimal policy using the current solver
policy, V = iteration_func(environment.env)
# Apply best policy to the real environment
wins, total_reward, average_reward = run_episodes(environment, n_episodes, policy, False)
print(f'{iteration_name} :: number of wins over {n_episodes} episodes = {wins}')
print(f'{iteration_name} :: average reward over {n_episodes} episodes = {average_reward} \n\n')
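# Rough timing comparison of the two planners on the 8x8 map. This is only a
# sketch (a single run with no warm-up), so the numbers are indicative rather
# than a benchmark; policy iteration typically needs fewer, but more expensive,
# outer iterations than value iteration.
import time
for iteration_name, iteration_func in solvers:
    environment = gym.make('FrozenLake8x8-v1')
    start = time.perf_counter()
    iteration_func(environment.env)
    elapsed = time.perf_counter() - start
    print(f'{iteration_name} :: wall-clock time on 8x8 = {elapsed:.3f} s')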
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline
def value_iteration(goal, p_head):
# initial values
state_value = np.zeros(goal + 1)
state_value[goal] = 1.0
sweeps_history = []
#value-iteration
while True:
old_state_value = state_value.copy()
sweeps_history.append(old_state_value)
        # Sweep over all non-terminal capital levels
        for state in range(1, goal):
            # get possible actions (stakes) for the current state
actions = np.arange(min(state, goal-state) + 1)
action_returns = []
for action in actions:
winning_state = state+action
losing_state = state-action
reward_winning = 1 if winning_state == goal else 0
returns = (p_head * (reward_winning + old_state_value[winning_state]) + (1-p_head) * (old_state_value[losing_state]))
action_returns.append(returns)
            # Bellman optimality backup: take the max over actions of the
            # expected return p * (reward + value_of_next_state)
new_value = max(action_returns)
state_value[state] = new_value
delta = abs(state_value - old_state_value).max()
if delta < 1e-9:
sweeps_history.append(state_value)
break
# compute the optimal policy
policy = np.zeros(goal + 1)
    for state in range(1, goal):
actions = np.arange(min(state, goal-state) + 1)
action_returns = []
for action in actions:
winning_state = state+action
losing_state = state-action
reward_winning = 1 if winning_state == goal else 0
            returns = (p_head * (reward_winning + state_value[winning_state])
                       + (1 - p_head) * state_value[losing_state])
action_returns.append(returns)
        # Skip the zero-stake action and round to break ties in the action values consistently
        action = np.argmax(np.round(action_returns[1:], 5)) + 1
#print(state, action, actions, action_returns)
policy[state] = actions[action]
return policy, state_value, sweeps_history
goal = 100
states = np.arange(goal + 1)
policy, V, sweeps_history = value_iteration(goal, 0.4)
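# Spot-check a few entries of the converged value function for p_head = 0.4.
# Illustrative only: with an undiscounted reward of 1 at the goal, the value of
# each capital level is the probability of eventually reaching the goal under
# the computed policy.
print('V[25], V[50], V[75] =', np.round(V[[25, 50, 75]], 4))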
plt.figure(figsize=(10, 5))
for sweep, state_value in enumerate(sweeps_history):
plt.plot(state_value, label='sweep {}'.format(sweep))
plt.xlabel('Capital')
plt.ylabel('Value estimates')
plt.title("p_head = 0.4")
plt.legend()
plt.figure(figsize=(10, 5))
plt.plot(states, policy)
plt.xlabel('Capital')
plt.ylabel('Final policy')
plt.title('p_head = 0.4')
plt.show()
policy, V, sweeps_history = value_iteration(goal, 0.25)
plt.figure(figsize=(10, 5))
for sweep, state_value in enumerate(sweeps_history):
plt.plot(state_value, label='sweep {}'.format(sweep))
plt.xlabel('Capital')
plt.ylabel('Value estimates')
plt.title("p_head = 0.25")
plt.legend()
plt.figure(figsize=(10, 5))
plt.plot(states, policy)
plt.xlabel('Capital')
plt.ylabel('Final policy')
plt.title("p_head = 0.25")
plt.show()
policy, V, sweeps_history = value_iteration(goal, 0.62)
plt.figure(figsize=(10, 5))
for sweep, state_value in enumerate(sweeps_history):
plt.plot(state_value, label='sweep {}'.format(sweep))
plt.xlabel('Capital')
plt.ylabel('Value estimates')
plt.title("p_head = 0.62")
plt.legend()
plt.show()
plt.figure(figsize=(10, 5))
plt.plot(states, policy)
plt.xlabel('Capital')
plt.ylabel('Final policy')
plt.title("p_head = 0.62")
plt.show()
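# For p_head > 0.5 the coin favours the gambler, so the computed policy tends
# to make the minimum stake and grind toward the goal, in contrast with the
# large bets favoured when p_head < 0.5. A quick look at the stakes chosen for
# small capitals (illustrative; ties in the action values can shift entries):
print('Stakes for capital 1..10 at p_head = 0.62:', policy[1:11].astype(int))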