# Lab SD-TSIA214 - Hidden Markov Models
# Riccardo Prestigiacomo - Francesco Manca
import numpy as np
# Read the e-mails dat/mail1.dat ... dat/mail30.dat and store them in a list
# (one integer array per mail; df[k] holds mail number k + 1).
df = [
    np.loadtxt("dat/mail" + str(i) + ".dat", dtype=int)
    for i in range(1, 31)
]

# Data structure initialization: parameters of the two-state HMM.
states = np.array([0, 1])
# NOTE: in double precision 1 - 1e-100 evaluates to exactly 1.0.
start_prob = np.array([1 - 1e-100, 1e-100])
trans = np.array(
    [
        [0.999218078035812, 0.000781921964187974],
        [1e-100, 1 - 1e-100],
    ]
)
# Emission matrix; viterbiAlgorithm indexes it as emis[symbol][state].
emis = np.loadtxt("PerlScriptAndModel/P.text")
def viterbiAlgorithm(obs, states, start_prob, trans, emission_prob, log=False):
    """
    Viterbi Algorithm Implementation

    Finds the most likely sequence of hidden states for a sequence of
    observations. Scores are always accumulated in the log domain (sums of
    log-probabilities) so that long sequences do not underflow.

    Arguments:
    - obs: 1-D sequence of integer observation symbols; each value is used
      as a row index into emission_prob
    - states: list of states; the values are used directly as indices, so
      they are expected to be 0, 1, ..., N-1 (at most 256 states because
      back-pointers are stored as uint8)
    - start_prob: vector of the initial probabilities, indexed by state
    - trans: transition matrix, indexed as trans[from_state, to_state]
    - emission_prob: emission probability matrix, indexed as
      emission_prob[symbol][state]
    - log: accepted for backward compatibility only; it has no effect, the
      computation is done in the log domain whatever its value

    Returns:
    - seq: uint8 array with one entry per observation holding the decoded
      state shifted to a 1-based label (state index + 1); an empty array
      when obs is empty
    """
    obs = np.atleast_1d(obs)
    states = np.asarray(states)
    n_obs = obs.size
    n_states = states.size

    # Nothing to decode: return an empty path instead of failing on obs[0].
    if n_obs == 0:
        return np.zeros(0, dtype=np.uint8)

    # Zero probabilities legitimately map to -inf in the log domain, so the
    # divide-by-zero warning raised by np.log(0) is silenced here.
    with np.errstate(divide="ignore"):
        start_prob = np.log(start_prob)
        trans = np.log(trans)
        emission_prob = np.log(emission_prob)

    # T1[t, s]: best log-score of any path ending in state s at time t.
    # T2[t, s]: predecessor state (at time t - 1) on that best path.
    T1 = np.zeros((n_obs, n_states))
    T2 = np.zeros((n_obs, n_states), dtype=np.uint8)

    for state in states:
        T1[0, state] = start_prob[state] + emission_prob[obs[0]][state]

    for index in range(1, n_obs):
        for state in states:
            candidates = T1[index - 1, states] + trans[states, state]
            best = np.argmax(candidates)
            T1[index, state] = candidates[best] + emission_prob[obs[index]][state]
            T2[index, state] = best

    # Backtracking: the predecessor of the state chosen at time i + 1 is
    # stored in row i + 1 of T2 (row i would shift the path by one step).
    path = np.zeros(n_obs, dtype=np.uint8)
    path[-1] = np.argmax(T1[-1])
    for i in range(n_obs - 2, -1, -1):
        path[i] = T2[i + 1, path[i + 1]]

    return path + 1
# Test on a single e-mail.
obs = df[10]  # select mail11 (df is 0-based, mail files are numbered from 1)
x = viterbiAlgorithm(obs, states, start_prob, trans, emis, log=True)
print(x)

# Disabled legacy snippet, never executed; kept for reference only:
#   x = viterbi(obs, states, start_prob, A, emis)for i in range(10,30):
#   Y = df.loc[i, ::] # select email
#   Y = Y[~np.isnan(Y)]
#   result, path = viterbiAlgorithm(Y, A, B, pi)
#   f = open("dat/"+'path'+str(i+1)+'.txt','w')
#   for p in result:
#   f.write(str(p))
# For mails 1 to 10 a reference file dat/mail<i>h.txt is available, so the
# exact header/body transition point is known. Compare that reference value
# with the transition point found by the Viterbi decoding.
for i in range(1, 11):
    print("---------------------Mail "+ str(i) + "---------------------")
    Y = df[i - 1]
    result = viterbiAlgorithm(Y, states, start_prob, trans, emis, log=True)

    # Reference: one past the number of characters in the header file.
    header = "dat/mail" + str(i) + "h.txt"
    with open(header) as h:
        header_text = h.read()
    expected_trans = len(header_text) + 1

    # Decoded: one past the number of positions labelled with state 1.
    trans_viterbi = np.sum(np.array(result) == 1) + 1

    print("Expected transition between header and body: " + str(expected_trans))
    print("Viterbi transition between header and body: " + str(trans_viterbi))
# Performance on test set


def spliceText(path, text):
    """Splice the text in two parts according to the states path.

    The split position is the index of the first occurrence in path of the
    second-smallest distinct state value (np.unique returns the distinct
    values sorted, together with the index of their first occurrence).

    Arguments:
    - path: sequence of decoded state labels, one per position
    - text: sliceable sequence (e.g. a string) to split

    Returns:
    - (text[:split], text[split:]); when path contains fewer than two
      distinct states there is no transition, so the whole text is returned
      as the first part and the second part is empty
    """
    _, first_seen = np.unique(path, return_index=True)

    # No second state in the path: there is no split point to look up.
    if first_seen.size < 2:
        split = len(text)
    else:
        split = int(first_seen[1])

    return text[0:split], text[split:]
def visualizeHeaderBody(number):
    """Decode one e-mail and print where its header ends and its body starts.

    Loads dat/mail<number>.dat, decodes it with viterbiAlgorithm using the
    module-level model (states, start_prob, trans, emis), prints the index
    ranges of the two states, then prints excerpts of the text from
    dat/mail<number>.txt split with spliceText: the first and last six
    lines of the header and the first nine lines of the body.

    Arguments:
    - number: number of the e-mail, as used in the file names
    """
    observations = np.loadtxt('dat/mail'+ str(number) + '.dat', dtype = int)
    path = viterbiAlgorithm(observations, states, start_prob, trans, emis, log = True)

    # Index of the first occurrence of the second distinct state in the path.
    first_seen = np.unique(path, return_index=True)[1]
    index = first_seen[1]

    print('+-------------------------------------- mail %d ----------------------------+' % number)
    print('Test result:')
    print("state 1: 0 ~ " + str(index-1))
    print('state 2:' + str(index) + ' ~ ' + str(len(path)))

    with open('dat/mail'+ str(number) + '.txt') as file:
        text = file.read()
    header, body = spliceText(path, text)

    header_lines = header.splitlines()
    body_lines = body.splitlines()

    print('<------------------------------ header ------------------------------------------------->')
    print('\n'.join(header_lines[0:6]))
    print('\n'.join(header_lines[-6:]))
    print('<------------------------------ body --------------------------------------------------->')
    print('\n'.join(body_lines[0:9]))
    print('<------------------------------ End --------------------------------------------------->')
# Show the header/body split for a few selected e-mails.
for mail_number in (11, 30, 25, 24):
    visualizeHeaderBody(mail_number)