class Rollout():
def __init__(self,init_s,Mod=None,p=None,given_actions=None,remap_angle=False):
self.init_s = init_s
self.Mod = Mod
self.p = p
self.given_actions = given_actions
self.remap_angle = remap_angle
self.max_force = 20
def rollout(self,t_max,sn=None):
control = False if self.p is None else True
given = False if self.given_actions is None else True
predict = False if self.Mod is None else True
c = CartPole.CartPole()
c.setState(self.init_s)
Y = np.zeros((t_max,4))
L = np.zeros(t_max)
if given:
A = self.given_actions
else:
A = np.zeros(t_max)
if sn == 'I':
S = 4
p = 0.
self.M = [np.ones(t_max) for s in range(S)]
T = list(range(t_max))
n = int(p*t_max)
idx = random.sample(T,n)
self.M[idx] = 0
for t in range(0,t_max):
current_state = c.getState()
if sn is not None:
current_state = stateNoise(current_state,sn)
if sn == 'I':
if self.M == 0:
#pick random state to cut off
s1 = random.randint(4)
current_state[s1] = 0
Y[t] = current_state #get current Y
L[t] = c.loss() #get current loss
if control and not given: #get next action
a = np.matmul(current_state,self.p)
a = self.max_force * np.tanh(a/self.max_force)
A[t] = a
# perform update depending on pred or not
if predict:
if control or given:
#model should be trained on actions
assert np.shape(self.Mod.trainingX)[1] == 5
current_state = np.append(current_state,A[t]) #add action to state
pred_change = self.Mod.predictChange([current_state])[0]
c.setState(current_state[0:4]+pred_change)
else:
if control or given:
c.performAction(A[t])
else:
c.performAction()
if self.remap_angle:
c.remap_angle() #remap angle
self.T = list(range(t_max))
self.Y = Y
self.L = L
self.A = A
class NonLinMod:
def __init__(self,N,M,X_S,actions=None,snr=None):
alpha, centroids, sigma = self.trainNew(N,M,X_S,snr,actions)
self.centroids = centroids
self.sigma = sigma
self.N = N
self.M = M
self.X_S = X_S
self.alpha = alpha
def K(self,X,X0,sigma): #returning k for two x points
S = len(sigma)
ex = sum([((X[j]-X0[j])**2)/(2 * sigma[j]**2) for j in [0,1,3]]) #linear squared for j=0,1,3
ex += (np.sin((X[2]-X0[2])/2)**2)/(2 * sigma[2]**2) #sin for j = 2
if S == 5:
ex += ((X[4]-X0[4])**2)/(2 * sigma[4]**2)
return np.exp(-ex) #sum and exponent
def getSigma(self,targ_Y,S):
force_sigma = 1
sigma = [np.std(targ_Y[:,s]) for s in range(4)]
if S == 5: # add force SD
sigma.append(force_sigma)
return sigma
def getBasisIndices(self,X,M): #return random Xs to be used as bases
return np.random.randint(np.size(X, axis=0), size=M)
def getKNM(self,X,centroids,sigma): #get KNM from X, centroids and SDs
M, S = np.shape(centroids)
N, S = np.shape(X)
KNM = np.zeros((N,M))
for i1 in range(N):
for i2 in range(M):
KNM[i1][i2] = self.K(X[i1],centroids[i2],sigma)
return KNM
def getAlpha(self,KNM,targ_Y,ld,idx): # perform regression
N,M = np.shape(KNM)
KMN = np.transpose(KNM)
KMM = KNM[idx] #select the rows of KNM which correspond to bases
a = np.linalg.lstsq(np.matmul(KMN,KNM) + ld*KMM , KMN, rcond=None)[0] # compute regression
alpha = np.matmul(a,targ_Y) #multiply by each y column to get weights
return alpha
def trainNew(self,N,M,S,snr,given_actions):
X = getTrainingX(N,S)
if S == 4 and given_actions is None:
#if given_actions is None: # train with just 4 states
targ_Y = getChanges(X)[0]
else: #train with scanned action included
targ_Y = getChanges(X[:,0:4],actions=X[:,4])[0]
if snr is not None:
targ_Y = getChanges(X[:,0:4],snr=snr)
sigma = self.getSigma(targ_Y,S)
idx = self.getBasisIndices(X,M)
centroids = X[idx, :]
KNM = self.getKNM(X,centroids,sigma) #get KNM for model
alpha = self.getAlpha(KNM,targ_Y,ld,idx) #regression
self.trainingX = X
return alpha, centroids, sigma
def predictChange(self,test_X): # predict y for N novel X points from alpha and centroids
N, S1 = np.shape(test_X)
M, S2 = np.shape(self.centroids)
assert S1 == S2 #otherwise model not trained on right data
KNM_test = np.zeros((N,M)) #get new KNM
for i1 in range(N):
for i2 in range(M):
KNM_test[i1][i2] = self.K(test_X[i1],self.centroids[i2],self.sigma)
pred_Y = np.zeros((N,4))
for s in range(4):
pred_Y[:,s] = np.matmul(KNM_test,self.alpha[:,s])
return pred_Y
class NonLinPol:
def __init__(self,N, M, W, weights,rbe=False,obe=False):
self.N = N
self.M = M
self.S = 4
self.training_X = self.getTrainingX(N,self.S)
self.targ_Y = getChanges(self.training_X)[0]
self.sigma = self.getSigma(self.targ_Y,self.S)
self.idx = self.getRandomIndices(self.training_X,self.M)
self.centroids = self.training_X[self.idx, :]
self.weights = weights
self.W = W
self.rbe = rbe
self.obe = obe
def getTrainingX(self,N,S):
randomAll = False
if randomAll:
return getTrainingX(self.N,self.S)
else: # smaller range of samples around unstable eq
scale = 0.1
return randStates(N,S,scale=scale)
def getPolicy(self,X): #return policy for stat
S = np.shape(X)[0]
W = self.W
M = self.M
weights = self.weights
p = np.zeros((M,S))
#for each basis function
for i in range(M):
C = self.centroids[i]
d = X - C
dt = np.transpose(d)
p[i] = weights[i] * np.exp(np.matmul(dt,W,d))
return np.sum(p,axis=0)
def getRandomIndices(self,X,M): #return random Xs to be used as bases
return np.random.randint(np.size(X, axis=0), size=M)
def getSigma(self,targ_Y,S):
force_sigma = 1
sigma = [np.std(targ_Y[:,s]) for s in range(4)]
if self.S == 5: # add force SD
sigma.append(force_sigma)
return sigma
def getTrainingX(N,S): #N Must be multiple of S
n = round(N/S)
X = np.zeros((0,S))
for s in range(S):
x = scanXRange([s],n,S)
X = np.vstack((X,x))
return X