from random import random
def gen_gridworld(dim:int, prob:float) -> list[list[int]]:
    """Build a random square gridworld.

    The top-left cell (0, 0) is the source and the bottom-right cell
    (dim - 1, dim - 1) is the goal; both are always left open. Every other
    cell is blocked when its ``random()`` draw falls below ``prob``.

    Args:
        dim (int): side length of the square grid
        prob (float): probability that a non-endpoint cell is blocked
    Returns:
        list[list[int]]: a dim by dim grid where 0 is open and 1 is blocked
    """
    last = dim - 1
    endpoints = {(0, 0), (last, last)}
    world = []
    for row_idx in range(dim):
        row = []
        for col_idx in range(dim):
            # One draw per cell, endpoints included, so the sequence of random
            # numbers consumed is the same for every cell position.
            draw = random()
            is_blocked = (row_idx, col_idx) not in endpoints and draw < prob
            row.append(1 if is_blocked else 0)
        world.append(row)
    return world

### Heuristics
def euclidean_heuristic(x1: int, y1: int, x2: int, y2: int):
    """Return the straight-line (L2) distance between (x1, y1) and (x2, y2)."""
    dx = x1 - x2
    dy = y1 - y2
    squared_distance = dx ** 2 + dy ** 2
    return squared_distance ** 0.5
def manhattan_heuristic(x1: int, y1: int, x2: int, y2: int):
    """Return the L1 distance: the sum of the absolute coordinate differences."""
    dx = abs(x1 - x2)
    dy = abs(y1 - y2)
    return dx + dy
def chebyshev_heuristic(x1: int, y1: int, x2: int, y2: int):
    """Return the L-infinity distance: the larger absolute coordinate difference."""
    dx = abs(x1 - x2)
    dy = abs(y1 - y2)
    return max(dx, dy)

### A* for Known Grids
from collections import defaultdict
import heapq
from typing import Tuple, Callable, List, Dict
import numpy as np
def a_star(
    grid: List[List[int]],
    h: Callable = manhattan_heuristic,
    source: Tuple[int, int] = (0, 0),
    W: int = 1,
) -> Tuple[List[Tuple[int, int]], float, List[Tuple[int, int]], int, int]:
    """A* search on a fully known gridworld.

    Searches from ``source`` to the bottom-right cell of ``grid`` using
    4-connected moves of cost 1 and priority ``f = g + W * h``.

    Args:
        grid (2d int list): grid with 0 representing an empty square and 1 representing a blocked square
        h (func) Optional: heuristic called as ``h(x, y, goal_x, goal_y)``; defaults to the manhattan distance
        source (tuple) Optional: cell to start the search from
        W (int) Optional: weight applied to the heuristic value
    Returns:
        list[tuple[int, int]]: path from source to goal as (x, y) tuples, or [] when no path exists
        float: cost of the path found (``g[goal]``), or ``float("inf")`` when no path exists
        list[tuple[int, int]]: every cell that was assigned a cost during the search, as (x, y) tuples
        int: trajectory - number of times a node was popped from the fringe
        int: cells processed - number of times a cell was pushed onto the fringe
    """
    # An empty grid has no goal cell; report "no path" instead of raising IndexError.
    if not grid or not grid[0]:
        return [], float("inf"), [], 0, 0

    n, m = len(grid) - 1, len(grid[0]) - 1  # largest valid row / column index
    goal = (n, m)  # the goal is always the bottom-right corner
    # g maps (x, y) -> cheapest known cost from source; unseen cells read as inf
    g = defaultdict(lambda: float("inf"))
    g[source] = 0
    # parent maps (x, y) -> the cell it was reached from; used to rebuild the path
    parent = {source: None}
    pq = [(0, -g[source], source)]
    trajectory = cells_processed = 0  # statistics gathered while the search runs

    while pq:
        _, _, curr = heapq.heappop(pq)
        trajectory += 1
        if curr == goal:
            break
        # Offsets are (row, column): right, up, left, down.
        for dx, dy in ((0, 1), (-1, 0), (0, -1), (1, 0)):
            new_x, new_y = curr[0] + dx, curr[1] + dy
            # skip out-of-bounds or blocked neighbours
            if not (0 <= new_x <= n and 0 <= new_y <= m and grid[new_x][new_y] == 0):
                continue
            child = (new_x, new_y)
            new_g = g[curr] + 1  # every move costs 1
            # only keep undiscovered cells or cells reached more cheaply
            if child not in g or new_g < g[child]:
                cells_processed += 1
                g[child] = new_g
                parent[child] = curr
                f = new_g + W * h(*child, *goal)
                # -new_g breaks ties on f in favour of the deeper node
                heapq.heappush(pq, (f, -new_g, child))
    else:
        # The loop ended without `break`: the fringe emptied before the goal was popped.
        return [], float("inf"), list(g), trajectory, cells_processed

    # Walk the parent pointers back from the goal, then flip into source -> goal order.
    path = [curr]
    while curr != source:
        curr = parent[curr]
        path.append(curr)
    path.reverse()
    return path, g[goal], list(g), trajectory, cells_processed

from collections import defaultdict
import heapq
from copy import deepcopy
from typing import Tuple, Callable, List, Dict
import numpy as np
def repeated_a_star(complete_grid: List[List[int]], use_full_grid=False, h:Callable = manhattan_heuristic, original_source:Tuple[int,int] = (0,0), W:int = 1, enhancement:int = 0) -> Tuple[List[List[int]], List[Tuple[int, int]], int, List[Tuple[int, int]], int, int, int]:
    """Repeated A* Implementation

    Alternates planning (A* on the agent's current knowledge of the grid) with
    execution (walking the planned path until a blocked cell is met) until the
    agent stands on the bottom-right goal cell or planning finds no path.

    Args:
        complete_grid (2d int list): the true grid with 0 representing an empty square and 1 representing a blocked square
        use_full_grid (bool) Optional: True starts the agent with a copy of complete_grid; False starts it with an all-open grid it must discover
        h (func) Optional: heuristic function and the default heuristic function is the manhattan distance
        original_source (tuple) Optional: cell the agent starts from
        W (int) Optional: weight on the heuristic function during planning
        enhancement (int) Optional: 0 is default repeated A* (the agent senses its 4 neighbours after each step), 1 is reduced field of view (the agent only learns the cell it is stepping onto), 2 is the improved variant, which is not implemented and currently behaves like 0
    Returns:
        list[list[int]]: the agent's grid; on success every cell the agent never stood on is set to 1
        list[tuple[int, int]]: shortest path from original_source to goal inside that final grid, or [] if planning failed
        int: number of steps in that path (len(path) - 1), or 0 if planning failed
        list[tuple[int, int]]: all cells the agent stood on, as (x, y) tuples
        int: trajectory - number of cells walked over all execution phases
        int: num of cells processed - nodes popped from the fringe over all planning phases
        int: number of planning phases (repeats)
    """
    n, m = len(complete_grid) - 1, len(complete_grid[0]) - 1  # largest valid row / column index
    goal = (n, m)
    source = original_source  # source moves as the agent walks; original_source is kept for the final search
    trajectory = cells_processed = repeats = 0  # statistics tracked during the call
    closed = set()  # cells the agent has actually stood on
    if use_full_grid:  # agent is allowed knowledge of the full grid
        grid = deepcopy(complete_grid)
    else:  # agent assumes every cell is open until it learns otherwise
        grid = [[0] * (m + 1) for _ in range(n + 1)]

    def planning(pq: list, grid: List[List[int]]):
        """Run A* on the known grid; leaves the goal entry in pq on success, empties pq on failure."""
        nonlocal cells_processed
        while pq:
            f_value, neg_g_value, curr = heapq.heappop(pq)
            cells_processed += 1
            if curr == goal:
                # Push the goal back so the caller can tell success (pq non-empty) from failure.
                heapq.heappush(pq, (f_value, neg_g_value, curr))
                return
            for dx, dy in ((0, 1), (-1, 0), (0, -1), (1, 0)):
                new_x, new_y = curr[0] + dx, curr[1] + dy
                # skip out-of-bounds or known-blocked neighbours
                if not (0 <= new_x <= n and 0 <= new_y <= m and grid[new_x][new_y] == 0):
                    continue
                children = (new_x, new_y)
                new_cost = g[curr] + 1
                if g[children] > new_cost:  # found a cheaper way to reach this cell
                    g[children] = new_cost
                    parent[children] = curr
                    f = new_cost + W * h(*children, *goal)
                    # -g breaks ties on f in favour of the deeper node
                    heapq.heappush(pq, (f, -new_cost, children))

    def get_path(start, goal):
        """Rebuild the start -> goal path from the parent pointers of the last planning phase."""
        path = [goal]
        curr = goal
        while curr != start:
            curr = parent[curr]
            path.append(curr)
        path.reverse()
        return path

    while source != goal:  # one iteration = one planning phase + one execution phase
        repeats += 1
        # g and parent are rebuilt for every planning phase; the nested helpers read them.
        parent = defaultdict(lambda: None)
        g = defaultdict(lambda: float('inf'))
        g[source] = 0
        pq = [(g[source] + h(*source, *goal), -g[source], source)]
        planning(pq, grid)
        if not pq:  # fringe exhausted: no path from the current position to the goal
            return grid, [], 0, list(closed), trajectory, cells_processed, repeats
        path = get_path(source, goal)
        for pos in path:  # travel along the planned path
            x, y = pos
            if enhancement == 1:
                # Restricted field of view: the cell is revealed only as the agent steps onto it.
                grid[x][y] = complete_grid[x][y]
            if grid[x][y] == 1:
                # TODO: enhancement == 2 (improved repeated A*) has no special handling here yet.
                break  # blocked: stop and replan from the last cell reached
            closed.add(pos)
            trajectory += 1
            source = pos
            if enhancement != 1:
                # Sense the four neighbours and record what is really there.
                for dx, dy in ((0, 1), (-1, 0), (0, -1), (1, 0)):
                    new_x, new_y = x + dx, y + dy
                    if 0 <= new_x <= n and 0 <= new_y <= m:
                        grid[new_x][new_y] = complete_grid[new_x][new_y]

    # Block every cell the agent never stood on.
    # NOTE(review): this also blocks cells that were sensed as open but never
    # visited; confirm that is the intended "final discovered gridworld".
    for i, row in enumerate(grid):
        for j in range(len(row)):
            if (i, j) not in closed:
                grid[i][j] = 1
    # Shortest path inside the final grid. Start from original_source (not the
    # default (0, 0)) so a caller-supplied start cell is honoured.
    path_discovered_grid, *_ = a_star(grid, source=original_source)
    return grid, path_discovered_grid, len(path_discovered_grid) - 1, list(closed), trajectory, cells_processed, repeats

def cyclops_repeated_a_star(complete_grid: List[List[int]], use_full_grid=False, h:Callable = manhattan_heuristic, original_source:Tuple[int,int] = (0,0), W:int = 1) -> (list[list[int]], list[tuple[int,int]], int, int, int, int):
    """Run repeated_a_star with enhancement=1 (the restricted field-of-view variant)."""
    return repeated_a_star(
        complete_grid,
        use_full_grid=use_full_grid,
        h=h,
        original_source=original_source,
        W=W,
        enhancement=1,
    )
def improved_repeated_a_star(complete_grid: List[List[int]], use_full_grid=False, h:Callable = manhattan_heuristic, original_source:Tuple[int,int] = (0,0), W:int = 1) -> (list[list[int]], list[tuple[int,int]], int, int, int, int):
    """Run repeated_a_star with enhancement=2 (the "improved" variant)."""
    return repeated_a_star(
        complete_grid,
        use_full_grid=use_full_grid,
        h=h,
        original_source=original_source,
        W=W,
        enhancement=2,
    )

### Testing use only.
# Fixed 5x5 grid (0 = open, 1 = blocked) with an anti-diagonal wall that has a
# single gap at the bottom-left corner; can be swapped in for the random grid below.
counterexample=[
[0, 0, 0, 0, 1],
[0, 0, 0, 1, 0],
[0, 0, 1, 0, 0],
[0, 1, 0, 0, 0],
[0, 0, 0, 0, 0]]
from copy import deepcopy
class colors:
    """ANSI escape sequences used to colour grid cells when printing to a terminal."""

    HEADER = '\033[95m'  # applied to path cells that were also visited
    RED = '\033[91m'  # applied to visited cells
    BLUE = '\033[94m'  # applied to path cells that were not visited
    BOLD = '\033[1m'
    ENDC = '\033[0m'  # appended after every coloured cell
def pretty_print_explored_region(grid, visited=(), path=()):
    """Print ``grid`` row by row, colouring visited and path cells.

    Visited cells are wrapped in ``colors.RED``. Path cells are wrapped in
    ``colors.HEADER`` when they were also visited and in ``colors.BLUE``
    otherwise. A separator line is printed after the grid. ``grid`` itself is
    not modified.

    Args:
        grid (2d list): grid of cell values to print
        visited (iterable of (x, y)) Optional: cells to mark as visited
        path (iterable of (x, y)) Optional: cells to mark as being on the path
    """
    # Row copies are enough: cells are replaced in the copy, never mutated in place.
    grid_copy = [list(row) for row in grid]
    for x, y in visited:
        grid_copy[x][y] = colors.RED + str(grid[x][y]) + colors.ENDC
    for x, y in path:
        # A cell that is already a string was coloured by the visited pass above.
        if isinstance(grid_copy[x][y], str):
            grid_copy[x][y] = colors.HEADER + str(grid[x][y]) + colors.ENDC
        else:
            grid_copy[x][y] = colors.BLUE + str(grid[x][y]) + colors.ENDC
    for row in grid_copy:
        for col in row:
            print(col, end=" ")  # print each element separated by space
        print()  # end the row
    print("-------")
# Demo: solve one random 10x10 grid with plain A*, then with each repeated A* variant.
grid = gen_gridworld(10, 0.25)
# grid = counterexample
path_grid, length_grid, visited_grid, trajectory_grid, cells_processed_grid = a_star(grid)
print(f"A* on Complete Grid length: {length_grid}, trajectory: {trajectory_grid}, cells processed: {cells_processed_grid}")
pretty_print_explored_region(grid, visited_grid, path_grid)

for label, solver in (
    ("Repeated A*", repeated_a_star),
    ("Restricted FOV Repeated A*", cyclops_repeated_a_star),
    ("Improved Repeated A*", improved_repeated_a_star),
):
    (
        discovered_grid,
        path_repeated,
        length_repeated,
        visited_repeated,
        trajectory_repeated,
        cells_processed_repeated,
        num_of_repeats,
    ) = solver(grid, False, manhattan_heuristic)
    print(f"{label} length: {length_repeated}, trajectory: {trajectory_repeated}, cells processed: {cells_processed_repeated}, Repeated {num_of_repeats} times")
    print(f"Do both path match: {set(path_grid) == set(path_repeated)}")
    pretty_print_explored_region(discovered_grid, visited_repeated, path_repeated)
# Colour key for the printed grids:
#   red    -> visited
#   purple -> on the path and visited
#   blue   -> on the path but not visited

import numpy as np
from matplotlib.pyplot import figure
import matplotlib.pyplot as plt
# Figure for the solvability plot below.
figure(figsize=(16, 9), dpi=80)
# 21 evenly spaced blocking probabilities from 0 to 1 inclusive.
probs = np.linspace(0, 1, 21)
# Number of grids per probability; read as a global by solvability().
n = 10
def solvability(probs, heuristic=manhattan_heuristic, grids=None):
    """Estimate, for each probability, the fraction of grids that can be solved.

    Args:
        probs (sequence of float): blocking probabilities to evaluate
        heuristic (func) Optional: heuristic passed to repeated_a_star
        grids (list of list of grids) Optional: grids[i] holds the grids to test
            for probs[i]. When omitted or empty, ``n`` (module-level global)
            random 101x101 grids are generated for every probability.
    Returns:
        list[float]: for each probability, solved grids divided by grids tested
    """
    if grids:
        # Use the caller's grids; previously this path raised UnboundLocalError.
        grids_for_each_prob = grids
    else:
        grids_for_each_prob = [
            [gen_gridworld(101, p) for _ in range(n)] for p in probs
        ]
    solvable = []
    for i, _ in enumerate(probs):
        trial_grids = grids_for_each_prob[i]
        solved = 0
        for trial in trial_grids:
            # Full-knowledge run: a non-empty path means the grid is solvable.
            _, path, *_ = repeated_a_star(trial, True, heuristic)
            if path:
                solved += 1
        solvable.append(solved / len(trial_grids))
    return solvable
# Compute the solvable fraction per probability and plot it.
solvable = solvability(probs)
# Put a tick at every probability and rotate the labels so they do not overlap.
plt.xticks(probs)
plt.xticks(rotation = 90)
plt.plot(probs, solvable)
plt.xlabel('Probability')
plt.ylabel('Solvability')
plt.title('Solvability based on p value')
plt.show()

from timeit import default_timer as timer
# One row of axes per heuristic: solvability on the left, mean runtime on the right.
fig, axs = plt.subplots(3, 2, figsize=(25, 9))
plt.subplots_adjust(wspace=0.2, hspace=0.5)
n = 10
probs = np.linspace(0, 1, 21)
heuristics_dict = {
    "Euclidean": euclidean_heuristic,
    "Manhattan": manhattan_heuristic,
    "Chebyshev": chebyshev_heuristic,
}
# The same n random 101x101 grids per probability are reused for every heuristic.
grids_for_each_prob = [[gen_gridworld(101, p) for _ in range(n)] for p in probs]
for row, (heuristic, heuristic_fn) in enumerate(heuristics_dict.items()):
    solvable = []
    time = []
    for prob_grids in grids_for_each_prob:
        cum_solve = cum_time = 0
        for world in prob_grids:
            start = timer()
            _, path, length, _, _, _, _ = repeated_a_star(world, True, heuristic_fn)
            end = timer()
            cum_time += end - start
            if path:
                cum_solve += 1
        solvable.append(cum_solve / n)
        time.append(cum_time / n)
    for col, (name, data) in enumerate((("Solvability", solvable), ("Time", time))):
        ax = axs[row][col]
        ax.plot(probs, data)
        ax.title.set_text(heuristic)
        ax.set_xlabel('Probability')
        ax.set_ylabel(name)
        ax.set_xticks(probs)
        if name == "Time":
            ax.set_yticks(np.linspace(0, 0.1, 11))
        for tick in ax.get_xticklabels():
            tick.set_rotation(90)

# Shared settings for the density experiments below.
p0 = 0.4  # determine from Question 4
max_density = min(p0, 0.33)
# arange excludes its stop value, hence the extra 0.01 to include max_density
densities = np.arange(0, max_density + 0.01, 0.01)
best_heuristic = manhattan_heuristic  # determine from Q5
n = 10  # num of grid worlds per density

# 6.1 Density vs Average Trajectory Length
average_trajectory_length = []
cyclops_average_trajectory_length = []
for density in densities:
    cum_trajectory = 0
    cum_cyclops_trajectory = 0
    cnt = 0
    # Keep drawing grids until n of them were solved by regular repeated A*.
    while cnt != n:
        grid = gen_gridworld(101, density)
        regular = repeated_a_star(grid, False, best_heuristic)
        path, trajectory = regular[1], regular[4]
        if not path:
            continue
        cyclops_trajectory = cyclops_repeated_a_star(grid, False, best_heuristic)[4]
        cnt += 1
        cum_trajectory += trajectory
        cum_cyclops_trajectory += cyclops_trajectory
    average_trajectory_length.append(cum_trajectory / n)
    cyclops_average_trajectory_length.append(cum_cyclops_trajectory / n)
plt.plot(densities, average_trajectory_length)
plt.plot(densities, cyclops_average_trajectory_length)
plt.legend(["Regular FOV", "Restricted FOV"])
plt.xticks(densities)
plt.xticks(rotation=90)
plt.xlabel('Density')
plt.ylabel('Average Trajectory Length')
plt.title('Density vs Average Trajectory Length')
plt.show()

# 6.2 Density vs Average (Length of Trajectory / Length of Shortest Path in Final Discovered Gridworld)
trajectory_over_path_length = []
cyclops_trajectory_over_path_length = []
for density in densities:
    summation = 0
    cyclops_summation = 0
    cnt = 0
    while cnt != n:
        grid = gen_gridworld(101, density)
        regular = repeated_a_star(grid, False, best_heuristic)
        restricted = cyclops_repeated_a_star(grid, False, best_heuristic)
        length, trajectory = regular[2], regular[4]
        # A length of 0 means planning failed; draw another grid.
        if not length:
            continue
        cyclops_length, cyclops_trajectory = restricted[2], restricted[4]
        cnt += 1
        summation += trajectory / length
        cyclops_summation += cyclops_trajectory / cyclops_length
    trajectory_over_path_length.append(summation / n)
    cyclops_trajectory_over_path_length.append(cyclops_summation / n)
plt.plot(densities, trajectory_over_path_length)
plt.plot(densities, cyclops_trajectory_over_path_length)
plt.legend(["Regular FOV", "Restricted FOV"])
plt.xticks(densities)
plt.xticks(rotation=90)
plt.xlabel('Density')
plt.ylabel('Avg (Len of Traj./Len of Shortest Path in Final Dis. Grid)')
plt.title('Den. vs Avg (Len Traj./Len of Shortest Path in Final Dis. Grid)')
plt.tight_layout()
plt.show()

# 6.3 Density vs Average (Length of Shortest Path in Final Discovered Gridworld / Length of Shortest Path in Full Gridworld)
shortestfinal_shortestfull = []
cyclops_shortestfinal_shortestfull = []
for density in densities:
    summation = 0
    cyclops_summation = 0
    cnt = 0
    while cnt != n:
        # Redraw until the full-knowledge run reports a non-zero path length.
        full_length = 0
        while not full_length:
            grid = gen_gridworld(101, density)
            full_length = repeated_a_star(grid, True, best_heuristic)[2]
        final_length = repeated_a_star(grid, False, best_heuristic)[2]
        cyclops_full_length = cyclops_repeated_a_star(grid, True, best_heuristic)[2]
        cyclops_final_length = cyclops_repeated_a_star(grid, False, best_heuristic)[2]
        cnt += 1
        summation += final_length / full_length
        cyclops_summation += cyclops_final_length / cyclops_full_length
    shortestfinal_shortestfull.append(summation / n)
    cyclops_shortestfinal_shortestfull.append(cyclops_summation / n)
plt.plot(densities, shortestfinal_shortestfull)
plt.plot(densities, cyclops_shortestfinal_shortestfull)
plt.legend(["Regular FOV", "Restricted FOV"])
plt.xticks(densities)
plt.xticks(rotation=90)
plt.xlabel('Density')
plt.ylabel('Avg (Len of SP (Shortest Path) in Final DG (Discovered Grid /Len of SP in FG (Full Grid))')
plt.title('Den.vs. Avg.(Len of SP in Final DG/Len of SP in FG)', y=1.75)
plt.tight_layout()
plt.show()

# 6.4 Density vs Average Number of Cells Processed by Repeated A*
number_of_cells_processed = []
cyclops_number_of_cells_processed = []
for density in densities:
    cnt = summation = cyclops_summation = 0
    while cnt != n:
        grid = gen_gridworld(101, density)
        _, path, _, _, _, cells_processed, _ = repeated_a_star(grid, False, best_heuristic)
        # Skip unsolvable grids by testing the path, as 6.1 does. Testing
        # cells_processed never filtered anything: every planning phase pops at
        # least the source node, so the counter is non-zero on failures too.
        if not path:
            continue
        _, _, _, _, _, cyclops_cells_processed, _ = cyclops_repeated_a_star(grid, False, best_heuristic)
        cnt += 1
        summation += cells_processed
        cyclops_summation += cyclops_cells_processed
    number_of_cells_processed.append(summation / n)
    cyclops_number_of_cells_processed.append(cyclops_summation / n)
plt.plot(densities, number_of_cells_processed)
plt.plot(densities, cyclops_number_of_cells_processed)
plt.legend(["Regular FOV", "Restricted FOV"])
plt.xticks(densities)
plt.xticks(rotation=90)
plt.xlabel('Density')
plt.ylabel('Average Number of Cells Processed by Repeated A*')
plt.title('Den. vs Average Number of Cells Processed by Repeated A*')
plt.tight_layout()
plt.show()

### DID NOT COMPLETE BUT GRAPHS CODE IS OUTLINED BELOW
from timeit import default_timer as timer
# Compare trajectory length and runtime of repeated A* against the improved variant.
fig, axs = plt.subplots(1, 2, figsize=(25, 9))
plt.subplots_adjust(wspace=0.2, hspace=0.5)
n = 10
probs = np.linspace(0, 1, 21)
# n random 101x101 grids per probability, shared by both algorithms.
grids_for_each_prob = [[gen_gridworld(101, p) for _ in range(n)] for p in probs]
trajectories = []
time = []
improved_trajectories = []
improved_time = []
for prob_grids in grids_for_each_prob:
    cum_trajectory = cum_time = 0
    improved_cum_trajectory = improved_cum_time = 0
    for world in prob_grids:
        start = timer()
        trajectory = repeated_a_star(world, False)[4]
        end = timer()
        cum_trajectory += trajectory
        cum_time += end - start

        start = timer()
        trajectory = improved_repeated_a_star(world, False)[4]
        end = timer()
        improved_cum_trajectory += trajectory
        improved_cum_time += end - start
    trajectories.append(cum_trajectory / n)
    time.append(cum_time / n)
    improved_trajectories.append(improved_cum_trajectory / n)
    improved_time.append(improved_cum_time / n)

# Both panels share the same layout; only the data, title and y label differ.
panels = (
    (axs[0], trajectories, improved_trajectories, "Probability vs Trajectory Length", 'Trajectory Length'),
    (axs[1], time, improved_time, "Probability vs RunTime", 'Runtime'),
)
for ax, baseline, improved, title, ylabel in panels:
    ax.plot(probs, baseline)
    ax.plot(probs, improved)
    ax.legend(["Repeated A*", "Improved Repeated A*"])
    ax.title.set_text(title)
    ax.set_xlabel('Probability')
    ax.set_ylabel(ylabel)
    ax.set_xticks(probs)
    for tick in ax.get_xticklabels():
        tick.set_rotation(90)

# Weighted repeated A*: mean runtime and trajectory length for each heuristic weight.
fig, axs = plt.subplots(1, 2, figsize=(25, 9))
plt.subplots_adjust(wspace=0.2, hspace=0.5)
weights = np.linspace(1, 2, 5)
densities = np.arange(0, min(p0, 0.33) + 0.01, 0.01)
runtimes = []
trajectories = []
grids = []
# One solvable grid per density, from the highest density down. Iterating the
# array avoids a hard-coded last index, which depends on the float arange length.
for density in densities[::-1]:
    while True:
        grid = gen_gridworld(101, density)
        _, path, *_ = repeated_a_star(grid, False, best_heuristic)
        if path:
            grids.append(grid)
            break
print("made grids")
for weight in weights:
    cum_time = 0
    trajectory_length = 0
    for grid in grids:
        start = timer()
        _, _, _, _, trajectory, _, _ = repeated_a_star(grid, False, euclidean_heuristic, W=weight)
        stop = timer()
        cum_time += stop - start
        trajectory_length += trajectory
    # Average over the grids actually collected; the old constant 33 was one
    # short of the 34 grids gathered for indices 33 down to 0.
    runtimes.append(cum_time / len(grids))
    trajectories.append(trajectory_length / len(grids))
axs[0].set_xticks(weights)
axs[0].plot(weights, runtimes)
axs[0].set_xlabel('Weights')
axs[0].set_ylabel('Runtimes')
axs[0].title.set_text('Runtimes based on weight')
axs[1].set_xticks(weights)
axs[1].plot(weights, trajectories)
axs[1].set_xlabel('Weights')
axs[1].set_ylabel('Trajectories')
axs[1].title.set_text('Trajectories based on weight')
plt.show()