import numpy as np
import matplotlib.pyplot as plt
import math
import time
import matplotlib.animation as animation
from numpy.core.fromnumeric import size
def plant_delay(max_delta=3, K=1):
## Initialization
# Length of simulation (time steps)
simlen = 30
# Target
target = 0.0
# Create subplots
fig, axes = plt.subplots(2, 2, figsize=(8, 6))
## Simulation
for delta, ax in zip(range(0, max_delta+1), axes.flatten()):
y = np.zeros((simlen)) # Output
y[0] = 1 # Set first output
for t in range(simlen-1): # range(simlen-1)
# Compute output
u = K*(target - y[t-delta])
y[t+1] = 0.5*y[t] + 0.4*u # 1st order dynamics
## Plot
x = range(simlen)
ax.plot(x, y)
ax.set_xlabel('time step')
ax.set_ylabel('y')
ax.set_title(f'delay $\delta$ = {delta}, gain $K$ = {K}')
plt.tight_layout()
plt.show()
plant_delay(max_delta=3, K=2)
plant_delay(max_delta=3, K=3)
import numpy as np
import math
class SimulationFunctions:
def __init__(self,Var):
self.T = Var[0]
self.dt = Var[1]
self.L = Var[2]
self.kp = Var[3]
self.kd = Var[4]
self.le1 = Var[5]
self.le2 = Var[6]
self.m1 = Var[7]
self.m2 = Var[8]
self.g = Var[9]
def minjerk(self,init,final,t):
desired = np.zeros((2))
desired[0]=init[0]+(final[0]-init[0])*(10*(t/self.T)**3-15*(t/self.T)**4+6*(t/self.T)**5)
desired[1]=init[1]+(final[1]-init[1])*(10*(t/self.T)**3-15*(t/self.T)**4+6*(t/self.T)**5)
return desired
def invkinematics(self,position):
q=np.zeros((2,1))
q[1]=math.acos((sum(np.square(position))-(self.le1**2+self.le2**2))/(2*self.le1*self.le2))
q[0]=math.atan2(position[1], position[0])-q[1]/2
return q
def pdcontroller(self,desired_angle, delayed_angle, delayed_velocity):
torque = np.zeros((2))
torque[0]=self.kp*(desired_angle[0]-delayed_angle[0])+self.kd*(0-delayed_velocity[0])
torque[1]=self.kp*(desired_angle[1]-delayed_angle[1])+self.kd*(0-delayed_velocity[1])
return torque
def plant(self,ang,vel,acc,torque):
#calculate inertia
inertia = np.zeros((2))
inertia[0]=self.m1*(self.le1/2)**2
inertia[1]=self.m2*(self.le2/2)**2
# calculate load torques
t_load = np.zeros((2))
t_load[0]=(self.m1*self.g*(self.le1/2)*np.sin(ang[0])) + (acc[0]*(inertia[0]+inertia[1]+self.m2*self.le1*self.le2*np.cos(ang[1])+(self.m1*self.le1**2+self.m2*self.le2**2)/4+self.m2*self.le1**2)) + (acc[1]*(inertia[1]+(self.m2*self.le2**2)/4+(self.m2*self.le1*self.le2)/2*np.cos(ang[1]))) + ((-(self.m2*self.le1*self.le2)/2)*vel[1]**2) + -self.m2*self.le1*self.le2*vel[0]*vel[1]*np.sin(ang[1])
t_load[1]=(self.m2*self.g*(self.le2/2)*np.sin(ang[1])) + (acc[1]*(inertia[1]+(self.m1*self.le2**2)/4)) + (acc[0]*(inertia[1]+((self.m2*self.le1*self.le2)/2)*np.cos(ang[1])+(self.m2*self.le2**2)/4)+((self.m2*self.le1*self.le2)/2)*vel[0]**2*np.sin(ang[1]))
#calculate acceleration
centripetal_torque1=inertia[1]+((self.m2*self.le1*self.le2)/2)*np.cos(ang[1])+((self.m2*self.le2**2)/4)
centripetal_torque2=((self.m2*self.le1*self.le2)/2)*np.sin(ang[1])
coriolis_torque=self.m2*self.le1*self.le2*vel[0]*vel[1]*np.sin(ang[1])
inertial_component1=inertia[0]+inertia[1]+(self.m2*self.le1*self.le2*np.cos(ang[1]))+((self.m1*self.le1**2+self.m2*self.le2**2)/4)+self.m2*self.le1**2
inertial_component2=inertia[1]+((self.m1*self.le2**2)/4)
interaction_inertial_torque=inertia[1]+((self.m2*self.le2**2)/4)+((self.m2*self.le1*self.le2)/2)*np.cos(ang[1])
acc[0]=(torque[0]-(((vel[0]**2*centripetal_torque2)/inertial_component2)*centripetal_torque1+vel[1]**2*centripetal_torque2-coriolis_torque) / (inertial_component1-((interaction_inertial_torque/inertial_component2)*centripetal_torque1)))
acc[1]=(torque[1]-acc[0]*centripetal_torque1-vel[0]**2*centripetal_torque2)/inertial_component2
#calculate velocity
vel[0]=vel[0]+self.dt*acc[0]
vel[1]=vel[1]+self.dt*acc[1]
#calculate shoulder angle
ang[0]=ang[0]+self.dt*vel[0]
#calculate elbow angle
ang[1]=ang[1]+self.dt*vel[1]
return ang, vel, acc
def fkinematics(self,ang):
elbow_pos=[[self.le1*np.cos(ang[0])], [self.le1*np.sin(ang[0])]]
p1=[[np.cos(ang[0]), -np.sin(ang[0])], [np.sin(ang[0]), np.cos(ang[0])]]
p2=[[self.le2*np.cos(ang[1])], [self.le2*np.sin(ang[1])]]
wrist_pos=list(elbow_pos+np.matmul(np.array(p1),np.array(p2)))
return elbow_pos, wrist_pos
def reach_sim(sigma2: float=0., delta: int=0, kp: float=200.0, kd: float=11.0):
## Parameters
# Movement duration
T=.6
# Time step
dt=.01
# Simulation duration
L=6.0
# Proportional parameter
kp=kp
# Derivative parameter
kd=kd
# Upper arm length
le1=.3
# Lower arm length
le2=.3
# Upper arm mass
m1=3
# Lower arm mass
m2=3
# Gravity
g=-9.8
## Functions
Var = [T,dt,L,kp,kd,le1,le2,m1,m2,g]
Sim = SimulationFunctions(Var)
## Variables
# Joint angles [shoulder elbow]
ang=[-np.pi/4, np.pi]
ang_rec=np.zeros((int(L/dt+1),2))
delayed_ang=ang[:]
# Joint velocity [shoulder elbow]
vel=[0, 0]
vel_rec=np.zeros((int(L/dt+1),2))
delayed_vel=vel[:]
# Joint acceleration [shoulder elbow]
acc=[0, 0]
acc_rec=np.zeros((int(L/dt+1),2))
# Jerk [shoulder elbow]
jerk_rec=np.zeros((int(L/dt+1),2))
# Shoulder position
shoulder_pos=[0, 0]
# Elbow position
elbow_pos=[0, 0]
# Wrist position
wrist_pos=[0, 0]
wrist_pos_rec=np.zeros((int(L/dt+1),2))
# Initial wrist position for current movement
init_wrist_pos=wrist_pos
# Desired wrist position
final_wrist_pos=[[0.3, 0.0], [0.0, 0.0], [.3*np.cos(np.pi/4), .3*np.sin(np.pi/4)], [0.0, 0.0],
[0.0, .3], [0.0, 0.0], [.3*np.cos(3*np.pi/4), .3*np.sin(3*np.pi/4)], [0.0, 0.0]]
# Current target index
curr_target=0
# Movement start_time
start_t=0
## Simulation
## Reset variables
fig, ax = plt.subplots(1, 1, figsize=(12,8))
ax.set_xlabel('meters', fontsize=10)
ax.set_ylabel('meters', fontsize=10)
ax.set_xlim([-0.5, .5])
ax.set_ylim([-0.5, .5])
Time = time.time()
for t in np.arange(0,int(L),dt):
# Update records
ang_rec[round(t/dt)+1,:]=ang
vel_rec[round(t/dt)+1,:]=vel
acc_rec[round(t/dt)+1,:]=acc
if t>0:
jerk_rec[round(t/dt)+1,:]=acc-acc_rec[round(t/dt),:]
## Current wrist target
current_wrist_target=final_wrist_pos[curr_target][:]
if curr_target<=7:
## Planner
# Get desired position from planner
if t-start_t<T:
desired_pos=Sim.minjerk(init_wrist_pos, current_wrist_target, t-start_t)
## Inverse kinematics
# Get desired angle from inverse kinematics
desired_ang=np.real(Sim.invkinematics(desired_pos))
## Inverse dynamics
# Get desired torque from PD controller
## TO DO DEFINE NOISE
## TO DO ADD NOISE to torque
## TO DO Define delay for ang and vel
## TO DO Compute torque with delayed ang and vel
delta = delta
if round(t/dt)+1-delta > 0:
delayed_ang = ang_rec[round(t/dt)+1-delta, :]
delayed_vel = vel_rec[round(t/dt)+1-delta, :]
desired_torque=Sim.pdcontroller(desired_ang, delayed_ang, delayed_vel)
noise = np.sqrt(sigma2)*np.random.randn(*desired_torque.shape)
desired_torque += noise
## Forward dynamics
# Pass torque to plant
[ang,vel,acc]= Sim.plant(ang,vel,acc,desired_torque)
## Forward kinematics
# Calculate new joint positions
[elbow_pos, wrist_pos] = Sim.fkinematics(ang)
# Record wrist position
wrist_pos_rec[round(t/dt)+1,:]=wrist_pos
## Next target
if (t-start_t>=T+.02) & (curr_target<7):
curr_target=curr_target+1
init_wrist_pos=wrist_pos
start_t=t
## Plot arm, wrist path, and targets -- ANIMATION
#ax.cla()
#ax.scatter(np.array(final_wrist_pos)[:,0], np.array(final_wrist_pos)[:,1], color='green')
#ax1.scatter(np.array(final_wrist_pos)[:,0], np.array(final_wrist_pos)[:,1], color='green')
#ax.plot([shoulder_pos[0], elbow_pos[0][0]], [shoulder_pos[1], elbow_pos[1][0]], color='blue')
#ax.plot([elbow_pos[0][0], wrist_pos[0]], [elbow_pos[1][0], wrist_pos[1]], color= 'blue')
#plt.pause(0.01)
# # plt.tight_layout()
#for t2 in np.arange(dt,t,dt):
# # ax1.cla()
#ax.plot(wrist_pos_rec[:round(t2/dt),0], wrist_pos_rec[:round(t2/dt),1],'--',color='red',linewidth=0.5)
# ax.plot([wrist_pos_rec[round(t2/dt),0], wrist_pos_rec[round(t2/dt)+1,0]], [wrist_pos_rec[round(t2/dt),1], wrist_pos_rec[round(t2/dt)+1,1]],color='red',linewidth=0.5)
# plt.show(block=False)
#plt.pause(0.01)
# ax.cla()
#ax.autoscale_view()
elapsed = time.time() - Time
print("Time elapsed:",elapsed)
ax.plot(wrist_pos_rec[:-1,0], wrist_pos_rec[:-1,1], '--', color='red', linewidth=0.5)
ax.scatter(np.array(final_wrist_pos)[:,0], np.array(final_wrist_pos)[:,1], color='green')
ax.set_title(f'Simulation with variables: $\sigma^2$ = {sigma2}, $\delta$ = {delta}, kp = {kp}, kd = {kd}', size=16)
plt.show()
plt.figure(figsize=(6, 9))
plt.subplot(3,1,1)
[A,B] = plt.plot(np.arange(0,L-dt,dt), [xx[0] for xx in vel_rec[:int(L/dt)-1]], np.arange(0,L-dt,dt), [xx[1] for xx in vel_rec[:int(L/dt)-1]])
plt.legend([A,B],['Shoulder','Elbow'])
plt.xlabel('time (ms)')
plt.ylabel('velocity')
plt.subplot(3,1,2)
[A,B]= plt.plot(np.arange(0,L-dt,dt), [xx[0] for xx in acc_rec[:int(L/dt)-1]],np.arange(0,L-dt,dt), [xx[1] for xx in acc_rec[:int(L/dt)-1]])
plt.legend([A,B],['Shoulder','Elbow'])
plt.xlabel('time (ms)')
plt.ylabel('acceleration')
plt.subplot(3,1,3)
[A,B] = plt.plot(np.arange(0,L-dt,dt), [xx[0] for xx in jerk_rec[:int(L/dt)-1]],np.arange(0,L-dt,dt), [xx[1] for xx in jerk_rec[:int(L/dt)-1]])
plt.legend([A,B], ['Shoulder','Elbow'])
plt.xlabel('time (ms)')
plt.ylabel('jerk')
plt.tight_layout()
plt.show()
reach_sim()
reach_sim(sigma2=10)
reach_sim(delta=1)
reach_sim(delta=2)
reach_sim(delta=1, kp=50, kd=5)
reach_sim(delta=2, kp=50, kd=5)
reach_sim(delta=3, kp=50, kd=5)
reach_sim(delta=4, kp=50, kd=5)
def fb_ff_control(Kp_forward=5, Kd_forward=0.3, pert_amp=1, delay=0, Kp_feedback=5, Kd_feedback=0.3):
## Initialization
# Mass of the arm
m = 1
# Length of the arm
arm_length = 0.3
# Inertia
I = 1/3 * m *(arm_length)**2
# Gravity
g = 9.81
# Damping coefficient for the forward model
dampCoeff =0.5
# Sensory delay (s)
delay = delay
# Simulation time step (s)
dt = 0.001
# Simulation duration (s)
max_time = 2
tvec = np.arange(0, max_time, dt)
# Time to start the movement (s)
mvt_start_time = 0.5
# Delayed versions of joint angle and velocity
delay_index = int(delay/dt)
delayed_ang = np.zeros(delay_index+1)
delayed_vel = np.zeros(delay_index+1)
# Plant - current actual joint angle and velocity
ang_actual = 0
vel_actual = 0
# Forward model - estimated current joint angle and velocity
ang_estimated = 0
vel_estimated = 0
# Target joint angle
final_target_ang = np.pi/2
# History of joint angle, delayed joint angle, and torque
ang_history = []
ang_est_history = []
ang_del_history = []
ang_target_history = []
torque_history = []
pert_history = []
# Perturbation start time (s)
pert_start_time = 1.2
# Perturbation end time (s)
pert_end_time = 1.5
# Perturbation amplitude
pert_amp = pert_amp
# Gain and damping parameters for forward model
Kp_forward = Kp_forward
Kd_forward = Kd_forward
# Gain and damping parameters for feedback model
Kp_feedback = Kp_feedback
Kd_feedback = Kd_feedback
## Simulation
for t in tvec:
# Set the desired joint angle once the movement start time is reached
if t < mvt_start_time:
target_ang = 0
else:
target_ang = final_target_ang
# Forward model torque
forward_torque = Kp_forward * (target_ang - ang_estimated) + Kd_forward * ( - vel_estimated)
# Feedback torque delayed
feedback_torque = Kp_feedback * (target_ang - delayed_ang[delay_index]) + Kd_feedback * ( - delayed_vel[delay_index])
# Total torque
total_torque=forward_torque + feedback_torque
# Forward model of arm_dynamics
acc_estimated = (total_torque - dampCoeff*vel_estimated ) / I
vel_estimated = vel_estimated + dt * acc_estimated
ang_estimated = ang_estimated + dt * vel_estimated
# Apply perturbation
if ((t > pert_start_time) and (t < pert_end_time)):
pert = pert_amp
else:
pert = 0
# Plant - forward model of the arm
acc = (total_torque - dampCoeff*vel_actual + pert) / I
vel_actual = vel_actual + dt * acc
ang_actual = ang_actual + dt * vel_actual
# Update delayed copy of joint angle and velocity
for i in range(delay_index-1, -1, -1):
delayed_ang[i+1] = delayed_ang[i]
delayed_vel[i+1] = delayed_vel[i]
delayed_ang[0] = ang_actual
delayed_vel[0] = vel_actual
# Update histories
ang_history.append(ang_actual)
ang_est_history.append(ang_estimated)
ang_del_history.append(delayed_ang[delay_index])
torque_history.append(total_torque)
ang_target_history.append(target_ang)
pert_history.append(pert)
f, (ax1, ax2) = plt.subplots(2, 1, sharex=True)
ax1.plot(tvec, ang_history, label='angle')
if delay != 0:
ax1.plot(tvec, ang_del_history, 'k', label='delayed angle')
if Kp_forward != 0:
ax1.plot(tvec, ang_est_history, 'g', label='estimated angle')
ax1.plot(tvec, ang_target_history, '--', label='target angle')
ax1.plot(tvec, pert_history, 'r', label='perturbation')
ax1.set_ylabel('angle')
ax1.legend()
ax2.plot(tvec, torque_history)
ax2.set_xlabel('time')
ax2.set_ylabel('motor command')
plt.show()
for delay in np.linspace(0,0.125,5):
fb_ff_control(Kp_forward=0, Kd_forward=0, pert_amp=0, delay=delay)
fb_ff_control(Kp_forward=0, Kd_forward=0, pert_amp=1, delay=0)
fb_ff_control(Kp_feedback=0, Kd_feedback=0, pert_amp=0, delay=0.125)
fb_ff_control(Kp_feedback=0, Kd_feedback=0, pert_amp=1, delay=0)
def smith_predictor(alpha=1e-5, delay_estimate=10):
# time parameters
dt = 0.005
max_time = 2
mvt_start_time = 0.5
# arm model
m = 1
arm_length = 0.3
I = m *(arm_length/2)**2
dampCoeff =0.5
# controller parameters
Kp = 20
Kv = 2
desired_end_pos = np.pi/2
# learning model parameters
w1 = 0
w2 = 0
# learning rate
alpha = alpha
# weight histories
w1_hist=[]
w2_hist=[]
delay_true = 10 # time steps
# TODO: change delay_estimate
delay_estimate = delay_estimate
n_trials = 40
f, (ax1, ax2, ax3) = plt.subplots(3, 2, sharex=True, num=1, figsize=(10, 7))
# learning loop
for i in range(n_trials):
vel = 0
pos = 0
vel_estimated = 0
pos_estimated = 0
acc_delayed = np.zeros(delay_true+1)
vel_delayed = np.zeros(delay_true+1)
pos_delayed = np.zeros(delay_true+1)
feedback_torque_delayed = np.zeros(delay_estimate+1)
acc_estimated_delayed = np.zeros(delay_estimate+1)
vel_estimated_delayed = np.zeros(delay_estimate+1)
pos_estimated_delayed = np.zeros(delay_estimate+1)
index_time =0
# movement loop
tvec = np.arange(0, max_time, dt)
acc_estim_plot = np.zeros(tvec.size)
acc_plot = np.zeros(tvec.size)
vel_estim_plot = np.zeros(tvec.size)
vel_plot = np.zeros(tvec.size)
pos_estim_plot = np.zeros(tvec.size)
pos_plot = np.zeros(tvec.size)
torque_plot = np.zeros(tvec.size)
for t in tvec:
for k in range(delay_true-1, -1, -1):
acc_delayed[k+1] = acc_delayed[k]
vel_delayed[k+1] = vel_delayed[k]
pos_delayed[k+1] = pos_delayed[k]
for k in range(delay_estimate-1, -1, -1):
acc_estimated_delayed[k+1] = acc_estimated_delayed[k]
vel_estimated_delayed[k+1] = vel_estimated_delayed[k]
pos_estimated_delayed[k+1] = pos_estimated_delayed[k]
feedback_torque_delayed[k+1] = feedback_torque_delayed[k]
if t < mvt_start_time:
desired_pos = 0
else:
desired_pos = desired_end_pos
# feedback torque
pos_err = (desired_pos - pos_estimated) - pos_delayed[-1] + pos_estimated_delayed[-1]
vel_err = (- vel_estimated) - vel_delayed[-1] + vel_estimated_delayed[-1]
feedback_torque = Kp * pos_err + Kv * vel_err
feedback_torque_delayed[0] = feedback_torque
# arm forward dynamics
acc = (feedback_torque - dampCoeff*vel) / I
vel = vel + dt * acc
pos = pos + dt * vel
acc_delayed[0] = acc
vel_delayed[0] = vel
pos_delayed[0] = pos
#forward model of arm_dynamics
acc_estimated = w1*feedback_torque + w2*vel_estimated
vel_estimated = vel_estimated + dt * acc_estimated
pos_estimated = pos_estimated + dt * vel_estimated
acc_estimated_delayed[0] = acc_estimated
vel_estimated_delayed[0] = vel_estimated
pos_estimated_delayed[0] = pos_estimated
# learning the weights with gradient descent
error = (acc_delayed[-1] - acc_estimated_delayed[-1])
w1 = w1 + alpha*error*feedback_torque_delayed[-1]
w2 = w2 + alpha*error*vel_estimated_delayed[-1]
# plotting
acc_estim_plot[index_time] = acc_estimated
acc_plot[index_time] = acc
vel_estim_plot[index_time] = vel_estimated
vel_plot[index_time] = vel
pos_estim_plot[index_time] = pos_estimated
pos_plot[index_time] = pos
torque_plot[index_time] = feedback_torque
index_time = index_time + 1
if i==1:
# f, (ax1, ax2, ax3) = plt.subplots(3, 1, sharex=True, num=1)
ax1[0].plot(tvec, pos_plot, label='pos')
ax1[0].set_xlabel('time')
ax1[0].set_ylabel('pos')
ax1[0].plot(tvec, pos_estim_plot, 'k', label='estimated pos')
ax1[0].legend()
ax1[0].title.set_text('First trial')
ax2[0].plot(tvec, vel_plot, label='vel')
ax2[0].set_xlabel('time')
ax2[0].set_ylabel('vel')
ax2[0].plot(tvec, vel_estim_plot, 'k', label='estimated vel')
ax2[0].legend()
ax3[0].plot(tvec, acc_plot, label='acc')
ax3[0].set_xlabel('time')
ax3[0].set_ylabel('acc')
ax3[0].plot(tvec, acc_estim_plot, 'k', label='estimated acc')
ax3[0].legend()
w1_hist.append(w1)
w2_hist.append(w2)
# f, (ax1, ax2, ax3) = plt.subplots(3, 1, sharex=True, num=2)
ax1[1].plot(tvec, pos_plot, label='pos')
ax1[1].set_xlabel('time')
ax1[1].set_ylabel('pos')
ax1[1].plot(tvec, pos_estim_plot, 'k', label='estimated pos')
ax1[1].legend()
ax1[1].title.set_text('Last trial')
ax2[1].plot(tvec, vel_plot, label='vel')
ax2[1].set_xlabel('time')
ax2[1].set_ylabel('vel')
ax2[1].plot(tvec, vel_estim_plot, 'k', label='estimated vel')
ax2[1].legend()
ax3[1].plot(tvec, acc_plot, label='acc')
ax3[1].set_xlabel('time')
ax3[1].set_ylabel('acc')
ax3[1].plot(tvec, acc_estim_plot, 'k', label='estimated acc')
ax3[1].legend()
## TODO: Plot change in weights
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 3))
ax1.plot(list(range(n_trials)), w1_hist)
ax1.set_title('Weight 1 history')
ax1.set_xlabel('n trial')
ax1.set_ylabel('weight value')
ax2.plot(list(range(n_trials)), w2_hist)
ax2.set_title('Weight 2 history')
ax2.set_xlabel('n trial')
ax2.set_ylabel('weight value')
plt.tight_layout()
plt.show()
alphas = [1e-5, 5e-5, 1e-4, 1.5e-4, 2.2e-4]
for a in alphas:
print(f'Alpha is: {a}')
smith_predictor(a)
smith_predictor(alpha=1e-4, delay_estimate=9)
smith_predictor(alpha=1e-4, delay_estimate=11)
xmin = 0
xmax = 2*np.pi
N = 5
mu = np.linspace(xmin, xmax, N)
n_plot = 10000
xplot = np.linspace(0, 2*np.pi, n_plot)
fig, axes = plt.subplots(2, 2, figsize=(10, 7))
sigmas = [0.01, 0.5, 1, 10]
for i, (ax, sigma) in enumerate(zip(axes.flatten(), sigmas)):
for i in range(N):
phi = np.exp(-(xplot-mu[i])**2/sigma**2)
ax.plot(xplot, phi, label='RF ' + str(i))
ax.set_title(f"$\sigma$ = {sigma}", size=16)
ax.set_xlabel('x', size=14)
ax.set_ylabel('$\phi$', size=14)
ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
plt.tight_layout()
plt.show()
w = np.random.normal(loc=0.0, scale=0.1,size=N)
y = np.zeros(n_plot)
for i in range(N):
y += w[i]*np.exp(-(xplot-mu[i])**2/sigma**2)
plt.figure()
plt.plot(xplot, y)
plt.show()
def CMAC_sin(N,sigma,n_examples, randomize_sample_order=False, n_epochs=100):
np.random.seed(42)
xmin = 0
xmax = 2*np.pi
mu = np.linspace(xmin, xmax, N)
w = np.random.normal(loc=0.0, scale=0.1,size=N)
beta = 1e-3
n_epochs = n_epochs
x = np.linspace(0, 2*np.pi, n_examples)
y = np.sin(x)
if randomize_sample_order:
shuffler = np.random.permutation(len(x))
x = x[shuffler]
y = y[shuffler]
e_vec = []
for _ in range(n_epochs): # in each epoch
es = []
for k in range(x.shape[0]): # for every example
phi = np.exp(-(x[k]-mu)**2/sigma**2) # for all receptive fields at once
yhat = np.dot(w, phi)
e = y[k] - yhat
es.append(e**2)
w += beta*e*phi # for all weights
e_vec.append(np.mean(np.array(es)))
n_plot = 10000
xplot = np.linspace(0, 2*np.pi, n_plot)
y = np.zeros(n_plot)
for i in range(N):
y += w[i]*np.exp(-(xplot-mu[i])**2/sigma**2)
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(11,4))
fig.suptitle("N=" + str(N) + ", $\sigma=$" + str(sigma) + ", n_examples=" + str(n_examples))
ax1.plot(xplot, y, label='CMAC')
ax1.plot(xplot, np.sin(xplot), label='Target')
ax1.set_title("CMAC vs sin")
ax1.set_xlabel('x')
ax1.set_ylabel('y')
ax1.legend()
ax2.plot(e_vec)
ax2.set_title("MSE")
ax2.set_yscale('log')
ax2.set_xlabel('Epoch')
ax2.set_ylabel('MSE')
plt.tight_layout()
plt.show()
for sigma in [0.5,1,1.25,1.5,2,5]:
CMAC_sin(5,sigma,100)
CMAC_sin(5,1.25,100, n_epochs=800)
for n_examples in [10,100,500,1000,10000]:
CMAC_sin(5,1.25,n_examples, randomize_sample_order=True)
for N_receptive_fields in [5,10,100,500,2500]:
CMAC_sin(N_receptive_fields,1.25,1000, randomize_sample_order=False)
def GaussianBasisFunction(x, mu, sigma):
return np.exp(-(x-mu)**2/(sigma**2))
class CMAC:
def __init__(self, n_rfs, xmin, xmax, beta=1e-3):
""" Initialize the basis function parameters and output weights """
self.n_rfs = n_rfs
self.mu = np.zeros((2, self.n_rfs))
self.sigma = np.zeros(2)
crossval = 0.8 # has to be between 0 and 1 !
for k in range(2):
self.sigma[k] = 0.5/np.sqrt(-np.log(crossval)) * (xmax[k] - xmin[k])/(self.n_rfs-1) # RFs cross at phi = crossval
self.mu[k] = np.linspace(xmin[k], xmax[k], self.n_rfs)
self.w = np.random.normal(loc=0.0, scale=0.2, size=(self.n_rfs, self.n_rfs))
self.beta = beta
self.B = None
self.y = None
def predict(self, x):
""" Predict yhat given x
Saves activations `B` for later weight update
"""
phi = np.zeros((2, self.n_rfs))
for k in range(2):
phi[k] = GaussianBasisFunction(x[k], self.mu[k], self.sigma[k]) # for i in phi_ki at the same time
self.B = np.zeros((self.n_rfs, self.n_rfs))
for i in range(self.n_rfs):
for j in range(self.n_rfs):
self.B[i,j] = phi[0][i] * phi[1][j]
yhat = np.dot(self.w.ravel(), self.B.ravel()) # Element-wise multiplication and summing of all elements
return yhat
def learn(self, e):
"""
Update the weights using the covariance learning rule
For all weights at once.
"""
self.w += self.beta*e*self.B
class SingleLink:
def __init__(self, dt):
self.theta = 0
self.omega = 0
self.m = 0.25 # link mass
self.l = 1.0 # link lengths
self.I = 1/3*self.m*self.l**2 # link inertia, rod
self.b = 5 # link
self.g_const = 9.82 # m/s^2
self.sim_step = 1e-3 # time step for Euler integration
if dt < self.sim_step:
self.sim_step = dt
self.dt = dt # time step of step function
def step(self, tau):
t = 0
while t < self.dt:
alpha = (tau - self.b*self.omega - self.g) / self.I
self.theta += self.omega*self.sim_step
self.omega += alpha*self.sim_step
t += self.sim_step
@property
def g(self):
return self.m*self.g_const*(0.5*self.l)*np.cos(self.theta)
def CMAC_control(Kp: float=20., Kv: float=2., A: float=np.pi, T: int=5, n_trials: int=1, plot_error: bool=False,
cmac: CMAC=None, cmac_learn: bool=False, sine_ref: bool=True):
## Initialize simulation
Ts = 1e-2
T_end = 10 # in one trial
n_steps = int(T_end/Ts) # in one trial
n_trials = n_trials
plant = SingleLink(Ts)
## Logging variables
t_vec = np.array([Ts*i for i in range(n_steps*n_trials)])
theta_vec = np.zeros(n_steps*n_trials)
theta_ref_vec = np.zeros(n_steps*n_trials)
## Feedback controller variables
Kp = Kp
Kv = Kv
## TODO: Define parameters for periodic reference trajectory
A = A
T = T
## TODO: CMAC initialization
n_rfs = 10
if cmac:
c = cmac
## Simulation loop
for i in range(n_steps*n_trials):
t = i*Ts
## TODO: Calculate the reference at this time step
if sine_ref:
theta_ref = A * np.sin(2 * np.pi * t/T)
else:
theta_ref = np.pi/4
# Measure
theta = plant.theta
omega = plant.omega
# Feedback controler
error = (theta_ref - theta)
tau_m = Kp * error + Kv* (-omega)
## TODO: Implement the CMAC controller into the loop
if cmac:
tau_cmac = c.predict([theta, theta_ref])
else:
tau_cmac = 0
tau = tau_m + tau_cmac
# Iterate simulation dynamics
plant.step(tau)
if cmac_learn:
c.learn(tau_m)
theta_vec[i] = plant.theta
theta_ref_vec[i] = theta_ref
## Plotting
plt.plot(t_vec, theta_vec, label='theta')
plt.plot(t_vec, theta_ref_vec, '--', label='reference')
plt.xlim(0, min(t_vec[-1], 50))
plt.legend()
if plot_error:
## Plot trial error
error_vec = theta_ref_vec - theta_vec
l = int(T/Ts)
trial_error = np.zeros(n_trials)
for t in range(n_trials):
trial_error[t] = np.sqrt(np.mean(error_vec[t*l:(t+1)*l]**2 ))
plt.figure()
plt.plot(trial_error)
plt.xlabel('n_trials')
plt.ylabel('MSE')
plt.show()
CMAC_control(Kp=2, Kv=0, sine_ref=False)
CMAC_control(Kp=20, Kv=2, sine_ref=False)
CMAC_control(Kp=20, Kv=2, A=np.pi, T=5, n_trials=1)
CMAC_control(Kp=20, Kv=2, A=np.pi, T=5, n_trials=5)
Amp = np.pi # amplitude
xmin = [-Amp, -Amp]
xmax = [Amp, Amp]
cmac = CMAC(n_rfs=10, xmin=xmin, xmax=xmax, beta=1.5e-3)
CMAC_control(Kp=20, Kv=2, A=Amp, T=5, n_trials=5, cmac=cmac, cmac_learn=True)
betas = [1e-3, 5e-3, 1e-2, 1e-1]
Amp = np.pi # amplitude
xmin = [-Amp, -Amp]
xmax = [Amp, Amp]
for b in betas:
print(f'Using a learning rate of beta = {b}')
cmac = CMAC(n_rfs=25, xmin=xmin, xmax=xmax, beta=b)
CMAC_control(Kp=20, Kv=2, A=Amp, T=5, n_trials=100, cmac=cmac, cmac_learn=True, plot_error=True)
CMAC_control(Kp=20, Kv=2, A=Amp, T=4, n_trials=1, cmac=cmac, cmac_learn=False)
CMAC_control(Kp=20, Kv=2, A=Amp, T=3, n_trials=1, cmac=cmac, cmac_learn=False)
CMAC_control(Kp=20, Kv=2, A=Amp, T=1, n_trials=1, cmac=cmac, cmac_learn=False)
class FirstOrderBases:
'''
First order low pass filter
Transfer function: G = 1 / ( tau*s + 1 )
'''
def __init__(self, dt, tau):
self.ky1 = np.exp(-dt/tau)
self.ku1 = 1-self.ky1
self.reset()
def step(self, input_signal):
self.value = self.ky1 * self.y1 + self.ku1 * self.u1
self.y1 = self.value
self.u1 = input_signal
def reset(self):
self.value = 0
self.y1 = 0
self.u1 = 0
class DoubleFirstOrderBases:
def __init__(self, dt, tau1, tau2=None):
self.dt = dt
if tau2 is None: # same time constant
self.y1 = FirstOrderBases(dt, tau1)
self.y2 = FirstOrderBases(dt, tau1)
else:
self.y1 = FirstOrderBases(dt, tau1)
self.y2 = FirstOrderBases(dt, tau2)
self.value = 0
def step(self, input_signal):
self.y1.step(input_signal)
self.y2.step(self.y1.value)
self.value = self.y2.value
def reset(self):
self.y1.reset()
self.y2.reset()
class MultiInputSecondOrderBases:
def __init__(self, dt, n_inputs, tau_r, tau_d):
self.n_inputs = n_inputs
self.n_bases = len(tau_r)
self.tau_r = np.tile(tau_r, n_inputs) # repeats the whole array a number of times
self.tau_d = np.tile(tau_d, n_inputs)
self.p = DoubleFirstOrderBases(dt, self.tau_r, self.tau_d)
def step(self, input_signal):
# input signal has length num_inputs
# p's have length num_inputs*num_bases
# Repeat each element of input signal 'num_bases' times
input_repeated = np.repeat(input_signal, self.n_bases)
self.p.step(input_repeated)
def reset(self):
self.p.reset()
@property
def value(self):
return self.p.value
class AdaptiveFilterCerebellum:
def __init__(self, dt, n_inputs, n_outputs, num_bases, beta):
self.dt = dt
self.beta = beta
self.n_inputs = n_inputs
self.n_outputs = n_outputs
# Initialize cortical bases
self.tau_r = np.random.uniform(low=2.0, high=50.0, size=num_bases)*1e-3 # ms
self.tau_d = np.random.uniform(low=50.0, high=750.0, size=num_bases)*1e-3 # ms
self.p = MultiInputSecondOrderBases(dt, n_inputs, self.tau_r, self.tau_d)
# Weights
self.weights = np.zeros((n_inputs*num_bases, n_outputs))
# Signals
self.C = np.zeros(n_outputs)
self.internal_error = np.zeros(n_outputs)
def step(self, x, error):
# Gives p_r, p_d and p at time t
self.p.step(x)
# Output of microcircuit
self.C = np.dot(self.weights.T, self.p.value)
# Update error signal and weights
self._update_weights(error) # update before or after calc?
return self.C
def _update_weights(self, error):
self.weights += self.beta * np.outer(self.p.value, error)
@property
def output(self):
return self.C
Ts = 1e-3
n_inputs = 1
n_outputs = 1
all_n_bases = [10, 15, 20, 30, 50]
def Cerebellum_test(n_bases: int=10, T: float=5.0):
beta = 2e-3*Ts/n_bases
print(f'Trying with {n_bases} bases:')
c = AdaptiveFilterCerebellum(Ts, n_inputs, n_outputs, n_bases, beta)
## TODO: Paste your experiment code from exercise 2.6
## Initialize simulation
T_end = 10 # in one trial
n_steps = int(T_end/Ts) # in one trial
n_trials = 80
plant = SingleLink(Ts)
## Logging variables
t_vec = np.array([Ts*i for i in range(n_steps*n_trials)])
theta_vec = np.zeros(n_steps*n_trials)
theta_ref_vec = np.zeros(n_steps*n_trials)
## Feedback controller variables
Kp = 20
Kv = 2
## TODO: Define parameters for periodic reference trajectory
A = np.pi
T = 5
u=0
## Simulation loop
for i in range(n_steps*n_trials):
t = i*Ts
## TODO: Calculate the reference at this time step
theta_ref = A * np.sin(2 * np.pi * t/T)
# Measure
theta = plant.theta
omega = plant.omega
e = (theta_ref - theta)
c.step(u, e)
e_fb = e + c.output
# Feedback controller
u = Kp * e_fb + Kv* (-omega)
# Iterate simulation dynamics
plant.step(u)
theta_vec[i] = plant.theta
theta_ref_vec[i] = theta_ref
## TODO: Plot results
fig, (ax1, ax2) = plt.subplots(2, 2, figsize=(12, 8))
ax1[0].plot(t_vec, theta_vec, label='theta')
ax1[0].plot(t_vec, theta_ref_vec, '--', label='reference')
ax1[0].legend()
ax1[0].set_title('AdaptiveFilterCMAC for all trials', size=16)
ax1[1].plot(t_vec, theta_vec, label='theta')
ax1[1].plot(t_vec, theta_ref_vec, '--', label='reference')
ax1[1].set_xlim(0, min(t_vec[-1], 200))
ax1[1].legend()
ax1[1].set_title('AdaptiveFilterCMAC for 20 first trials', size=16)
## Plot trial error
error_vec = theta_ref_vec - theta_vec
l = int(T/Ts)
trial_error = np.zeros(n_trials)
for t in range(n_trials):
trial_error[t] = np.sqrt(np.mean(error_vec[t*l:(t+1)*l]**2 ))
print(f'Lowest MSE for {n_bases} bases: {round(np.min(trial_error), 7)}')
ax2[0].plot(trial_error)
ax2[0].set_xlabel('n_trials')
ax2[0].set_ylabel('MSE')
ax2[0].set_title('MSE with regular y-axis', size=16)
ax2[1].plot(trial_error)
ax2[1].set_xlabel('n_trials')
ax2[1].set_ylabel('MSE')
ax2[1].set_yscale('log')
ax2[1].set_title('MSE with log y-axis', size=16)
plt.tight_layout()
plt.show()
for n_bases in all_n_bases:
Cerebellum_test(n_bases=n_bases)
import pandas as pd
pd.DataFrame({'Name': ['Andrea Vallone', 'Ewa Rusiecka', 'Stefan Petrovic'], 'Contribution': ['30%', '35%', '35%']})