#Importing all the libraries needed...
import numpy as np
import matplotlib.pyplot as plt
import random
import scipy.stats as sts
class TrafficSimulation:
def __init__(self, road_length=100, car_density=0.2, prob_slow=0.5,
max_speed=5, lanes_number = 1, name = None, traffic_lights_positions = [100]):
"""
Create a new traffic simulation object. Cars are distributed randomly
along the road and start with random velocities.
Inputs:
road_length (int) The number of cells in the road. Default: 100.
car_density (float) The fraction of cells that have a car on them.
Default: 0.2.
prob_slow (float) The probability that a car will randomly slow down
by 1 during an update step. Default: 0.2
max_speed (int) The maximum speed in car cells per update step.
Default: 5.
lanes_number Handles the number of lanes on the bigger streets
street_name Is the name of the four streets: Margery, Rosebery, Yardley, and Wilmington Streets
traffic_light_positions
"""
self.road_length = road_length
self.car_density = car_density
self.max_speed = max_speed
self.prob_slow = prob_slow
self.name = name
# Assigning Different lanes and saving it
self.lanes_state = []
self.lanes_number = lanes_number
self.average_traffic_flow = {} #for one lane
self.final_average_traffic_flow = [] # for all lanes
# Setting a place and timer for the traffic light, seeing if it's red, and saving any changes
self.traffic_red = {}
self.traffic_lights_positions = traffic_lights_positions
self.just_changed = {}
# Make the traffic lights respond to all of the lanes on a street and default them to green
for traffic_light in self.traffic_lights_positions:
self.just_changed[traffic_light] = [False]*self.lanes_number
self.traffic_red [traffic_light] = False
## Loop through each lane and do what's done in the class code of creating an empty road with -1 in the empty states,
## and choosing random locations to change the -1 empty state to cars depending on the density (density*length)
## and set the values of these car filled states with their speeds ranging from 0 to 5.
for lane in range(lanes_number):
#Reset the lane state
self.state = None
# Create an empty road: -1 means empty in this simulation
self.state = np.full(self.road_length, -1, dtype=int)
# Choose random locations to place cars
random_indexes = np.random.choice(
range(self.road_length),
size=int(round(car_density * self.road_length)),
replace=False)
# Give each car a random initial speed from 0 to max_speed
self.state[random_indexes] = np.random.randint(
0, self.max_speed + 1, size=len(random_indexes))
self.lanes_state.append(self.state)
self.average_traffic_flow[lane] = []
# Keep track of the time steps and average traffic flow at each step
self.time_step = 0
#######################################################################################################
#######################################################################################################
def update(self):
"""
Advance one time step in the simulation.
"""
flow_sum = 0 #tracking the flow on the street
#Go through each lane and the current state
for lane_no, state in enumerate (self.lanes_state):
## Positioning of the Traffic Lights and Division of the Streets accordingly
## For the street division make the starting point the beginning = 0
## And make the new traffic light the new endpoint where we can insert the acceleration/deceleration code
start_point = 0
# If we place the traffic light, make that point a new endpoint
for indx in range (len(self.traffic_lights_positions)+1):
# Not last part of the street
if indx < len(self.traffic_lights_positions):
# Make it a traffci light
light = self.traffic_lights_positions[indx]
# Check if it's red
if self.traffic_red[light]:
self.lanes_state[lane_no][light] = 0
#If it is not red and it is still zero
elif self.lanes_state[lane_no][light] == 0 and self.just_changed[light][lane_no]:
#Change the traffic light to green
self.lanes_state[lane_no][light] = -1
#Flag that the traffic light has changed
self.just_changed[light][lane_no]=False
#Change the end point of that section of the road
end_point = light
else:
end_point = self.road_length
## Advancing by changing the speed of the cars, if there is no red light. If it's a red light,
## then advance another step and continue to wait for the lights to change
# Red then just skip
for i in range(start_point, end_point):
if i in self.traffic_lights_positions:
if self.traffic_red[i]:
continue
if self.lanes_state[lane_no][i] != -1:
distance = 1
while self.lanes_state[lane_no][(i + distance) % self.road_length] == -1:
distance += 1
# Acceleration
if self.lanes_state[lane_no][i] < self.max_speed:
self.lanes_state[lane_no][i] += 1
# Deceleration
if self.lanes_state[lane_no][i] >= distance:
self.lanes_state[lane_no][i] = distance - 1
# Randomization
if (
(self.lanes_state[lane_no][i] > 0) and
(np.random.uniform() < self.prob_slow)
):
self.lanes_state[lane_no][i] -= 1
#Change the start point of next section of the road
start_point = light
# Move cars forward using their new speeds
new_state = np.full(self.road_length, -1, dtype=int)
for i in range(self.road_length):
cell = self.lanes_state[lane_no][i]
if cell != -1:
new_state[(i + cell) % self.road_length] = cell
self.lanes_state[lane_no] = new_state
# Update average traffic flow history
self.average_traffic_flow[lane_no].append(
sum(self.lanes_state[lane_no][self.lanes_state[lane_no] > 0]) / self.road_length)
flow_sum += self.average_traffic_flow[lane_no][-1]
#Calculate the final traffic flow average
self.final_average_traffic_flow.append(flow_sum/self.lanes_number)
#Add one time step
self.time_step += 1
#######################################################################################################
#######################################################################################################
def change_traffic_light(self, current_traffic, position):
"""
This methods switchs the traffic light in a certian intersection
"""
#Change the determined traffic light to the given state
self.traffic_red[position] = current_traffic
self.just_changed[position] = [True]*self.lanes_number
#######################################################################################################
#######################################################################################################
def display(self):
'''
Print out the current state of the simulation.
'''
print(f"----- Time Step {self.time_step} -----")
for state in self.lanes_state:
print(''.join('.' if x == -1 else str(x) for x in state))
current_strategy = []
rd_len = 50
max_sp = 5
prob_slw = 0.3
lanes_no = 2
traffic_pos = [20, 40]
trails = 300
densities = [0.125,0.25,0.5,0.75,0.875]
for i in densities:
sim1 = TrafficSimulation(road_length=rd_len, car_density=i, max_speed=max_sp, prob_slow=prob_slw, lanes_number = lanes_no, name = "Margery"
, traffic_lights_positions = traffic_pos)
print("Density is", i)
for j in range (3):
sim1.display()
sim1.update()
print()
current_strategy = []
rd_len = 50
max_sp = 5
prob_slw = [0.11,0.33,0.5,0.66,0.88]
lanes_no = 2
traffic_pos = [20, 40]
trails = 300
for i in prob_slw:
sim1 = TrafficSimulation(road_length=rd_len, car_density=0.5, max_speed=max_sp, prob_slow=i, lanes_number = lanes_no, name = "Margery"
, traffic_lights_positions = traffic_pos)
print("Probability of slow is", i)
for j in range (3):
sim1.display()
sim1.update()
print()
current_strategy = []
rd_len = 50
max_sp = 5
prob_slw = 0.3
lanes_no = [1,2,3,4]
traffic_pos = [20, 40]
trails = 300
for i in lanes_no:
sim1 = TrafficSimulation(road_length=rd_len, car_density=1/2, max_speed=max_sp, prob_slow=prob_slw, lanes_number = i, name = "Margery"
, traffic_lights_positions = traffic_pos)
print(i,"Lane/s")
for j in range (3):
sim1.display()
sim1.update()
print()
current_strategy = []
rd_len = 50
max_sp = 5
prob_slw = 0.3
lanes_no = 1
traffic_pos = [[0, 49] , [0,15] , [15,30], [30,49]]
trails = 300
for i in traffic_pos:
sim1 = TrafficSimulation(road_length=rd_len, car_density=1/2, max_speed=max_sp, prob_slow=prob_slw, lanes_number = lanes_no, name = "Margery"
, traffic_lights_positions = i)
print(i,"-th Traffic Light Position")
for j in range (10):
sim1.display()
sim1.update()
print()
current_strategy = []
rd_len = 50
max_sp = 5
prob_slw = 0.3
lanes_no = 2
traffic_pos = [15, 30]
trails = 300
sim1 = TrafficSimulation(road_length=rd_len, car_density=0.5, max_speed=7, prob_slow=prob_slw, lanes_number = lanes_no, name = "Margery"
, traffic_lights_positions = traffic_pos)
for i in range (10):
sim1.display()
sim1.update()
print()
sim2 = TrafficSimulation(
road_length=rd_len, car_density=0.3, max_speed=5, prob_slow=prob_slw, lanes_number = lanes_no, name = "Wilmington"
, traffic_lights_positions = traffic_pos)
for i in range (10):
sim2.display()
sim2.update()
print()
sim3 = TrafficSimulation(
road_length=rd_len, car_density=0.2, max_speed=4, prob_slow=prob_slw, lanes_number = lanes_no, name = "Yardley"
, traffic_lights_positions = traffic_pos)
for i in range (10):
sim3.display()
sim3.update()
print()
sim4 = TrafficSimulation(
road_length=rd_len, car_density=0.6, max_speed=7, prob_slow=prob_slw, lanes_number = lanes_no, name = "Rosebery"
, traffic_lights_positions = traffic_pos)
for i in range (10):
sim4.display()
sim4.update()
print()
class intersection:
def __init__(self, streets, prob_turning = 0.2):
"""
This class handles the intersections between the streets,
and it helps in regulating the traffics to maintain a synced system
"""
#Instaces from the street class
self.streets = streets
#The traffic direction, True means the flow is vertical
self.traffic_direction = [False, False, False, False]
#The current time step
self.time_step = 0
#The proability of turing in an intersection
self.prob_turning = prob_turning
#The intersections map based on
self.intersections_map ={1: {"H": "Wilmington","V": "Margery", "H_position" : 30, "V_position" : 15},
2: {"H": "Wilmington","V": "Rosebery", "H_position" : 15, "V_position" : 30},
3: {"H": "Yardley","V": "Margery", "H_position" : 15, "V_position" : 30},
4: {"H": "Yardley","V": "Rosebery", "H_position" : 30, "V_position" : 15}}
#Update the traffic intially to reset them
self.update (1)
self.update (2)
self.update (3)
self.update (4)
## Swithcing the traffic lights to the other color
def update(self, intersection):
#Inverse the traffic light
self.traffic_direction[intersection-1] = not self.traffic_direction[intersection-1]
#Change the traffic for the intersecting streets
self.streets[self.intersections_map[intersection]["H"]].change_traffic_light(self.traffic_direction[intersection-1],
self.intersections_map[intersection]["H_position"])
self.streets[self.intersections_map[intersection]["V"]].change_traffic_light(not self.traffic_direction[intersection-1],
self.intersections_map[intersection]["V_position"])
def run_simulation(self, steps=70):
"""
This method runs the main simulation
"""
#Specific traffic timer for post street
post_st_traffic_counter = 0
#Run the simulation for a number of steps
for i in range(steps):
#The following part of the code checks the cars turning in the intersections
#Loop through each intersection
for interesection in self.intersections_map:
#Define the variables
h = self.intersections_map[interesection]["H"]
v = self.intersections_map[interesection]["V"]
H_position = self.intersections_map[interesection]["H_position"]
V_position = self.intersections_map[interesection]["V_position"]
#Intersections at horizontal position 15 are similar in behaviour
if H_position == 15:
# From Horizontal to Vertical
# If there is a car on the horizontal and there is clearance on the vertical, the horizontally moving car can go to the vertical
# and the horizontal spot becomes empty, while the open space is occupied by the new car on the vertical
# ?? The change happens on the outside of the street so on lane -1
if self.streets[h].lanes_state[-1][H_position - 1] != -1 and self.streets[v].lanes_state[-1][V_position + 1] == -1:
# there is ability to switch with probability p
if random.uniform (0, 1) <= self.prob_turning:
# the car moves to the veritical, so the position is taken and value is replace by the one from the horizontal
self.streets[v].lanes_state[-1][V_position + 1] = self.streets[h].lanes_state[-1][H_position - 1]
# and the horizontal street becomes empty, and thus -1
self.streets[h].lanes_state[-1][H_position - 1] = -1
# From Vertical to Horizontal
# If there is a car on the vertical and it wants to move in the horizontal, the vertically moving car will go to the opening in horizontal
# make sure there is space on the horizontal, and car at the front in the vertical
# The change happens on the inside of the street so on lande 0
if self.streets[h].lanes_state[0][H_position + 1] == -1 and self.streets[v].lanes_state[0][V_position - 1] != -1:
if random.uniform (0, 1) <= self.prob_turning:
self.streets[h].lanes_state[0][H_position + 1] = self.streets[v].lanes_state[0][V_position - 1]
self.streets[v].lanes_state[0][V_position - 1] = -1
#Intersections at horizontal position 30 are similar in behaviour
elif H_position == 30:
# Doing it on the inside so lane 0
if self.streets[h].lanes_state[0][H_position - 1] != -1 and self.streets[v].lanes_state[0][V_position + 1] == -1:
if random.uniform (0, 1) <= self.prob_turning:
self.streets[v].lanes_state[0][V_position + 1] = self.streets[h].lanes_state[0][H_position - 1]
self.streets[h].lanes_state[0][H_position - 1] = -1
# Doing it in the outside so on lane -1
if self.streets[h].lanes_state[-1][H_position + 1] == -1 and self.streets[v].lanes_state[-1][V_position - 1] != -1:
if random.uniform (0, 1) <= self.prob_turning:
self.streets[h].lanes_state[-1][H_position + 1] = self.streets[v].lanes_state[-1][V_position - 1]
self.streets[v].lanes_state[-1][V_position - 1] = -1
#Update all the streets using the method "Update"
for street_name in self.streets:
self.streets[street_name].update()
self.update (1)
self.update (2)
self.update (3)
self.update (4)
#Increase the time step by one
self.time_step += 1
def average_traffic_flow (self):
"""
This method calculates the average flow rate for all the streets
"""
flow_sum = 0
for street in self.streets:
flow_sum += self.streets[street].final_average_traffic_flow[-1]
return flow_sum / len(self.streets)
def display(self):
'''
Print out the current state of the simulation.
'''
print(f"----- Time Step {self.time_step} -----")
for state in self.lanes_state:
print(''.join('.' if x == -1 else str(x) for x in state))
current_strategy = []
rd_len = 50
prob_slw = 0.1
lanes_no = 2
traffic_pos = [15, 30]
trails = 300
sim1 = TrafficSimulation(road_length=rd_len, car_density=0.5, max_speed=7, prob_slow=prob_slw, lanes_number = lanes_no, name = "Margery"
, traffic_lights_positions = traffic_pos)
sim2 = TrafficSimulation(
road_length=rd_len, car_density=0.3, max_speed=5, prob_slow=prob_slw, lanes_number = lanes_no, name = "Wilmington"
, traffic_lights_positions = traffic_pos)
sim3 = TrafficSimulation(
road_length=rd_len, car_density=0.2, max_speed=4, prob_slow=prob_slw, lanes_number = lanes_no, name = "Yardley"
, traffic_lights_positions = traffic_pos)
sim4 = TrafficSimulation(
road_length=rd_len, car_density=0.6, max_speed=7, prob_slow=prob_slw, lanes_number = lanes_no, name = "Rosebery"
, traffic_lights_positions = traffic_pos)
for trail in range(trails):
blocks = intersection({"Margery" : sim1,
"Wilmington" : sim2,
"Yardley" : sim3,
"Rosebery" : sim4})
blocks.run_simulation(100)
current_strategy.append(blocks.average_traffic_flow())
#print(current_strategy)
plt.hist(current_strategy)
plt.xlim(0,0.03)
plt.show()
print(f"The mean of the current strategy {(np.mean(current_strategy))}")
print(f"The standard deviation of the current strategy {(np.std(current_strategy), 3)}")
print(f"Confidence Interval for the current strategy [{ (np.mean(current_strategy) - 1.96 * sts.sem(current_strategy),3)} ,{round (np.mean(current_strategy) + 1.96 * sts.sem(current_strategy),3)}]")
p_slows = [0, 0.25, 0.5]
car_densities = np.linspace(0, 1, 101)
trials = 50 # Number of times to repeat the simulation at each density
sim_flow_results = {} # The results go here
sim1 = TrafficSimulation(road_length=rd_len, car_density=0.5, max_speed=7, prob_slow=prob_slw, lanes_number = lanes_no, name = "Margery"
, traffic_lights_positions = traffic_pos)
sim2 = TrafficSimulation(
road_length=rd_len, car_density=0.3, max_speed=5, prob_slow=prob_slw, lanes_number = lanes_no, name = "Wilmington"
, traffic_lights_positions = traffic_pos)
sim3 = TrafficSimulation(
road_length=rd_len, car_density=0.2, max_speed=4, prob_slow=prob_slw, lanes_number = lanes_no, name = "Yardley"
, traffic_lights_positions = traffic_pos)
sim4 = TrafficSimulation(
road_length=rd_len, car_density=0.6, max_speed=7, prob_slow=prob_slw, lanes_number = lanes_no, name = "Rosebery"
, traffic_lights_positions = traffic_pos)
for p_slow in p_slows:
sim_flow_results[p_slow] = []
for density in car_densities:
flows = []
for trial in range(trials):
blocks = intersection({"Margery" : sim1, "Wilmington" : sim2, "Yardley" : sim3, "Rosebery" : sim4})
blocks.run_simulation(100)
# Record the final average traffic flow
flows.append(blocks.average_traffic_flow())
sim_flow_results[p_slow].append(np.mean(flows))
plt.figure(figsize=(8, 6))
for p, flow in sim_flow_results.items():
plt.plot(car_densities, flow, label=f'Simulation p = {p}')
plt.xlim(0, 1)
plt.ylim(0, 1)
plt.legend()
plt.show()
#The following function is adopted from session 9
def mfa(v, density, p_slow):
v_max = len(v) - 1
new_v = [0] * len(v) # The updated speed probability vector
# Probability of a car appearing in front of you
car_probabilities = [density] * (v_max + 1)
car_probabilities = [None] + [
(1-density)**(distance-1) * density for distance in range(1, v_max + 1)]
car_probabilities.append(1 - np.sum(car_probabilities[1:]))
for v_from in range(v_max + 1):
# Current speed, will be updated below
speed = v_from
# Accelerate
if speed < v_max:
speed += 1
# Brake when there is a car in front at each distance from 1 to speed
for distance in range(1, speed + 1):
# Probability that a car is in front at a particular distance
car_at_distance = car_probabilities[distance]
if distance > 1:
new_v[distance-1] += v[v_from] * car_at_distance * (1-p_slow)
new_v[distance-2] += v[v_from] * car_at_distance * p_slow
else:
new_v[distance-1] += v[v_from] * car_at_distance
# No cars in front up to distance == speed
no_cars = 1 - np.sum(car_probabilities[1:speed+1])
new_v[speed] += v[v_from] * no_cars * (1-p_slow)
new_v[speed-1] += v[v_from] * no_cars * p_slow
return new_v
# The following code is also adopted from session 9
def average_speed(v):
return np.sum(np.array(v) * np.arange(len(v)))
def average_flow(v, density):
return density * average_speed(v)
max_speed = 5
p_slows = [0, 0.25, 0.5]
car_densities = np.linspace(0, 1, 101)
mfa_flow_results = {} # map from p_slow to flow results for all densities
for p_slow in p_slows:
mfa_flow_results[p_slow] = []
for density in car_densities:
# Start from a uniform distribution over speeds
v = [1/(max_speed + 1)] * (max_speed + 1)
assert abs(sum(v) - 1) < 1e-6 # Sanity check before
for i in range(100): # Run until convergence
v = mfa(v, density, p_slow)
assert abs(sum(v) - 1) < 1e-6 # Sanity check after
mfa_flow_results[p_slow].append(average_flow(v, density))
plt.figure(figsize=(8, 6))
for p, flow in sim_flow_results.items():
#line1 = plt.plot(car_densities, flow, linestyle='-',
# label=f'Simulation p = {p}')
color = line1[0].get_color()
plt.plot(car_densities, mfa_flow_results[p], marker='.', linestyle='--',
color=color, label=f'MFA p = {p}')
plt.xlim(0, 1)
plt.ylim(0, 1)
plt.legend()
plt.show()