Spoken Language as a Biomarker — Linguistic Features Extraction
Load libraries
from bs4 import BeautifulSoup
import nltk
import math
import random
import numpy as np
import pandas as pd
import csv
import re
import os
import json
import pickle
import glob
import seaborn as sn
import matplotlib.pyplot as plt
from nltk.tokenize import word_tokenize
import nltk
import string
nltk.download('punkt')
import stanza
import argparse
from stanza.utils.conll import CoNLL
import stanza.resources.common
from ipywidgets import FloatProgress
# Download the Italian models once, then build the stanza pipeline used for
# all parsing in this notebook (tokenisation, POS tagging, dependency parse).
stanza.download('it')
nlpit = stanza.Pipeline("it")
Get Text from ELAN
# Root folder containing one sub-folder per subject with the ELAN .eaf files.
path_to_folders = "data/data/subjects_folder/"
#path_to_folders = "/content/drive/MyDrive/Chizzoni/data/subjects_folder/"
def get_sentence(sentence):
    """Strip CHAT/ELAN transcription mark-up from one raw utterance.

    Applies an ordered chain of regex substitutions; the order matters
    (specific annotation patterns are removed before the generic bracket
    clean-up at the end), so do not reorder the rules.
    Returns the cleaned sentence string.
    """
    # <...> group of words/punctuation (incl. the typographic apostrophe)
    angular = r"<([\w ,!?;:.'’]+)>"
    # same shape but without a capture group (and without "’")
    angular_no_p = r"<[\w ,!?;:.']+>"
    # fragments: &-filler, &+phonological fragment, &*interposed word
    sentence = re.sub(r"&-", "", sentence)
    sentence = re.sub(r"&\+", "", sentence)
    sentence = re.sub(r"&\*", " ", sentence)
    # overlap markers <...> [<] / <...> [>] -> keep the spoken text
    sentence = re.sub(angular+r"\s*\[[<>]\]", r"\1", sentence)
    # simple events, e.g. &=laughs
    sentence = re.sub(r"\s?&\=\w+\s?", " ", sentence)
    # bracketed events, e.g. [&=laughs]
    sentence = re.sub(r"\s?\[&\=\w+\]\s?", " ", sentence)
    # interruption or self-interruption: "what did you +/."
    sentence = re.sub(r"\s?\+[/]?\.?\s?", " ", sentence)
    # resuming after a (self-)interruption: drop stray "+" (and backslashes)
    sentence = re.sub(r"[\\+]", "", sentence)
    # pauses: (.) (..) (...)
    sentence = re.sub(r"\(\.+\)", "", sentence)
    # <bla bla bla> [=! acting] -> keep the text
    sentence = re.sub(angular+r"\s*\[\=\!\s?[\s?\w\s?]+\]", r"\1", sentence)
    # elongated sonorants, e.g. po<i> [# 1] -> rejoin the word
    sentence = re.sub(r"(\w*)"+angular+"(\w*) ?\[# ?[0-9]+\]", r"\1\2\3", sentence)
    # self-interruption markers +/ +// ...
    sentence = re.sub(r"\+\/+", " ", sentence)
    # repetition <nell'armad> [/] -> drop the repeated material
    sentence = re.sub(angular+r"\s?\[/\]", r"", sentence)
    # retracing <nell'armad> [//] -> drop
    sentence = re.sub(angular+r"\s?\[//\]", r"", sentence)
    # reformulation: <all of my friends had> [///] uh we all decided ... -> drop
    sentence = re.sub(angular+r"\s?\[\/+\]", r"", sentence)
    # partial word retracing, e.g. pra<to> [//] -> keep only the prefix
    sentence = re.sub(r"(\w*)"+angular_no_p+"(\w*)\s?\[\/+\]", r"\1", sentence)
    # incomplete words: (word) -> word
    sentence = re.sub(r"\((\w+)\)", r"\1", sentence)
    # dialect <Mortacci tua> [:d Dannazzione] or replacement <chiusurgia> [: chirurgia]
    # -> keep the standard-language replacement
    sentence = re.sub(angular+r" +\[\:d ?([\w ,!?;:.’']+)\]", r"\2", sentence)
    sentence = re.sub(angular+r" +\[\: ?([\w ,!?;:.’']+)\]", r"\2", sentence)
    sentence = re.sub(angular+r" +\[///: +([\w ,!?;:.’']+)\]", "", sentence)
    # local event (complex), e.g. [^slams the table]
    sentence = re.sub("\[\^\s?[\w ,!?;:.’']+\]", r" ", sentence)
    # best guess <ed o> (?) or [?] -> keep the guessed text
    sentence = re.sub(angular+r"\s?\(\?\)\s?", r"\1", sentence)
    sentence = re.sub(angular+r"\s?\[\?\]\s?", r"\1", sentence)
    # exclamation <sbagliato> [!] -> keep the text
    sentence = re.sub(angular+r"\s?\[\!\]\s?", r"\1", sentence)
    # local event (simple) [=! acting]
    sentence = re.sub(r"\s*\[=\! ?[A-z ]+ ?\]", r" ", sentence)
    # multiple repetitions <Verso le due> [x 2] -> keep one occurrence
    sentence = re.sub(angular+r"\s?\[x\s?[0-9]+\s?\]", r"\1", sentence)
    # morpho-syntax error codes: <il> shampoo {synt_err}
    sentence = re.sub(angular+r"\s?\{[\w_]+\}", r"\1", sentence)
    sentence = re.sub(r"\{[\w_]+\}", r" ", sentence)
    # partial repetition <ri> [/ x2] -> drop
    sentence = re.sub(angular+r"\s?\[/\s?x\s?[0-9]+\s?]", r"", sentence)
    # comment on the main line, e.g. [% end of recording]
    sentence = re.sub(r"\[%\s?[\w\s]+\]", r"", sentence)
    # unintelligible speech: xxx
    sentence = re.sub(r"\s?<?[Xx]xx.?>?\s?", r" ", sentence)
    # final clean-up: leftover brackets, slashes, angle braces, curly braces
    sentence = re.sub(r"\s?\[.?\]\s?", r"", sentence)
    sentence = re.sub(r"\s?\[.\]", r"", sentence)
    sentence = re.sub(r"\s?\[.+\]", r" ", sentence)
    sentence = re.sub(r"\/+", r"", sentence)
    sentence = re.sub(r"[<>]", r"", sentence)
    sentence = re.sub(r"[{}]", r"", sentence)
    return sentence
#reads the ELAN .eaf files and saves text_raw, text_clean, and the conll
#for every eaf, for every subject and interviewer, into "dictionary"
def Eaf_scraper(path):
folder_to_scrape_list = [folder for folder in os.listdir(path) if os.path.isdir(path + folder)]
#print(f"folder_to_scrape_list {folder_to_scrape_list}")
dictionary = {}
for folder_to_scrape in folder_to_scrape_list:
dictionary[folder_to_scrape] = {}
file_paths = glob.glob(path_to_folders + folder_to_scrape + "/*.eaf")
#print(f"file_paths {file_paths}")
for file_path in file_paths:
file = os.path.basename(file_path)
dictionary[folder_to_scrape][file] = {}
dictionary[folder_to_scrape][file]["text_raw"] = {"Interviewer": [],
"Subject": []}
dictionary[folder_to_scrape][file]["text_clean"] = {"Interviewer": [],
"Subject": []}
dictionary[folder_to_scrape][file]["conll"] = []
with open(file_path, "r", encoding="utf8") as file_IN:
soup = BeautifulSoup(file_IN.read(), "xml")
tiers = soup.find_all("TIER")
for tier in tiers:
if re.search(r"[Ss]ogg", tier["TIER_ID"]):
Subject = tier
for annotation in Subject.find_all("ANNOTATION"):
dictionary[folder_to_scrape][file]["text_raw"]["Subject"].append(annotation.get_text().replace("\n",""))
for annotation in Subject.find_all("ANNOTATION"):
dictionary[folder_to_scrape][file]["text_clean"]["Subject"].append(get_sentence(annotation.get_text()).replace("\n",""))
elif re.search(r"[Ii]nt", tier["TIER_ID"]):
Interviewer = tier
for annotation in Interviewer.find_all("ANNOTATION"):
dictionary[folder_to_scrape][file]["text_raw"]["Interviewer"].append(annotation.get_text().replace("\n",""))
for annotation in Interviewer.find_all("ANNOTATION"):
dictionary[folder_to_scrape][file]["text_clean"]["Interviewer"].append(get_sentence(annotation.get_text()).replace("\n",""))
elif re.search(r"[Ww]ife", tier["TIER_ID"]):
pass
else:
print(f"problem with {file}\ntier['TIER_ID']: {tier['TIER_ID']}\ntier: {tier}")
for sent in dictionary[folder_to_scrape][file]["text_clean"]["Subject"]:
if sent != "" or sent != " ":
dictionary[folder_to_scrape][file]["conll"].append(nlpit(sent))
for folder_to_scrape_key in dictionary.keys():
dictionary[folder_to_scrape_key]["all_text_clean_subject"] = []
dictionary[folder_to_scrape_key]["all_text_raw_subject"] = []
dictionary[folder_to_scrape_key]["all_text_clean_interviewer"] = []
dictionary[folder_to_scrape_key]["all_conll_subject"] = []
for file_key in dictionary[folder_to_scrape_key].keys():
if file_key == "all_text_clean_subject" or file_key == "all_text_clean_interviewer" or file_key == "all_conll_subject" or file_key == "all_text_raw_subject":
continue
dictionary[folder_to_scrape_key]["all_text_clean_subject"].extend(dictionary[folder_to_scrape_key][file_key]["text_clean"]["Subject"])
dictionary[folder_to_scrape_key]["all_text_raw_subject"].extend(dictionary[folder_to_scrape_key][file_key]["text_raw"]["Subject"])
dictionary[folder_to_scrape_key]["all_text_clean_interviewer"].extend(dictionary[folder_to_scrape_key][file_key]["text_clean"]["Interviewer"])
dictionary[folder_to_scrape_key]["all_conll_subject"].extend(dictionary[folder_to_scrape_key][file_key]["conll"])
return dictionary
# Run the scraper once, then cache the result to disk so later cells can
# reload it without re-parsing (stanza parsing is slow).
dictionary_out = Eaf_scraper(path_to_folders)
with open('dictionary.pickle', 'wb') as handle:
    pickle.dump(dictionary_out, handle)
with open('dictionary.pickle', 'rb') as handle:
    dictionary = pickle.load(handle)
# Notebook-style spot check of one subject's entry.
dictionary["SG_01"]
Process Text Data
def writing_global_docs(path, dict_, WriteFile = False):
    """Write per-subject aggregate text / CoNLL files under *path*.

    For every subject key in *dict_* (as produced by Eaf_scraper), and only
    when WriteFile is true, writes:
      - subjects_global/<subj>_subject_global.txt         (clean subject text)
      - interviewers_global/<subj>_interviewer_global.txt (clean interviewer text)
      - subject_global_raw/<subj>_subject_global_raw.txt  (raw subject text)
      - conll_subjects_global/<subj>_subject_global.conllu
    The target sub-folders must already exist.
    """
    if not WriteFile:  # idiomatic truth test instead of `WriteFile == True`
        return
    for folder_to_scrape_key in dict_:
        subject = dict_[folder_to_scrape_key]
        # subject clean text
        with open(path + "subjects_global" + "/" + folder_to_scrape_key + "_subject_global.txt", "w", encoding="utf8") as textOUT:
            textOUT.write(" ".join(subject["all_text_clean_subject"]))
        # interviewer clean text
        with open(path + "interviewers_global" + "/" + folder_to_scrape_key + "_interviewer_global.txt", "w", encoding="utf8") as textOUT:
            textOUT.write(" ".join(subject["all_text_clean_interviewer"]))
        # raw subject text
        with open(path + "subject_global_raw" + "/" + folder_to_scrape_key + "_subject_global_raw.txt", "w", encoding="utf8") as raw_textOUT:
            raw_textOUT.write(" ".join(subject["all_text_raw_subject"]))
        # CoNLL-U export of every parsed subject utterance
        with open(path + "conll_subjects_global" + "/" + folder_to_scrape_key + "_subject_global.conllu", "w", encoding="utf8") as conll_OUT:
            for eaf in subject["all_conll_subject"]:
                for sentence in CoNLL.convert_dict(eaf.to_dict()):
                    for word in sentence:
                        conll_OUT.write("\t".join(word) + "\n")
                    # blank line separates sentences in CoNLL-U
                    conll_OUT.write("\n")
# Called with the default WriteFile=False here, so this pass writes nothing;
# flip the flag to actually emit the global .txt/.conllu files.
writing_global_docs(path_to_folders, dictionary)
def get_annotations(path, dict_, WriteFile = False):
    """Count CHAT-style disfluency annotations in each subject's raw text.

    Reads <path>subject_global_raw/<subj>_subject_global_raw.txt for every
    key of *dict_* starting with "SG_", counts each annotation category
    (pauses, repetitions, retracing, ...) and returns
    {subject: {category: count}}. "Raw Sentence" is the raw token count.
    When WriteFile is true, also dumps the per-utterance raw matches to
    <path>annotations_global/<subj>_annotations_global.csv.
    """
    def process_sentence_str(sentence):
        """Return {category: list of regex matches} for one raw line."""
        d_out = {"Phonological Fragments": re.findall(r"&[\+][\w'’]+", sentence, re.UNICODE),
                 "Omissions": re.findall(r"(?<!\>\s)\{[\w'’ ]+\}\s", sentence, re.UNICODE),
                 #"Interposed words": re.findall(r"&\*", sentence, re.UNICODE),
                 "Pauses": re.findall(r"\(\.+\)", sentence, re.UNICODE),
                 "Filled Pauses": re.findall(r"&[\-][\w'’]+", sentence, re.UNICODE),
                 #"Trailingoff" : re.findall(r"\+\.\.\.", sentence, re.UNICODE),
                 "Interruptions": re.findall(r"\+/\.", sentence, re.UNICODE),
                 "Self Interruption_.": re.findall(r"\+//\.", sentence, re.UNICODE),
                 "Self Interruption_?": re.findall(r"\+//\?", sentence, re.UNICODE),
                 "Elongated sonorants": re.findall(r"(\w*) ?\[# ?[0-9]+\]", sentence, re.UNICODE),
                 "Dialect Variations": re.findall(r"<([\w'’ ]+)>\s\[\??:d\s?([\w'’ ]+)\]", sentence, re.UNICODE),
                 "Best_guesses": re.findall(r"<([\w'’ ]+)>\s(\[\?\])", sentence, re.UNICODE),
                 "Single Repetitions": re.findall(r"<([\w'’ ]+)>\s\[/\] ?", sentence, re.UNICODE),
                 "Multiple Repetitions": re.findall(r"(<?[\w'’ ]+>?)\s\[x ([0-9]+)\]", sentence, re.UNICODE),
                 "Retracing": re.findall(r"<([\w'’ ]+)>\s\[//\]", sentence, re.UNICODE),
                 #"Restrarts": re.findall(r"^(.*)<([\w'’ ]+)>\s\[///\] ", sentence, re.UNICODE),
                 #"False Start" : re.findall(r"(<[\w'’ ]+>)\s\[\/-\]", sentence, re.UNICODE),
                 "Errors": re.findall(r"(<[\w'’ ]+>\s?)(\{[\w'’ ]+\})", sentence, re.UNICODE),
                 "Incomplete_words": re.findall(r"\([\w]+\)", sentence, re.UNICODE),
                 "Raw Sentence": sentence.split()
                 }
        return d_out
    raw_text_to_process = {}
    for folder_to_scrape_key in dict_:
        if not folder_to_scrape_key.startswith("SG_"):
            continue
        # BUGFIX: read from *path* — the original used the module-level
        # path_to_folders here, ignoring the parameter.
        with open(path + "subject_global_raw" + "/" + folder_to_scrape_key + "_subject_global_raw.txt", "r") as raw_textIN:
            raw_text_to_process[folder_to_scrape_key] = raw_textIN.readlines()
        # optional per-utterance dump of the raw matches
        if WriteFile:
            with open(path + "annotations_global" + "/" + folder_to_scrape_key + "_annotations_global.csv", "w", encoding="utf8") as csv_OUT:
                writer = csv.writer(csv_OUT)
                for eaf in dict_[folder_to_scrape_key]["all_text_raw_subject"]:
                    for key, val in process_sentence_str(eaf).items():
                        writer.writerow([key, val])
    # Aggregate match counts per subject.
    annotations = {}
    for subject_key, lines in raw_text_to_process.items():
        counts = {}
        for sent in lines:
            # BUGFIX: accumulate over all lines — the original overwrote the
            # subject's entry on every iteration, keeping only the last line.
            for ann_key, matches in process_sentence_str(sent).items():
                counts[ann_key] = counts.get(ann_key, 0) + len(matches)
        annotations[subject_key] = counts
    return annotations
# Build the per-subject annotation-count matrix and inspect feature correlations.
get_annotations(path_to_folders, dictionary)  # NOTE: duplicate call; result discarded
annotations = get_annotations(path_to_folders, dictionary)
annotations_df = pd.DataFrame(annotations)
annotations_df = annotations_df.reindex(sorted(annotations_df.columns), axis=1)
annotations_df_trans = annotations_df.T
corrMatrix = annotations_df_trans.corr()
sn.heatmap(corrMatrix, annot=False)
plt.show()
# Collect features highly correlated (|r| > 0.8) with an earlier feature.
correlated_features = set()
for i in range(len(corrMatrix.columns)):
    for j in range(i):
        if abs(corrMatrix.iloc[i, j]) > 0.8:
            colname = corrMatrix.columns[i]
            correlated_features.add(colname)
print(len(correlated_features))
print(correlated_features)
Feature Extraction from CoNLL
def extract_lang_data(dictionary, subject_clean, interviewer_clean, write_file = False, path = "/data/data/subjects_folder/", subject = "SG_*"):
    """Compute per-subject linguistic measures from stanza parses.

    Parameters
    ----------
    dictionary : iterable of stanza Document objects (the subject's parsed
        utterances).
    subject_clean, interviewer_clean : lists of cleaned utterance strings.
    write_file : if true, write "<path><subject>_ling_measures.csv" with one
        token<TAB>POS pair per line.
    path, subject : only used to build that output file name.

    Returns
    -------
    (out, TNW): `out` maps measure names (POS ratios, deixis/polarity
    counts, idea density, token and hapax counts, ...) to values; `TNW`
    holds the raw total number of words for subject and interviewer.

    NOTE: assumes at least one parsed word and one subject word, otherwise
    the ratio computations divide by zero.
    """
    # Key order matters downstream (rows of the measures DataFrame).
    out = {"nADVS": [],
           "nNOUNS": [],
           "nPROPNS": [],
           "nVERBS": [],
           "nAUXS": [],
           "nADJS": [],
           "nPRONS": [],
           "nPREPS": [],
           "nDETS": [],
           "nNUMS": [],
           "nCOORS": [],
           "nSUBS": [],
           "+polarity": 0,
           "-polarity": 0,
           "spatial_ref": 0,
           "temporal_ref": 0,
           "personal_ref": 0,
           "personal_DEIXIS": [],
           "spatial_DEIXIS": [],
           "SYN_complexity": [],
           "idea_density": [],
           "closed_class": [],
           "TNW_subject [k]": "",
           "TNW_interviewer [k]": ""
           }
    TNW = {"TNW_subject": "",
           "TNW_interviewer": "",
           }
    # Italian deictic word lists (spatial / temporal / personal reference).
    #"questo", "questa", "questi", "queste", "quello", "quel", "quella", "quei", "quegli", "quelle",
    list_spatial_ref = ["qui", "qua", "lì", "là", "sopra", "sotto", "davanti", "dietro", "su", "giù", "lassù", "laggiù", "dentro", "fuori", "altrove", "intorno"]
    list_temporal_ref = ["ora", "adesso", "ormai", "subito", "prima", "dopo", "sempre", "spesso", "talora", "ancora", "tuttora", "già", "mai", "presto", "tardi", "oggi", "domani", "stamani", "recentemente", "successivamente"]
    list_personal_ref = ["mio", "mia", "miei", "mie", "tuo", "tua", "tuoi", "tue", "suo", "sua", "suoi", "sue", "nostro", "nostra", "nostri", "nostre", "vostro", "vostra", "vostri", "vostre", "loro", "altrui", "proprio", "propria", "propri", "proprie"]
    punctuation = string.punctuation
    # Total number of words (TNW), punctuation tokens excluded.
    words_interviewer = " ".join(interviewer_clean)
    words_interviewer = [word for word in nltk.word_tokenize(words_interviewer) if word not in punctuation]
    TNW["TNW_interviewer"] = len(words_interviewer)
    out["TNW_interviewer [k]"] = len(words_interviewer) / 1000
    words_subject = " ".join(subject_clean)
    words_subject = [word for word in nltk.word_tokenize(words_subject) if word not in punctuation]
    TNW["TNW_subject"] = len(words_subject)
    out["TNW_subject [k]"] = len(words_subject) / 1000
    tokpos = []
    tokens = []
    upos = []
    xpos = []
    for doc in dictionary:
        doc_words = list(doc.iter_words())
        upos.append([w.upos for w in doc_words])
        xpos.append([w.xpos for w in doc_words])
        # Polarity features from the morphological feats string.
        # NOTE(review): this matches only when "Polarity=..." is the entire
        # feats value (kept from the original) — verify against the tagger.
        for w in doc_words:
            if w.feats == "Polarity=Neg":
                out["-polarity"] += 1
            elif w.feats == "Polarity=Pos":
                out["+polarity"] += 1
        word_pos_tuple = [(w.text, w.pos) for w in doc_words]
        # lowercase non-punctuation tokens feed type/token/hapax counts
        tokens.extend(t.lower() for t, p in word_pos_tuple if p != "PUNCT")
        # Materialised list — the original appended lazy generators here.
        tokpos.append(["\t".join(tup) for tup in word_pos_tuple])
    # POS ratios over all parsed words.
    flat_list_upos = [tag for sent_tags in upos for tag in sent_tags]
    n_upos = len(flat_list_upos)
    upos_counts = {}
    for tag in flat_list_upos:
        upos_counts[tag] = upos_counts.get(tag, 0) + 1
    def ratio(*tags, denom=n_upos):
        """Share of words whose tag is in *tags*, rounded to 4 decimals."""
        return round(sum(upos_counts.get(t, 0) for t in tags) / denom, 4)
    out["nADVS"] = ratio("ADV")
    out["nNOUNS"] = ratio("NOUN")
    # BUGFIX: proper nouns are tagged PROPN; the original counted NOUN twice.
    out["nPROPNS"] = ratio("PROPN")
    out["nVERBS"] = ratio("VERB")
    out["nAUXS"] = ratio("AUX")
    out["nADJS"] = ratio("ADJ")
    out["nPRONS"] = ratio("PRON")
    out["nPREPS"] = ratio("ADP")
    out["nDETS"] = ratio("DET")  # articles and demonstratives
    out["nNUMS"] = ratio("NUM")
    out["nCOORS"] = ratio("CCONJ")
    out["nSUBS"] = ratio("SCONJ")
    out["closed_class"] = ratio("DET", "PRON", "ADP", "CCONJ", "SCONJ", denom=len(words_subject))
    out["idea_density"] = ratio("VERB", "ADV", "ADJ", "ADP", "CCONJ", "SCONJ", denom=len(words_subject))
    # XPOS-based measures.
    flat_list_xpos = [tag for sent_tags in xpos for tag in sent_tags]
    n_xpos = len(flat_list_xpos)
    xpos_counts = {}
    for tag in flat_list_xpos:
        xpos_counts[tag] = xpos_counts.get(tag, 0) + 1
    # personal & relative pronouns plus subordinating conjunctions.
    # NOTE(review): "SCONJ" is a UPOS tag and likely never occurs among xpos
    # values — kept for fidelity with the original; verify the tagset.
    out["SYN_complexity"] = round((xpos_counts.get("PR", 0) + xpos_counts.get("PE", 0) + xpos_counts.get("SCONJ", 0)) / n_xpos, 4)
    out["spatial_DEIXIS"] = round(xpos_counts.get("DD", 0) / n_xpos, 4)  # DD: demonstrative determiners
    out["personal_DEIXIS"] = round((xpos_counts.get("PP", 0) + xpos_counts.get("PE", 0) + xpos_counts.get("PC", 0) + xpos_counts.get("AP", 0)) / n_xpos, 4)
    # Deixis over the raw tokenised words (endophoric and exophoric uses).
    out["spatial_ref"] = round(len([w for w in words_subject if w in list_spatial_ref]) / len(words_subject), 4)
    out["temporal_ref"] = round(len([w for w in words_subject if w in list_temporal_ref]) / len(words_subject), 4)
    out["personal_ref"] = round(len([w for w in words_subject if w in list_personal_ref]) / len(words_subject), 4)
    # Tokens and hapax legomena. (The original also built an unused word list
    # filtered with `i.text != "PUNCT"` — a no-op text comparison — removed.)
    out["ntokens"] = len(tokens)
    freqs = {}
    for token in tokens:
        freqs[token] = freqs.get(token, 0) + 1
    out["nhapax"] = len([key for key in freqs if freqs[key] == 1])
    if write_file:
        with open(path + subject + "_ling_measures.csv", "w", encoding="utf8") as wrout:
            for doc_lines in tokpos:
                wrout.write("\n".join(doc_lines) + "\n\n")
    return out, TNW
#nested dictionary with all the linguistic measures for every subject
lingustic_measures = {}
TNW = {}
for folder in dictionary.keys():
    if folder.startswith("SG_"):
        # PERF/BUGFIX: call the (slow) extractor once per subject — the
        # original called it twice, re-writing the same csv both times.
        lingustic_measures[folder], TNW[folder] = extract_lang_data(
            dictionary[folder]["all_conll_subject"],
            dictionary[folder]["all_text_clean_subject"],
            dictionary[folder]["all_text_clean_interviewer"],
            write_file=True,
            path=path_to_folders + "nlp_output/",
            subject=folder,
        )
TNW_df = pd.DataFrame.from_dict(TNW)
TNW_df_sorted = TNW_df.reindex(sorted(TNW_df.columns), axis=1)
# BUGFIX: write next to the notebook like every other csv in this file —
# the original "/TNW_df.csv" pointed at the filesystem root.
TNW_df_sorted.to_csv("TNW_df.csv")
TNW_df_sorted
linguistic_measures_df = pd.DataFrame.from_dict(lingustic_measures)
linguistic_measures_df = linguistic_measures_df.reindex(sorted(linguistic_measures_df.columns), axis=1)
linguistic_measures_df.head()
Text Complexity
# The txtcomplexity CLI was run once per .conllu file via notebook shell
# magic (kept below for reference); the resulting json paths are hard-coded
# in file_list instead of being re-globbed.
# for conll_file_path in os.listdir(path_to_conlls):
#     conll_file_base = os.path.basename(conll_file_path)
#     if conll_file_path.endswith(".conllu"):
#         abs_file = path_to_conlls + conll_file_base
#         out_name = path_to_conlls + conll_file_base[:-7] + ".json"
#         ! txtcomplexity --input-format conllu --window-size 500 {abs_file} > {out_name}
# json_pattern = os.path.join("/data/data/subjects_folder/conll_subjects_global/", "*.json")
# json_pattern
# file_list = glob.glob(json_pattern)
file_list = ['/work/data/data/subjects_folder/conll_subjects_global/SG_15_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_11_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_14_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_10_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_12_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_13_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_16_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_01_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_02_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_03_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_04_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_05_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_06_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_07_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_08_subject_global.json', '/work/data/data/subjects_folder/conll_subjects_global/SG_09_subject_global.json']
path_to_conlls = "/data/data/subjects_folder/conll_subjects_global/"
def read_json(path_to_jsons, file_list):
#read list of jsons
# json_pattern = os.path.join(path_to_jsons, "*.json")
# file_list = glob.glob(glob.escape(json_pattern))
#function to delete stdev items
def delete_keys_from_dict(dict_to_edit, keys_to_del):
dict_foo = dict_to_edit.copy() #iterator to avoid DictionaryHasChanged Err
for field in dict_foo.keys():
if field in keys_to_del:
del dict_to_edit[field]
if isinstance(dict_foo[field], dict):
delete_keys_from_dict(dict_to_edit[field], keys_to_del)
return dict_to_edit
list_of_dicts = []
for dir in file_list:
f = open(dir)
list_of_dicts.append(json.load(f))
f.close()
for d in list_of_dicts:
dict(d)
delete_keys_from_dict(d, "stdev")
for key in d:
old_key = key
temporary_new_key = re.search(r"[A-Z]+_[0-9]+", key)
new_key = temporary_new_key.group(0)
d[new_key] = d.pop(old_key)
for folder,name in d.items():
for measure,value_dict in name.items():
for id,num in value_dict.items():
d[folder][measure] = value_dict[id]
return dict((key,d[key]) for d in list_of_dicts for key in d)
# Flatten the txtcomplexity json outputs into a subjects-by-measures table.
txtcomplexity_dicts = read_json(path_to_conlls,file_list)
txtcomplexity_df = pd.DataFrame.from_dict(txtcomplexity_dicts)
txtcomplexity_df = txtcomplexity_df.reindex(sorted(txtcomplexity_df.columns), axis=1)
txtcomplexity_df.head()
txtcomplexity_df.to_csv("txtcomplexity_df.csv")
# Split into control (Y="0") and experimental (Y="1") groups, one row per subject.
txtcomp_controls = txtcomplexity_df[["SG_04", "SG_05", "SG_06", "SG_07", "SG_08", "SG_09", "SG_10", "SG_11", "SG_12"]].T
txtcomp_controls.insert(0, "Y", ["0" for i in range(0,9)])
txtcomp_experimental = txtcomplexity_df[["SG_01", "SG_02", "SG_03", "SG_13", "SG_14", "SG_15", "SG_16"]].T
txtcomp_experimental.insert(0, "Y", ["1" for i in range(0,7)])
# Lexical-diversity measure subset, labelled and concatenated.
lexical_TXT_0 = txtcomp_controls[["type-token ratio (disjoint windows)", "Honoré's H (disjoint windows)", "Entropy (disjoint windows)", "Simpson's D (disjoint windows)", "HD-D (disjoint windows)", "MTLD"]]
lexical_TXT_0.insert(0, "Y", ["0" for i in range(0,9)])
lexical_TXT_1 = txtcomp_experimental[["type-token ratio (disjoint windows)", "Honoré's H (disjoint windows)", "Entropy (disjoint windows)", "Simpson's D (disjoint windows)", "HD-D (disjoint windows)", "MTLD"]]
lexical_TXT_1.insert(0, "Y", ["1" for i in range(0,7)])
lexical_TXT_Y_list = [lexical_TXT_1, lexical_TXT_0]
lexical_TXT_Y = pd.concat(lexical_TXT_Y_list)
# Syntactic-complexity measure subset, labelled and concatenated.
syntactic_TXT_0 = txtcomp_controls[["average dependency distance","closeness centralization", "closeness centrality", "dependents per word", "longest shortest path", "outdegree centralization"]]
syntactic_TXT_0.insert(0, "Y", ["0" for i in range(0,9)])
syntactic_TXT_1 = txtcomp_experimental[["average dependency distance","closeness centralization", "closeness centrality", "dependents per word", "longest shortest path", "outdegree centralization"]]
syntactic_TXT_1.insert(0, "Y", ["1" for i in range(0,7)])
syntactic_TXT_Y_list = [syntactic_TXT_1, syntactic_TXT_0]
syntactic_TXT_Y = pd.concat(syntactic_TXT_Y_list)
intermediate lexical
#saving and removing index
# Round-trip through csv resets the index; note this also re-types the "Y"
# labels from strings to integers on read-back.
lexical_TXT_Y.to_csv("lexical_TXT_Y.csv", index=False)
lexical_TXT_Y = pd.read_csv("lexical_TXT_Y.csv")
lexical_TXT_Y.head()
import pickle
import pandas as pd
import numpy as np
import seaborn as sn
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from sklearn.naive_bayes import BernoulliNB
from sklearn import tree
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import LeaveOneOut
from sklearn.model_selection import GridSearchCV
from sklearn import metrics
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score
# One classifier per family, default hyper-parameters (max_iter raised for
# LogisticRegression).
LogR = LogisticRegression(max_iter=500)
GauNB = GaussianNB()
BerNB = BernoulliNB()
DTree = DecisionTreeClassifier()
RanF = RandomForestClassifier()
AdaB = AdaBoostClassifier()
KNeigh = KNeighborsClassifier()
# Features: all columns except the label.
# NOTE(review): abs() presumably folds negative-valued measures — confirm intent.
X = abs(lexical_TXT_Y.iloc[:, 1:])
print(X.shape)
y = lexical_TXT_Y.Y  # class labels (0 = control, 1 = experimental)
print(y.shape)
# splitting X and y
# NOTE: only 16 subjects in total, so the held-out test split is tiny and
# the scores below are indicative at best.
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42) #seed
lenTr = len(X_train), len(y_train)
print(lenTr)
lenTe = len(X_test), len(y_test)
print(lenTe)
from collections import defaultdict
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, roc_auc_score)
# Fit every classifier on the train split and collect the test-set metrics.
classifiers_list = [(LogR, "Logistic"),(GauNB, "Gaussian NB"),(BerNB, "Bernoulli NB"),(DTree, "Decision Tree"),
                    (RanF, "Random Forest"), (AdaB, "Ada Boost"),(KNeigh, "k-Nearest Neighbors")]
scores = defaultdict(list)
for i, (clf, name) in enumerate(classifiers_list):
    # np.mean(cross_val_score(clf, X_train, y_train, cv = 5))
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    scores["Classifier"].append(name)
    for metric in [accuracy_score, precision_score, recall_score, f1_score, roc_auc_score]:
        score_name = metric.__name__.replace("_", " ").replace("score", "").capitalize()
        scores[score_name].append(metric(y_test, y_pred))
score_df_lex = pd.DataFrame(scores).set_index("Classifier")
score_df_lex.round(decimals=3)  # NOTE: result not assigned — display only
score_df_lex
score_df_lex.to_csv("score_df_lex.csv")
score_df_lex.to_latex("score_df_lex.tex")
intermediate syntactic
#saving and removing index
# Same pipeline as the lexical block, applied to the syntactic measures.
syntactic_TXT_Y.to_csv("syntactic_TXT_Y.csv", index=False)
syntactic_TXT_Y = pd.read_csv("syntactic_TXT_Y.csv")
syntactic_TXT_Y.head()
# NOTE(review): abs() presumably folds negative-valued measures — confirm intent.
X = abs(syntactic_TXT_Y.iloc[:, 1:])
print(X.shape)
y = syntactic_TXT_Y.Y  # class labels (0 = control, 1 = experimental)
print(y.shape)
# splitting X and y (16 subjects total — tiny held-out split)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42) #seed
lenTr = len(X_train), len(y_train)
print(lenTr)
lenTe = len(X_test), len(y_test)
print(lenTe)
from collections import defaultdict
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, roc_auc_score)
classifiers_list = [(LogR, "Logistic"),(GauNB, "Gaussian NB"),(BerNB, "Bernoulli NB"),(DTree, "Decision Tree"),
                    (RanF, "Random Forest"), (AdaB, "Ada Boost"),(KNeigh, "k-Nearest Neighbors")]
scores = defaultdict(list)
for i, (clf, name) in enumerate(classifiers_list):
    # np.mean(cross_val_score(clf, X_train, y_train, cv = 5))
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    scores["Classifier"].append(name)
    for metric in [accuracy_score, precision_score, recall_score, f1_score, roc_auc_score]:
        score_name = metric.__name__.replace("_", " ").replace("score", "").capitalize()
        scores[score_name].append(metric(y_test, y_pred))
score_df_syn = pd.DataFrame(scores).set_index("Classifier")
score_df_syn.round(decimals=3)  # NOTE: result not assigned — display only
score_df_syn
score_df_syn.to_csv("score_df_syn.csv")
score_df_syn.to_latex("score_df_syn.tex")
Acoustic Features Extraction
def my_function(path):
df = pd.read_csv(path, delimiter="\t", header=None)
df.columns = ["ID", "start_POSIX", "start", "end_POSIX", "end", "duration_POSIX", "duration", "transcript"]
df = df.drop(["start_POSIX", "end_POSIX", "duration_POSIX", "duration"], axis=1)
df = df.sort_values("start").reset_index(drop=True)
df.to_csv("input.csv", index=False)
def get_row_in_interval(rows, start, end):
rows.sort_values("start").reset_index(drop=True)
for index in range(len(rows)):
row = rows.iloc[index]
if start >= row["start"] and end <= row["end"]:
return row, rows[index:]
return None, rows
def get_row_for_partitions(start, end, row_int, row_sogg, row_silence):
if row_silence["transcript"] == "silent":
if row_int is not None and row_sogg is not None:
if start == row_int["start"] or end == row_int["end"] or start == row_sogg["start"] or end == row_sogg["end"]:
return ["sounding", "Int & Sogg", start, end, f"Int: {row_int['transcript']}\nSogg: {row_sogg['transcript']}"]
else:
return ["silent", "Int & Sogg", start, end, f"Int: {row_int['transcript']}\nSogg: {row_sogg['transcript']}"]
elif row_int is not None:
if start == row_int["start"] or end == row_int["end"]:
return ["sounding", "Int", start, end, row_int["transcript"]]
else:
return ["silent", "Int", start, end, row_int["transcript"]]
elif row_sogg is not None:
if start == row_sogg["start"] or end == row_sogg["end"]:
return ["sounding", "Sogg", start, end, row_sogg["transcript"]]
else:
return ["silent", "Sogg", start, end, row_sogg["transcript"]]
else:
return ["silent", "Nessuno", start, end, row_silence["transcript"]]
elif row_silence["transcript"] == "sounding":
if row_int is not None and row_sogg is not None:
return ["sounding", "Int & Sogg", start, end, f"Int: {row_int['transcript']}\nSogg: {row_sogg['transcript']}"]
elif row_int is not None:
return ["sounding", "Int", start, end, row_int["transcript"]]
elif row_sogg is not None:
return ["sounding", "Sogg", start, end, row_sogg["transcript"]]
else:
return ["sounding", "Nessuno", start, end, row_silence["transcript"]]
raise Exception(f'row_silence["transcript"] is neither "sounding" or "silent". {row_silence}')
def merge_same_partitions(partitions):
results = []
i = 0
while i < len(partitions):
current = partitions[i]
j = 0
while True:
if i+j+1 < len(partitions) and partitions[i][0] == partitions[i+j+1][0] and partitions[i][1] == partitions[i+j+1][1] and partitions[i][4] == partitions[i+j+1][4]:
j += 1
else:
break
results.append([partitions[i][0], partitions[i][1], partitions[i][2], partitions[i+j][3], partitions[i][4]])
i = i + j + 1
return results
# Build timeline partitions between every pair of consecutive boundary
# values, classify each by speaker, then derive silence / reaction-time /
# speech-time tables for the interviewee ("Sogg").
split_values = []
split_values.extend(df.start)
split_values.extend(df.end)
split_values = sorted(np.unique(split_values))
# Per-tier annotation rows, ordered by onset time.
row_ints = df.query('ID == "Int"').sort_values("start").reset_index(drop=True)
row_soggs = df.query('ID == "Sogg"').sort_values("start").reset_index(drop=True)
row_silences = df.query('ID == "silences"').sort_values("start").reset_index(drop=True)
partitions = []
for i in range(len(split_values)-1):
    start = split_values[i]
    end = split_values[i+1]
    #print(f"split_values index: {i}, start: {start}, end: {end}")
    row_int, row_ints = get_row_in_interval(row_ints, start, end)
    row_sogg, row_soggs = get_row_in_interval(row_soggs, start, end)
    row_silence, row_silences = get_row_in_interval(row_silences, start, end)
    # The silences tier is expected to cover the whole recording.
    if row_silence is None:
        raise Exception(f"Interval without row_silence: start: {start} end: {end}")
    partitions.append(get_row_for_partitions(start, end, row_int, row_sogg, row_silence))
results = merge_same_partitions(partitions)
results_df = pd.DataFrame(results)
results_df.columns = ["recording", "person", "start", "end", "transcript"]
results_df["duration"] = results_df["end"] - results_df["start"]
results_df
#Detected silences of interviewee during answer
silences_sogg_df = results_df.query('person == "Sogg" & recording == "silent"').sort_values("start").reset_index(drop=True)
# Calculation of reaction time of interviewee to questions
# delete noises that are not spoken parts of interviewee or interviewer
# transform sounding of nobody to silent and merge multiple same partitions to 1 big one
results_df['recording'] = np.where((results_df['recording'] == 'sounding') & (results_df['person'] == 'Nessuno'), "silent", results_df['recording'])
results_df['transcript'] = np.where((results_df['recording'] == 'silent') & (results_df['person'] == 'Nessuno'), "silent", results_df['transcript'])
results_df.drop("duration", inplace=True, axis=1)
results = results_df.values.tolist()
results = merge_same_partitions(results)
results_df = pd.DataFrame(results)
results_df.columns = ["recording", "person", "start", "end", "transcript"]
results_df["duration"] = results_df["end"] - results_df["start"]
# Context columns used below to locate Int -> Sogg turn transitions.
results_df['previous_person'] = results_df['person'].shift(1)
results_df['following_person'] = results_df['person'].shift(-1)
results_df['previous_transcript'] = results_df['transcript'].shift(1)
results_df['following_transcript'] = results_df['transcript'].shift(-1)
# When Sogg starts speaking while Int is still talking there is no silent
# gap: insert a zero-length "silent" row so the reaction-time query finds it.
no_reaction_time_list = results_df.query('recording == "sounding" & previous_person == "Int" & person == "Int & Sogg"')
for index in range(len(no_reaction_time_list)):
    no_reaction_time = no_reaction_time_list.iloc[index]
    # NOTE(review): DataFrame.append was deprecated in pandas 1.4 and removed
    # in 2.0 — migrate to pd.concat when upgrading pandas.
    results_df = results_df.append({'recording': "silent",
                                    'person': "Nessuno",
                                    'start': no_reaction_time["start"],
                                    'end': no_reaction_time["start"],
                                    'transcript': "silent",
                                    'duration': 0,
                                    'previous_person': "Int",
                                    'following_person': "Sogg",
                                    'previous_transcript': no_reaction_time['previous_transcript'],
                                    'following_transcript': no_reaction_time['following_transcript']
                                    }, ignore_index=True)
# Reaction time = silent interval between an Int turn and the next Sogg turn.
reaction_time_sogg_df = results_df.query('recording == "silent" & previous_person == "Int" & following_person == "Sogg"').sort_values("start").reset_index(drop=True)
speech_time_sogg_df = df.query('ID == "Sogg"').sort_values("start").reset_index(drop=True)
speech_time_sogg_df["duration"] = speech_time_sogg_df["end"] - speech_time_sogg_df["start"]
return results_df, silences_sogg_df, reaction_time_sogg_df, speech_time_sogg_df
# Aggregate per-subject acoustic statistics (silence, reaction-time and
# speech-time durations) over every recording in each subject's folder.
path_to_subjects_folders = "/work/folder_to_process"
TNW_df = pd.read_csv("TNW_df.csv",index_col="Unnamed: 0" )
# NOTE(review): this drops the second row of the word-count table —
# presumably an invalid/duplicated entry; confirm against TNW_df.csv.
TNW_df.drop(index=TNW_df.index[1], axis=0, inplace=True)
TNW_df = TNW_df.reindex(sorted(TNW_df.columns), axis=1)
TNW = TNW_df.to_dict()
results = dict()
for folder in os.listdir(path_to_subjects_folders):
    silences_sogg_df_list = []
    reaction_time_sogg_df_list =[]
    speech_time_sogg_df_list = []
    for path_to_file in os.listdir(path_to_subjects_folders + "/" + folder):
        path_to_file = path_to_subjects_folders + "/" + folder + "/" + path_to_file
        #print(path_to_file)
        # my_function parses one ELAN file and returns the partition tables.
        _, silences_sogg_df, reaction_time_sogg_df, speech_time_sogg_df = my_function(path_to_file)
        #print(silences_sogg_df["duration"])
        silences_sogg_df_list.append(silences_sogg_df)
        reaction_time_sogg_df_list.append(reaction_time_sogg_df)
        speech_time_sogg_df_list.append(speech_time_sogg_df)
    # Pool all recordings of this subject before computing statistics.
    silences_sogg_df = pd.concat(silences_sogg_df_list, axis=0)
    reaction_time_sogg_df = pd.concat(reaction_time_sogg_df_list, axis=0)
    speech_time_sogg_df = pd.concat(speech_time_sogg_df_list, axis=0)
    #print(reaction_time_sogg_df["duration"])
    results[folder] = dict()
    results[folder]["silence_time_average"] = np.mean(silences_sogg_df["duration"])
    results[folder]["silence_time_std"] = np.std(silences_sogg_df["duration"])
    results[folder]["silence_time_median"] = np.median(silences_sogg_df["duration"])
    results[folder]["reaction_time_average"] = np.mean(reaction_time_sogg_df["duration"])
    results[folder]["reaction_time_std"] = np.std(reaction_time_sogg_df["duration"])
    results[folder]["speech_time_average"] = np.mean(speech_time_sogg_df["duration"])
    results[folder]["speech_time_std"] = np.std(speech_time_sogg_df["duration"])
    results[folder]["speech_time_median"] = np.median(speech_time_sogg_df["duration"])
    results[folder]["total_speech_time_s"] = sum(speech_time_sogg_df["duration"])
# Columns = subjects, rows = acoustic statistics; sorted for stable ordering.
acoustic_df = pd.DataFrame.from_dict(results)
acoustic_df = acoustic_df.reindex(sorted(acoustic_df.columns), axis=1)
# Persist / reload the table so later cells can run without re-extraction.
with open('acoustic_df.pickle', 'wb') as handle:
    pickle.dump(acoustic_df, handle)
with open('acoustic_df.pickle', 'rb') as handle:
    acoustic_df = pickle.load(handle)
acoustic_df.head()
TNW = TNW_df.to_dict()
speech_rate_wpm = {"SG_01" : {"speech_rate_wpm" : 649/4.9168},
"SG_02" : {"speech_rate_wpm" : 443/2.723183333},
"SG_03" : {"speech_rate_wpm" : 986/6.525183333},
"SG_04" : {"speech_rate_wpm" : 949/6.8057},
"SG_05" : {"speech_rate_wpm" : 367/2.8926833333333 },
"SG_06" : {"speech_rate_wpm" : 895/5.160466667},
"SG_07" : {"speech_rate_wpm" : 1477/12.572866666667},
"SG_08" : {"speech_rate_wpm" : 832/4.966816667},
"SG_09" : {"speech_rate_wpm" : 1036/5.8120166666667},
"SG_10" : {"speech_rate_wpm" : 1324/6.8445666666667},
"SG_11" : {"speech_rate_wpm" : 1290/6.91823333},
"SG_12" : {"speech_rate_wpm" : 1003/5.705016667},
"SG_13" : {"speech_rate_wpm" : 3006/19.150216667},
"SG_14" : {"speech_rate_wpm" : 2007/14.0548333},
"SG_15" : {"speech_rate_wpm" : 749/6.2908},
"SG_16" : {"speech_rate_wpm" : 2819/12.478333}
}
speech_rate_wpm
pd.DataFrame(speech_rate_wpm)
speech_rate_wpm = pd.DataFrame(speech_rate_wpm)
# Stack the acoustic statistics and the speech-rate row into one
# feature-by-subject table (both frames share subject columns SG_01..SG_16).
dfs_to_combine = [acoustic_df,speech_rate_wpm]
acoustic_final_df = pd.concat(dfs_to_combine)
# NOTE(review): index=False discards the feature names when writing — the
# CSV keeps only the values; confirm downstream readers expect that.
acoustic_final_df.to_csv("acoustic_final_df.csv", index=False)
Intermediate acoustic results
#saving and removing index
acoustic_df_Y = pd.read_csv("acoustic_df_Y.csv")
# Re-write the file just read with index=False: normalizes the CSV on disk,
# no effect on the in-memory frame.
acoustic_df_Y.to_csv("acoustic_df_Y.csv", index=False)
acoustic_df_Y.head()
# Features: absolute values of every column except the first (target Y).
X = abs(acoustic_df_Y.iloc[:, 1:])
print(X.shape)
y = acoustic_df_Y.Y
print(y.shape)
# splitting X and y
# NOTE(review): train_test_split is imported only in a later cell — this
# cell relies on the notebook having already executed that import.
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42) #seed
lenTr = len(X_train), len(y_train)
print(lenTr)
lenTe = len(X_test), len(y_test)
print(lenTe)
from collections import defaultdict
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, roc_auc_score)

# Fit each classifier on the acoustic training split and collect its
# test-set metrics into one table (rows = classifiers, columns = metrics).
# The classifier instances (LogR, GauNB, ...) are created in a later cell;
# the notebook must have executed that cell first.
classifiers_list = [(LogR, "Logistic"),(GauNB, "Gaussian NB"),(BerNB, "Bernoulli NB"),(DTree, "Decision Tree"),
                    (RanF, "Random Forest"), (AdaB, "Ada Boost"),(KNeigh, "k-Nearest Neighbors")]
scores = defaultdict(list)
# (fix: the original iterated with enumerate() but never used the index)
for clf, name in classifiers_list:
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    scores["Classifier"].append(name)
    for metric in [accuracy_score, precision_score, recall_score, f1_score, roc_auc_score]:
        # "accuracy_score" -> "Accuracy " etc., used as the column label.
        score_name = metric.__name__.replace("_", " ").replace("score", "").capitalize()
        scores[score_name].append(metric(y_test, y_pred))
score_df_acu = pd.DataFrame(scores).set_index("Classifier")
score_df_acu.round(decimals=3)
score_df_acu
# Persist the score table for the paper (CSV + LaTeX).
score_df_acu.to_csv("score_df_acu.csv")
score_df_acu.to_latex("score_df_acu.tex")
Final Classification
# Transpose so subjects become rows; controls (SG_04..SG_12) are labeled
# Y=0 and the experimental group (SG_01-03, SG_13-16) Y=1.
control_final_acu = acoustic_final_df[["SG_04", "SG_05", "SG_06", "SG_07", "SG_08", "SG_09", "SG_10", "SG_11", "SG_12"]].T
# NOTE(review): Y is inserted as the *string* "0"/"1"; pd.read_csv later
# parses it back to int on reload — keep in mind if the CSV round-trip goes.
control_final_acu.insert(0, "Y", ["0" for i in range(0,9)])
experimental_final_acu = acoustic_final_df[["SG_01", "SG_02", "SG_03", "SG_13", "SG_14", "SG_15", "SG_16"]].T
experimental_final_acu.insert(0, "Y", ["1" for i in range(0,7)])
#merging the two datasets
groups_acu = [control_final_acu,experimental_final_acu]
acoustic_df_Y = pd.concat(groups_acu)
acoustic_df_Y.to_csv("acoustic_df_Y.csv", index=False)
# Per-group descriptive statistics of the acoustic features.
descriptive_stats_0 = control_final_acu.describe()
descriptive_stats_0
descriptive_stats_1 = experimental_final_acu.describe()
descriptive_stats_1
Final Dataset
# Stack every feature family (linguistic, annotation, text-complexity,
# acoustic) into one feature-by-subject table.  The first three frames are
# built in earlier notebook cells not shown here.
dfs = [linguistic_measures_df, annotations_df, txtcomplexity_df, acoustic_final_df]
final_df = pd.concat(dfs)
final_df.head()
#describe linguistic_measures_df
control_linguistics = linguistic_measures_df[["SG_04", "SG_05", "SG_06", "SG_07", "SG_08", "SG_09", "SG_10", "SG_11", "SG_12"]].T
control_linguistics.insert(0, "Y", ["0" for i in range(0,9)])
experimental_linguistics = linguistic_measures_df[["SG_01", "SG_02", "SG_03", "SG_13", "SG_14", "SG_15", "SG_16"]].T
experimental_linguistics.insert(0, "Y", ["1" for i in range(0,7)])
ling_describe_0 = control_linguistics.describe()
ling_describe_1 = experimental_linguistics.describe()
ling_describe_0
ling_describe_1
#describe annotations_df
control_annotations = annotations_df[["SG_04", "SG_05", "SG_06", "SG_07", "SG_08", "SG_09", "SG_10", "SG_11", "SG_12"]].T
control_annotations.insert(0, "Y", ["0" for i in range(0,9)])
experimental_annotations = annotations_df[["SG_01", "SG_02", "SG_03", "SG_13", "SG_14", "SG_15", "SG_16"]].T
experimental_annotations.insert(0, "Y", ["1" for i in range(0,7)])
annot_describe_0 = control_annotations.describe()
annot_describe_1 = experimental_annotations.describe()
annot_describe_0
annot_describe_1
#describe txtcomplexity_df
control_txtcomp = txtcomplexity_df[["SG_04", "SG_05", "SG_06", "SG_07", "SG_08", "SG_09", "SG_10", "SG_11", "SG_12"]].T
control_txtcomp.insert(0, "Y", ["0" for i in range(0,9)])
experimental_txtcomp = txtcomplexity_df[["SG_01", "SG_02", "SG_03", "SG_13", "SG_14", "SG_15", "SG_16"]].T
experimental_txtcomp.insert(0, "Y", ["1" for i in range(0,7)])
txt_com_describe_0 = control_txtcomp.describe()
txt_com_describe_1 = experimental_txtcomp.describe()
txt_com_describe_0
txt_com_describe_1
import pickle
# Persist / reload the combined table so later cells are independent of the
# extraction pipeline above.
with open('final_df.pickle', 'wb') as handle:
    pickle.dump(final_df, handle)
with open('final_df.pickle', 'rb') as handle:
    final_df_pickled = pickle.load(handle)
#splitting the dataframe into control and experimental by indexing 0 to control and 1 to impaired
#NB transposing .T at the end
control_final = final_df_pickled[["SG_04", "SG_05", "SG_06", "SG_07", "SG_08", "SG_09", "SG_10", "SG_11", "SG_12"]].T
control_final.insert(0, "Y", ["0" for i in range(0,9)])
experimental_final = final_df_pickled[["SG_01", "SG_02", "SG_03", "SG_13", "SG_14", "SG_15", "SG_16"]].T
experimental_final.insert(0, "Y", ["1" for i in range(0,7)])
#merging the two datasets
groups = [control_final,experimental_final]
final_df_Y = pd.concat(groups)
#saving and removing index SG
final_df_Y.to_csv("final_df_Y.csv", index=False)
Statistical Analysis
import pandas as pd
import numpy as np
import seaborn as sn
import matplotlib.pyplot as plt
# Reload the feature tables saved above (Y parses back to int here).
final_df_Y = pd.read_csv("final_df_Y.csv")
final_df_Y.head()
acoustic_df_Y = pd.read_csv("acoustic_df_Y.csv")
acoustic_df_Y.head()
#descriptive statistics
descriptive_stats_final = final_df_Y.describe()
descriptive_stats_final
descriptive_stats_final.to_csv("descriptive_stats_final.csv")
# NOTE(review): writing legacy .xls requires the xlwt engine, whose support
# was removed in recent pandas — consider .xlsx (openpyxl) when upgrading.
descriptive_stats_final.to_excel("descriptive_stats_final.xls")
PCA
# Relabel the integer target with readable class names for the PCA plot.
final_df_Y['Y'].replace(0, 'healthy',inplace=True)
final_df_Y['Y'].replace(1, 'pathological',inplace=True)
final_df_Y.head()
from sklearn.preprocessing import StandardScaler
import numpy as np
# Separating out the features
X = final_df_Y.iloc[:, 1:].values
# Separating out the target
y = final_df_Y.iloc[:,0].values
# Standardizing the features (zero mean, unit variance) before PCA
x = StandardScaler().fit_transform(X)
X
x.shape
# Rebuild a frame with generic feature names over the standardized values.
feat_cols = ['feature'+str(i) for i in range(x.shape[1])]
normalised_final = pd.DataFrame(x,columns=feat_cols)
normalised_final.head()
from sklearn.decomposition import PCA
# Project the standardized features onto the first two principal components
# and scatter-plot the two diagnostic groups.
pca = PCA(n_components=2)
principalComponents = pca.fit_transform(x)
principalDf = pd.DataFrame(data = principalComponents, columns = ['principal component 1', 'principal component 2'])
principalDf.head()
#the principal component 1 holds 35% of the information while the principal component 2 holds only 16% of the information
print('Explained variation per principal component: {}'.format(pca.explained_variance_ratio_))
plt.figure()
plt.figure(figsize=(10,10))
plt.xticks(fontsize=12)
plt.yticks(fontsize=14)
plt.xlabel('Principal Component - 1',fontsize=20)
plt.ylabel('Principal Component - 2',fontsize=20)
plt.title("PCA of AD and Healthy Individuals Spoken Language",fontsize=20)
targets = ['healthy', 'pathological']
colors = ['b', 'r']
# One scatter call per class so each gets its own color and legend entry.
for target, color in zip(targets,colors):
    indicesToKeep = final_df_Y['Y'] == target
    plt.scatter(principalDf.loc[indicesToKeep, 'principal component 1']
                , principalDf.loc[indicesToKeep, 'principal component 2'], c = color, s = 50)
plt.legend(targets,prop={'size': 15})
from sklearn.model_selection import train_test_split

# Train/test split, standardize (fit on the training set only to avoid
# leaking test statistics), reduce with PCA keeping 95% of the variance,
# then fit and score a logistic regression on the reduced features.
X = abs(final_df_Y.iloc[:, 1:])
y = final_df_Y.Y
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
scaler = StandardScaler()
# Fit on training set only.
scaler.fit(X_train)
# Apply transform to both the training set and the test set.
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
pca_ = PCA(.95)
pca_.fit(X_train)
# BUG FIX: the original transformed with `pca` — the earlier 2-component
# model fitted on different (full, unscaled) data — instead of the
# freshly fitted 95%-variance model `pca_`.
X_train = pca_.transform(X_train)
X_test = pca_.transform(X_test)
from sklearn.linear_model import LogisticRegression
logisticRegr = LogisticRegression(solver = 'lbfgs', max_iter=200)
logisticRegr.fit(X_train, y_train)
logisticRegr.predict(X_test)
print(logisticRegr.score(X_test, y_test))
# Fit and score several classifiers on the PCA-reduced train/test split
# produced above; each prints its test-set accuracy.
from sklearn.tree import DecisionTreeClassifier
DecisionTree_model = DecisionTreeClassifier()
DecisionTree_model.fit(X_train, y_train)
DecisionTree_model.predict(X_test)
print(DecisionTree_model.score(X_test, y_test))
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
RandomForestClassifier_model = RandomForestClassifier()
RandomForestClassifier_model.fit(X_train, y_train)
RandomForestClassifier_model.predict(X_test)
print(RandomForestClassifier_model.score(X_test, y_test))
AdaBoostClassifier_model = AdaBoostClassifier()
AdaBoostClassifier_model.fit(X_train, y_train)
AdaBoostClassifier_model.predict(X_test)
print(AdaBoostClassifier_model.score(X_test, y_test))
from sklearn.naive_bayes import GaussianNB, BernoulliNB
GaussianNB_model = GaussianNB ()
GaussianNB_model.fit(X_train, y_train)
GaussianNB_model.predict(X_test)
print(GaussianNB_model.score(X_test, y_test))
BernoulliNB_model = BernoulliNB ()
BernoulliNB_model.fit(X_train, y_train)
BernoulliNB_model.predict(X_test)
print(BernoulliNB_model.score(X_test, y_test))
Correlation
#RELOAD THE FINAL_DF_Y!
final_df_Y = pd.read_csv("final_df_Y.csv")
#building correlation matrix - whole dataset
corr = final_df_Y.corr()
# Greedily drop the second column of every pair with Pearson r >= 0.7.
# NOTE(review): only *positive* correlations are caught; use abs(corr) if
# strongly negative pairs should be pruned as well — confirm intent.
columns = np.full((corr.shape[0],), True, dtype=bool)
for i in range(corr.shape[0]):
    for j in range(i + 1, corr.shape[0]):
        if corr.iloc[i,j] >= 0.7:
            if columns[j]:
                columns[j] = False
selected_columns = final_df_Y.columns[columns]
final_df_Y_corr_rem = final_df_Y[selected_columns]
#new dataset 16 x 27
final_df_Y_corr_rem.head()
df_selected_pearson = final_df_Y_corr_rem.to_csv("df_selected_pearson.csv", index=False)
import numpy as np
import seaborn as sns
np.triu(np.ones_like(final_df_Y_corr_rem.corr()))
plt.figure(figsize=(18, 8))
# BUG FIX: np.bool (deprecated alias) was removed in NumPy 1.24 and raised
# AttributeError — use the builtin bool instead.
mask = np.triu(np.ones_like(final_df_Y_corr_rem.corr(), dtype=bool))
heatmap = sns.heatmap(final_df_Y_corr_rem.corr(), mask=mask, vmin=-1, vmax=1, annot=False)
heatmap.set_title('Triangle Correlation Heatmap', fontdict={'fontsize':18}, pad=15)
F-test for feature selection
from sklearn.feature_selection import f_classif
from sklearn.feature_selection import SelectKBest, f_classif
#check for missing values = 0
#final_df_Y.isnull().sum()
#extract the regressand, Y, from the data frame
sampleData_Y = final_df_Y.iloc[:,0]
sampleData_Y = sampleData_Y.astype(int)
#extract the regressors, X, from the data frame
sampleData_X = final_df_Y.iloc[:, 1:]
k=30
# keep the 30 best features by ANOVA F-score;
# the higher the k number, the higher the risk of overfitting
#create a handle to the feature selector
selector = SelectKBest(f_classif,k=k)
#fit to our sample data
selector.fit(sampleData_X, sampleData_Y)
# indices of the k (=30) best-scoring features
best_feats = selector.get_support(indices=True)
# select the columns corresponding to the best features
xTrain_best = sampleData_X.iloc[:,best_feats]
# show the best k features computed so far
xTrain_best.head()
# Persist the reduced feature table for the classification cells below.
best_30_final_df_Y = xTrain_best
best_30_final_df_Y.to_csv("best_30_final_df_Y.csv")
best_30_final_df_Y.head()
from sklearn import model_selection
# 10-fold CV accuracy of several classifiers, first on the
# correlation-pruned dataset, then on the whole dataset.
X_sel = abs(final_df_Y_corr_rem.iloc[:, 1:])
y_sel = final_df_Y_corr_rem.Y
#selected DATASET TRAINED WITH Decision Tree
from sklearn.tree import DecisionTreeClassifier
from sklearn import tree
model = tree.DecisionTreeClassifier()
# NOTE(review): this pre-fit is redundant — cross_val_score clones and
# refits the estimator per fold; the fitted state here is never used.
model = model.fit(X_sel, y_sel )
kfold = model_selection.KFold(n_splits=10, random_state=7, shuffle=True)
scoring = 'accuracy'
results = model_selection.cross_val_score(model, X_sel, y_sel, cv=kfold, scoring=scoring)
print("Accuracy: %.3f (%.3f)" % (results.mean(), results.std()))
#selected DATASET TRAINED WITH Adaboost
model = AdaBoostClassifier()
model = model.fit(X_sel, y_sel )
kfold = model_selection.KFold(n_splits=10, random_state=7, shuffle=True)
scoring = 'accuracy'
results = model_selection.cross_val_score(model, X_sel, y_sel, cv=kfold, scoring=scoring)
print("Accuracy: %.3f (%.3f)" % (results.mean(), results.std()))
#selected DATASET TRAINED WITH Random Forest
X_sel = abs(final_df_Y_corr_rem.iloc[:, 1:])
y_sel = final_df_Y_corr_rem.Y
model = RandomForestClassifier()
model = model.fit(X_sel, y_sel )
kfold = model_selection.KFold(n_splits=10, random_state=7, shuffle=True)
scoring = 'accuracy'
results = model_selection.cross_val_score(model, X_sel, y_sel, cv=kfold, scoring=scoring)
print("Accuracy: %.3f (%.3f)" % (results.mean(), results.std()))
#selected DATASET TRAINED WITH Gaussian_NB_model
model = GaussianNB()
model = model.fit(X_sel, y_sel)
kfold = model_selection.KFold(n_splits=10, random_state=7, shuffle=True)
scoring = 'accuracy'
results = model_selection.cross_val_score(model, X_sel, y_sel, cv=kfold, scoring=scoring)
print("Accuracy: %.3f (%.3f)" % (results.mean(), results.std()))
#selected DATASET TRAINED WITH LogisticRegression
model_log = LogisticRegression(max_iter=500)
model_log = model_log.fit(X_sel, y_sel)
kfold = model_selection.KFold(n_splits=10, random_state=7, shuffle=True)
scoring = 'accuracy'
results = model_selection.cross_val_score(model_log, X_sel, y_sel, cv=kfold, scoring=scoring)
print("Accuracy: %.3f (%.3f)" % (results.mean(), results.std()))
#WHOLE DATASET TRAINED WITH Decision Tree
from sklearn.tree import DecisionTreeClassifier
from sklearn import tree
from sklearn import model_selection
X = abs(final_df_Y.iloc[:, 1:])
y = final_df_Y.Y
model = tree.DecisionTreeClassifier()
model = model.fit(X, y)
kfold = model_selection.KFold(n_splits=10, random_state=7, shuffle=True)
scoring = 'accuracy'
results = model_selection.cross_val_score(model, X, y, cv=kfold, scoring=scoring)
print("Accuracy: %.3f (%.3f)" % (results.mean(), results.std()))
#extract the regressand, Y, from the data frame
sampleData_Y = acoustic_df_Y.iloc[:,0]
sampleData_Y = sampleData_Y.astype(int)
#extract the regressors, X, from the data frame
sampleData_X = acoustic_df_Y.iloc[:, 1:]
k=3
# keep the 3 best acoustic features; higher k raises overfitting risk.
# BUG FIX: f_regression was never imported (only f_classif above), so this
# cell raised NameError — import it explicitly.  NOTE(review): for the
# binary 0/1 target, f_classif may be the more appropriate score function;
# confirm the intended test.
from sklearn.feature_selection import f_regression
#create a handle to the feature selector
selector = SelectKBest(f_regression,k=k)
#fit to our sample data
selector.fit(sampleData_X, sampleData_Y)
# indices of the k (=3) best-scoring features
best_feats = selector.get_support(indices=True)
# select the columns corresponding to the best features
xTrain_best = sampleData_X.iloc[:,best_feats]
# show the best k features computed so far
xTrain_best.head()
Kolmogorov-Smirnov test
import pandas as pd
# Two-sample Kolmogorov-Smirnov test on every feature column, comparing
# the control (Y == 0) and impaired (Y == 1) groups of the final dataset.
finalforks = pd.read_csv("final_df_Y.csv")
from scipy.stats import ks_2samp
KS_stats = {}
pvalues = {}
data_no_Y = finalforks.iloc[:,1:]
control_sample = finalforks.loc[finalforks.Y==0]
impaired_sample = finalforks.loc[finalforks.Y==1]
# One KS result per feature, keyed by column name.
KS_stats = {
    feature: ks_2samp(control_sample[feature], impaired_sample[feature])
    for feature in data_no_Y.columns
}
KS_stats
final_df_Y = pd.read_csv("final_df_Y.csv")
final_df_Y.head()
KS_stats = {}
pvalues = {}
data_no_Y = final_df_Y.iloc[:,1:]
control_sample = final_df_Y.loc[final_df_Y.Y==0]
impaired_sample = final_df_Y.loc[final_df_Y.Y==1]
for i in data_no_Y.columns:
KS_stats[i] = ks_2samp(control_sample[i], impaired_sample[i])
for key,results in KS_stats.items():
if results.pvalue < 0.05:
pvalues[key] = results.pvalue
KS_stats_005 = {key:results for key,results in KS_stats.items() if results.pvalue < 0.05}
KS_pvalues_005 = pvalues
KS_stats_df = pd.DataFrame.from_dict(KS_stats_005)
KS_stats_df_transposed = KS_stats_df.T
KS_stats_df_transposed.to_latex("KS_stats_005.tex")
KS_stats_df
Classification
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from sklearn.naive_bayes import BernoulliNB
from sklearn import tree
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import LeaveOneOut
from sklearn.model_selection import GridSearchCV
from sklearn import metrics
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score
final_df_Y
# Features: absolute values of every column except the target Y.
X = abs(final_df_Y.iloc[:, 1:])
# print(X.shape)
# print(X)
y = final_df_Y.Y
# print(y.shape)
# print(y)
# splitting X and y
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42) #seed
lenTr = len(X_train), len(y_train)
print(lenTr)
lenTe = len(X_test), len(y_test)
print(lenTe)
# initializing classifiers (shared by the scoring cells above and below)
LogR = LogisticRegression(max_iter=200)
GauNB = GaussianNB()
BerNB = BernoulliNB()
DTree = DecisionTreeClassifier()
RanF = RandomForestClassifier()
AdaB = AdaBoostClassifier()
KNeigh = KNeighborsClassifier()
from collections import defaultdict
from sklearn.metrics import (precision_score, recall_score, f1_score, brier_score_loss, log_loss, roc_auc_score)

# Fit each classifier on the final-feature training split and collect its
# test-set metrics into one table (rows = classifiers, columns = metrics).
classifiers_list = [
    (LogR, "Logistic"),(GauNB, "Naive Bayes"),(BerNB, "Bernoulli"),(DTree, "Decision Tree"),
    (RanF, "Random Forest"), (AdaB, "Ada Boost"),(KNeigh, "Nearest Neighbors")
]
scores = defaultdict(list)
# (fix: the original iterated with enumerate() but never used the index)
for clf, name in classifiers_list:
    clf.fit(X_train, y_train)
    #y_prob = clf.predict_proba(X_test)
    y_pred = clf.predict(X_test)
    scores["Classifier"].append(name)
    for metric in [accuracy_score, precision_score, recall_score, f1_score, roc_auc_score]:
        # "accuracy_score" -> "Accuracy " etc., used as the column label.
        score_name = metric.__name__.replace("_", " ").replace("score", "").capitalize()
        scores[score_name].append(metric(y_test, y_pred))
score_df = pd.DataFrame(scores).set_index("Classifier")
score_df.round(decimals=3)
score_df
def estimator_tester(list_of_estimators, features, targets):
    """Return (estimator class name, mean leave-one-out CV score) pairs.

    Every estimator in *list_of_estimators* is evaluated on *features* /
    *targets* with leave-one-out cross-validation; the mean of the fold
    scores is reported next to the estimator's class name.
    """
    return [
        (est.__class__.__name__,
         np.mean(cross_val_score(est, features, targets, cv = LeaveOneOut())))
        for est in list_of_estimators
    ]
estimators = [GaussianNB(), LogisticRegression(max_iter=200), KNeighborsClassifier(), DecisionTreeClassifier(), AdaBoostClassifier(), RandomForestClassifier()]
def split_and_train(data):
    """Split *data* and leave-one-out-cross-validate every estimator.

    Parameters:
        data: DataFrame whose first column is the target ``Y`` and whose
            remaining columns are features (absolute values are used).

    Returns:
        List of (estimator class name, mean LOO-CV score) pairs from
        ``estimator_tester``, computed on the training split only.
    """
    X = abs(data.iloc[:, 1:])
    y = data.Y
    # Fixed seed keeps the split reproducible across runs.
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
    # (fix: removed unused lenTr/lenTe locals that were never read)
    return estimator_tester(estimators, X_train, y_train)
split_and_train(final_df_Y)
Single training
# Single hold-out evaluation of a random forest, then 5-fold CV for the
# forest and for AdaBoost.
RandomForest_model = RandomForestClassifier()
RandomForest_model = RandomForest_model.fit(X_train,y_train)
RF_on_TestSet = RandomForest_model.predict(X_test)
accuracy_score(y_test, RF_on_TestSet)
from sklearn.model_selection import cross_val_score
clf = RandomForestClassifier()
scores = cross_val_score(clf, X, y, cv=5) # estimator, features, target, number of folds
print(scores)
print("%0.2f accuracy with a standard deviation of %0.2f" % (scores.mean(), scores.std()))
clf = AdaBoostClassifier()
scores = cross_val_score(clf, X, y, cv=5) # estimator, features, target, number of folds
print(scores)
print("%0.2f accuracy with a standard deviation of %0.2f" % (scores.mean(), scores.std()))
# NOTE(review): AdaBoost_model is not defined anywhere in this notebook —
# these lines raise NameError unless a fitted AdaBoost model is bound to
# that name first (the fitted `AdaB` from the scoring loop may be intended).
roc_auc_score(y, AdaBoost_model.predict_proba(X)[:, 1])
roc_auc_score(y, AdaBoost_model.decision_function(X))
# NOTE(review): metrics.plot_roc_curve was removed in scikit-learn 1.2 —
# use RocCurveDisplay.from_estimator when upgrading.
metrics.plot_roc_curve(AdaBoost_model, X_test, y_test)
plt.show()
AdaBoost_model.get_params()
Fit the RF on Gridsearch
# NOTE(review): RanFor is defined only in a later cell — this cell depends
# on the notebook having executed that cell first.
RanFor.get_params()
# Exhaustive grid over the main random-forest hyperparameters.
param_grid = {"bootstrap" : [True],
              "max_depth": [10,15,20],
              "max_features": ["sqrt"],
              "min_samples_leaf": [1,2,3],
              "min_samples_split": [3,5,7],
              "n_estimators" : [500,1000,1500,2000]
              }
grid = GridSearchCV(RandomForestClassifier(), param_grid, cv=7)
# like a normal estimator, this has not yet been applied to any data.
# calling the fit() method will fit the model at each grid point, keeping track of the scores along the way
grid.fit(X, y)
# now that this is fit, we can ask it for the best parameters:
grid.best_params_
#GRID SEARCH ON RANDOM FOREST
# instance of the clf
RanFor = RandomForestClassifier()
# having a look at the parameters
RanFor.get_params()
# dictionary with the params I want to modify
# (single-value lists: this "grid" evaluates exactly one configuration)
RanFor_hyperp = {"bootstrap" : [False],
                 "max_depth": [60],
                 "max_features": ["sqrt"],
                 "min_samples_leaf": [1],
                 "min_samples_split": [5],
                 "n_estimators" : [600]}
GridSe= GridSearchCV (RanFor, RanFor_hyperp)
GridSe.fit(X_train,y_train)
GridSe.best_params_
GridSe.best_score_
#RANDOM GRID SEARCH ON RF
from sklearn.model_selection import RandomizedSearchCV
from scipy.stats import uniform
from pprint import pprint
# NOTE(review): stray expression — its result is discarded (leftover from a
# notebook experiment).
np.linspace(start = 100, stop = 200, num = 10)
# Number of trees in random forest
n_estimators = [int(x) for x in np.linspace(start = 200, stop = 2000, num = 10)]
# Number of features to consider at every split
# NOTE(review): 'auto' was removed in scikit-learn 1.3 — drop it (or use
# 'sqrt', its old equivalent for classifiers) when upgrading.
max_features = ['auto', 'sqrt']
# Maximum number of levels in tree
max_depth = [int(x) for x in np.linspace(10, 110, num = 11)]
max_depth.append(None)
# Minimum number of samples required to split a node
min_samples_split = [2, 5, 10]
# Minimum number of samples required at each leaf node
min_samples_leaf = [1, 2, 4]
# Method of selecting samples for training each tree
bootstrap = [True, False]
# Create the random grid
random_grid = {'n_estimators': n_estimators,
               'max_features': max_features,
               'max_depth': max_depth,
               'min_samples_split': min_samples_split,
               'min_samples_leaf': min_samples_leaf,
               'bootstrap': bootstrap}
pprint(random_grid)
# instanciating
RanFor = RandomForestClassifier()
# Sample 50 random configurations, 5-fold CV each; fixed seed for
# reproducibility, all cores in parallel.
rf_random = RandomizedSearchCV(estimator = RanFor,
                               param_distributions = random_grid,
                               n_iter = 50,
                               cv = 5,
                               verbose=2,
                               random_state=42,
                               n_jobs = -1)
rf_random.fit(X_train, y_train)
print(rf_random.best_params_)
print(rf_random.best_score_)