from _base import *  # project helpers: CDataset, CMetricAccuracy, the fairness metrics and the attack/training functions used below are expected to come from _base
from copy import deepcopy
import numpy as np
import pandas as pd
from imblearn.under_sampling import RandomUnderSampler
from sklearn.decomposition import PCA as sklearnPCA
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
class Data(object):
drugs = ['Alcohol',
'Amyl',
'Amphet',
'Benzos',
'Caff',
'Cannabis',
'Coke',
'Crack',
'Ecstasy',
'Heroin',
'Ketamine',
'Legalh',
'LSD',
'Meth',
'Mushrooms',
'Nicotine',
'VSA']
def __init__(self,
path,
target='Cannabis',
features=['Country', 'Gender', 'Nscore', 'Escore', 'Oscore', 'Cscore', 'Impulsive', 'SS'],
sensitive_feature='Gender'):
assert isinstance(path, str)
assert isinstance(target, str)
assert isinstance(features, list)
assert isinstance(sensitive_feature, str)
self.path = path
self.raw_data = pd.read_csv(path)
assert target in self.raw_data.columns
assert sensitive_feature in self.raw_data.columns
self.features = features
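# Clean the raw table: drop the ID column, remove respondents who report using the fictitious
# drug Semeron ('Semer' != 'CL0'), drop the 'Choc'/'Semer' columns, and binarise the CL0-CL6
# usage classes (CL0/CL1 -> 0 = non-user, CL2-CL6 -> 1 = user).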
self.data = (self.raw_data
.drop('ID', axis=1)
.drop(self.raw_data[self.raw_data['Semer'] != 'CL0'].index)
.drop(['Choc','Semer'], axis=1)
.reset_index(drop=True)
.replace({'CL0': 0, 'CL1': 0, 'CL2': 1, 'CL3': 1, 'CL4': 1, 'CL5':1, 'CL6': 1}))
self.data.index.name = None
self.target = target
self.X = self.data[self.features]
self.y = self.data[self.target]
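# One-hot encode the quantified Gender values into 'XX'/'XY' indicator columns.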
self.X = self.X.drop('Gender', axis=1).join(pd.get_dummies(self.X.Gender.replace({0.48246: 'XY', -0.48246: 'XX'})))
self.sensitive_feature = sensitive_feature
def undersample_(self):  # in-place: balance the two target classes by random undersampling
self.X, self.y = RandomUnderSampler().fit_resample(self.X, self.y)
self.undersampled = True
def pca_transform_(self, n_components=2):
assert isinstance(n_components, int)
pca = sklearnPCA(n_components=n_components)
pca_transformed = pca.fit_transform(self.X)
self.X_t = pd.DataFrame(pca_transformed, columns=[f'PCA{i}' for i in range(pca_transformed.shape[1])])
self.pca = True
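# Note: the split below uses the PCA-transformed features, so pca_transform_() must be called first.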
def train_test_split(self, test_size):
assert isinstance(test_size, float)
assert test_size < 1.0
return train_test_split(self.X_t, self.y, test_size=test_size)
def __repr__(self):
return f"Data from {self.path} | Size: {len(self)} | Target: {self.target} (1:0 {drug_data.y.value_counts()[1]}:{drug_data.y.value_counts()[0]}) |" + \
f" Sensitive: {self.sensitive_feature} (XX:XY {self.X['XX'].sum()}:{self.X['XY'].sum()})"
def __len__(self):
return len(self.X)
def generate_synthetic_data(plot_data=False, distrib_distance=np.array([5,5]), ax=None, title="", target='Cannabis'): ## backward compatibility
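# Builds the drug-consumption dataset, undersamples it, projects it to 2D with PCA, fits an
# RBF SVM and a logistic regression, filters the points (see the comment before the filter
# below), and returns the 2D features, the binary labels, and a dict with the sensitive attribute.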
drug_data = Data(path='drug_consumption.csv', target=target, sensitive_feature='Gender')
drug_data.undersample_()
drug_data.pca_transform_()
source = drug_data
X_t = source.X_t
X = source.X
y = source.y
svm = SVC(gamma=0.1, kernel='rbf', C=3.5)
svm.fit(X_t, y)
lr = LogisticRegression()
lr.fit(X_t, y)
y_pred = pd.Series(svm.predict(X_t), index=y.index)
results = pd.concat([X, X_t, y, y_pred, pd.DataFrame(lr.predict_proba(X_t), columns=['prob_0', 'prob_1'])], axis=1)
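# Keep points the SVM classifies correctly, plus true positives that the logistic model does not
# confidently assign to class 0 (prob_0 < 0.65); column 0 of `results` holds the SVM predictions.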
filtered = results[((results.prob_0 < 0.65) & (results[source.target] == 1)) | ((results[source.target] == results[0]))]
X = filtered[['PCA0', 'PCA1']].values
y = filtered[source.target].values
x_control = {"s1": filtered.XX.values}
return X, y, x_control
n = 1
dimp_in_data = []
euc_distances = []
dimp_scenarios = []
X, y, X_control = generate_synthetic_data(target='Coke')
formatted_X = np.array([X[:,0], X[:,1], X_control['s1']]).T  ## Concatenate the 2D features with the sensitive attribute as a third column
sec_ml_dataset_all = CDataset(X, y)
sensible_att_all = X_control['s1']
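# Disparate impact of the ground-truth labels on the full dataset, as a pre-attack baseline.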
dimp_in_data.append(calculate_disparate_impact(sec_ml_dataset_all.Y.get_data(), sensible_att_all))
## Splitting data. random_state is assumed to come from _base (wildcard import); define it here, e.g. random_state = 0, if it is not.
X_train_val, X_test, y_train_val, y_test = train_test_split(formatted_X, y, test_size=0.2, random_state=random_state)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.5, random_state=random_state)
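# Resulting proportions: 40% training, 40% validation, 20% test.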
training = CDataset(X_train[:,:2], y_train)
training_sensible_att = X_train[:,2]
validation = CDataset(X_val[:,:2], y_val)
validation_sensible_att = X_val[:,2]
val_lambda = np.zeros(validation.num_samples)
## Creating lambda vector
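## +1 for every unprivileged-group sample (sensible att == 0), -1 for every privileged-group
## sample, regardless of the label.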
val_lambda[np.where((validation_sensible_att==0) & (y_val==0))[0]] = 1  ## Unprivileged denied
val_lambda[np.where((validation_sensible_att==0) & (y_val==1))[0]] = 1  ## Unprivileged granted
val_lambda[np.where((validation_sensible_att==1) & (y_val==0))[0]] = -1  ## Privileged denied
val_lambda[np.where((validation_sensible_att==1) & (y_val==1))[0]] = -1  ## Privileged granted
test = CDataset(X_test[:,:2], y_test)
test_sensible_att = X_test[:,2]
scenario = {
"name": "Use case 4 - {}".format(n),
"description": "Disparate impact attack. \n Euclidean distance between group averages: {}\n".format(n),
"training": training,
"training_sensible_att" : training_sensible_att,
"validation" : validation,
"validation_sensible_att" : validation_sensible_att,
"lambda_validation" : val_lambda,
"test": test,
"test_sensible_att" : test_sensible_att,
"all_data" : sec_ml_dataset_all,
"all_sensible_att" : sensible_att_all,
}
dimp_scenarios.append(scenario)
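## For each scenario: train a clean logistic-regression baseline, run the white-box
## fairness-poisoning attack, retrain on the poisoned set, and record accuracy, disparate
## impact, average odds difference, and per-group error rates before and after the attack.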
for scenario in dimp_scenarios:
################################
### ORIGINAL CLF PERFORMANCE ###
################################
original_model, original_acc = train_LogReg(scenario["training"], scenario["test"])
orig_y_pred = original_model.predict(scenario["test"].X)
orig_FNR, orig_FPR = get_error_rates(scenario["test"].Y.get_data(), orig_y_pred.get_data(), scenario["test_sensible_att"], 1, 1)
print("Orig")
orig_disparate_imp = calculate_disparate_impact(orig_y_pred.get_data(), scenario["test_sensible_att"])
orig_odds_diff = get_average_odds_difference(scenario["test"].Y.get_data(), orig_y_pred.get_data(), scenario["test_sensible_att"])
scenario['original_classifier'] = original_model
scenario['original_acc'] = original_acc
scenario['orig_d_imp'] = orig_disparate_imp
scenario['orig_odds'] = orig_odds_diff
scenario['orig_FNR'] = orig_FNR
scenario['orig_FPR'] = orig_FPR
########################
### WHITE BOX ATTACK ###
########################
white_pois_clf = deepcopy(original_model)
privileged_condition_valid = np.ones(scenario['validation'].num_samples)
privileged_condition_valid[scenario["validation_sensible_att"] == 0] = -1
white_pois_points, white_pois_tr = execute_adversarial_attack(white_pois_clf, scenario["training"], scenario["validation"], scenario["test"], scenario["test_sensible_att"], scenario["validation_sensible_att"])
## Retraining with poisoned points
white_pois_clf = white_pois_clf.fit(white_pois_tr.X, white_pois_tr.Y)
white_pois_y_pred = white_pois_clf.predict(scenario["test"].X)
metric = CMetricAccuracy()
white_pois_acc = metric.performance_score(scenario["test"].Y, y_pred=white_pois_y_pred)
print("->> white")
white_pois_disparate_imp = calculate_disparate_impact(white_pois_y_pred.get_data(), scenario["test_sensible_att"])
white_odds_diff = get_average_odds_difference(scenario["test"].Y.get_data(), white_pois_y_pred.get_data(), scenario["test_sensible_att"])
white_pois_FNR, white_pois_FPR = get_error_rates(scenario["test"].Y.get_data(), white_pois_y_pred.get_data(), scenario["test_sensible_att"], 1, 1)
print(f"Accuracy after poisoning: {round(white_pois_acc*100, 2)}%")
scenario['white_poisoned_classifier'] = white_pois_clf
scenario['white_poisoned_points'] = white_pois_points
scenario['white_pois_d_imp'] = white_pois_disparate_imp
scenario['white_odds'] = white_odds_diff
scenario['white_pois_y_pred'] = white_pois_y_pred
scenario['white_pois_acc'] = white_pois_acc
scenario['white_pois_FNR'] = white_pois_FNR
scenario['white_pois_FPR'] = white_pois_FPR
%matplotlib inline
plot_2d_decision_boundary_for_test(dimp_scenarios[0])
################################
### Model Comparison ###
################################
RANGE = np.arange(0.05, 0.31, 0.04)
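## Five repetitions over poisoning fractions 0.05-0.29 (step 0.04); each run compares the clean
## LR model, the white-box attack on LR, a black-box attack on a random forest mounted through an
## LR surrogate, and a classic (non-fairness) poisoning attack at the same poisoning rate.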
_results = []
for i in range(5):
scenarios2 = []
for perc_pois in RANGE:
scenario = deepcopy(dimp_scenarios[0])
#print(scenario["training"])
print("\n\n ==== {}-{} ====".format(scenario['name'], perc_pois))
print(" - {}\n".format(scenario['description']))
###################################
### ORIGINAL LogReg PERFORMANCE ###
###################################
original_model, original_acc = train_LogReg(scenario["training"], scenario["test"])
orig_y_pred = original_model.predict(scenario["test"].X)
orig_FNR, orig_FPR = get_error_rates(scenario["test"].Y.get_data(), orig_y_pred.get_data(), scenario["test_sensible_att"], 1, 1)
orig_disparate_imp = calculate_disparate_impact(orig_y_pred.get_data(), scenario["test_sensible_att"])
orig_odds_diff = get_average_odds_difference(scenario["test"].Y.get_data(), orig_y_pred.get_data(), scenario["test_sensible_att"])
scenario['original_classifier'] = original_model
scenario['original_acc'] = original_acc
scenario['orig_d_imp'] = orig_disparate_imp
scenario['orig_FNR'] = orig_FNR
scenario['orig_FPR'] = orig_FPR
scenario['orig_odds'] = orig_odds_diff
########################
### LR - Poison ###
########################
white_pois_clf = deepcopy(original_model)
privileged_condition_valid = np.ones(scenario['validation'].num_samples)
privileged_condition_valid[scenario["validation_sensible_att"] == 0] = -1
white_pois_points, white_pois_tr = execute_adversarial_attack(white_pois_clf,
scenario["training"],
scenario["validation"],
scenario["test"],
scenario["test_sensible_att"],
scenario["validation_sensible_att"],
perc_pois)
## Retraining with poisoned points
white_pois_clf = white_pois_clf.fit(white_pois_tr.X, white_pois_tr.Y)
white_pois_y_pred = white_pois_clf.predict(scenario["test"].X)
metric = CMetricAccuracy()
white_pois_acc = metric.performance_score(scenario["test"].Y, y_pred=white_pois_y_pred)
white_pois_disparate_imp = calculate_disparate_impact(white_pois_y_pred.get_data(), scenario["test_sensible_att"])
white_pois_FNR, white_pois_FPR = get_error_rates(scenario["test"].Y.get_data(), white_pois_y_pred.get_data(), scenario["test_sensible_att"], 1, 1)
white_odds_diff = get_average_odds_difference(scenario["test"].Y.get_data(), white_pois_y_pred.get_data(), scenario["test_sensible_att"])
scenario['white_poisoned_classifier'] = white_pois_clf
scenario['white_poisoned_points'] = white_pois_points
scenario['white_pois_d_imp'] = white_pois_disparate_imp
scenario['white_pois_y_pred'] = white_pois_y_pred
scenario['white_pois_acc'] = white_pois_acc
scenario['white_pois_FNR'] = white_pois_FNR
scenario['white_pois_FPR'] = white_pois_FPR
scenario['white_odds'] = white_odds_diff
########################
### RF ###
########################
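## Black-box setting: the poisoning points are computed against an LR surrogate of the original
## model, then the random forest is retrained on the resulting poisoned training set.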
real_model, real_acc = train_RF(scenario["training"], scenario["test"])
surrogate_clf = deepcopy(original_model)
black_pois_points, black_pois_tr = execute_adversarial_attack(surrogate_clf,
scenario["training"],
scenario["validation"],
scenario["test"],
scenario["test_sensible_att"],
scenario["validation_sensible_att"],
perc_pois)
## Retraining with poisoned points
black_pois_clf = deepcopy(real_model)
black_pois_clf = black_pois_clf.fit(black_pois_tr.X,black_pois_tr.Y)
black_pois_y_pred = black_pois_clf.predict(scenario["test"].X)
metric = CMetricAccuracy()
black_pois_acc = metric.performance_score(y_true=scenario["test"].Y, y_pred=black_pois_y_pred)
black_pois_disparate_imp = calculate_disparate_impact(black_pois_y_pred.get_data(), scenario["test_sensible_att"])
black_pois_FNR, black_pois_FPR = get_error_rates(scenario["test"].Y.get_data(), black_pois_y_pred.get_data(), scenario["test_sensible_att"], 1, 1)
black_odds_diff = get_average_odds_difference(scenario["test"].Y.get_data(), black_pois_y_pred.get_data(), scenario["test_sensible_att"])
scenario['black_poisoned_classifier'] = black_pois_clf
scenario['black_poisoned_points'] = black_pois_points
scenario['black_pois_d_imp'] = black_pois_disparate_imp
scenario['black_pois_y_pred'] = black_pois_y_pred
scenario['black_pois_acc'] = black_pois_acc
scenario['black_pois_FNR'] = black_pois_FNR
scenario['black_pois_FPR'] = black_pois_FPR
scenario['black_odds'] = black_odds_diff
################################
### CLASSIC POISONING ATTACK ###
################################
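## Baseline poisoning attack with no fairness objective, via execute_normal_poisoning_attack.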
normal_pois_clf = deepcopy(original_model)
privileged_condition_valid = np.ones(scenario['validation'].num_samples)
privileged_condition_valid[scenario["validation_sensible_att"] == 0] = -1
normal_pois_points, normal_pois_tr = execute_normal_poisoning_attack(normal_pois_clf,
scenario["training"],
scenario["validation"],
scenario["test"],
scenario["test_sensible_att"],
scenario["validation_sensible_att"],
perc_pois)
## Retraining with poisoned points
normal_pois_clf = normal_pois_clf.fit(normal_pois_tr.X,normal_pois_tr.Y)
normal_pois_y_pred = normal_pois_clf.predict(scenario["test"].X)
metric = CMetricAccuracy()
normal_pois_acc = metric.performance_score(scenario["test"].Y, y_pred=normal_pois_y_pred)
print("->> normal")
normal_pois_disparate_imp = calculate_disparate_impact(normal_pois_y_pred.get_data(), scenario["test_sensible_att"])
normal_odds_diff = get_average_odds_difference(scenario["test"].Y.get_data(), normal_pois_y_pred.get_data(), scenario["test_sensible_att"])
normal_pois_FNR, normal_pois_FPR = get_error_rates(scenario["test"].Y.get_data(), normal_pois_y_pred.get_data(), scenario["test_sensible_att"], 1, 1)
scenario['normal_poisoned_classifier'] = normal_pois_clf
scenario['normal_poisoned_points'] = normal_pois_points
scenario['normal_pois_d_imp'] = normal_pois_disparate_imp
scenario['normal_odds'] = normal_odds_diff
scenario['normal_pois_y_pred'] = normal_pois_y_pred
scenario['normal_pois_acc'] = normal_pois_acc
scenario['normal_pois_FNR'] = normal_pois_FNR
scenario['normal_pois_FPR'] = normal_pois_FPR
scenarios2.append(scenario)
_results.append(scenarios2)
## Converting to slides
!jupyter nbconvert "Poison Apple.ipynb" --to slides --reveal-prefix "http://cdn.jsdelivr.net/reveal.js/2.5.0"