from scipy import stats
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
plt.style.use('ggplot')
def test_samples(mu, var, n):
    """Draw n values from N(mu, var), plus a small O(1/n) noise term."""
    values = var**.5 * stats.norm.rvs(size=n) + mu
    noise = var**.5 / n * stats.norm.rvs(size=n)
    return values + noise
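# Quick sanity check (illustrative): with a large n, the empirical mean and
# variance should sit close to mu and var, since the added noise is O(1/n).
s = test_samples(mu=0, var=1, n=100_000)
print(np.mean(s).round(3), np.var(s).round(3))  # ~0.0, ~1.0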
def resample(sample1, sample2):
    """Pool both samples, shuffle, and split back into the original sizes."""
    all_samples = np.concatenate((sample1, sample2), axis=None)
    np.random.shuffle(all_samples)
    rs1 = all_samples[:len(sample1)]
    rs2 = all_samples[len(sample1):]
    return rs1, rs2
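# Sanity check (illustrative): a permutation only relabels observations,
# so the pooled multiset of values is unchanged after resample().
a, b = resample(np.arange(5.0), np.arange(5.0, 8.0))
assert sorted(np.concatenate((a, b))) == list(np.arange(8.0))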
def randomization_test(sample1, sample2, B, alternative):
    """Two-sample randomization test on the difference in means and in stds.

    Returns (p_val_avg, p_val_std) for the chosen alternative:
    'less.than', 'greater.than', or anything else for a two-sided test.
    """
    t_avg = np.average(sample1) - np.average(sample2)
    t_std = np.std(sample1) - np.std(sample2)
    t_star_avg = []
    t_star_std = []
    for _ in range(B):
        rs1, rs2 = resample(sample1, sample2)
        t_star_avg.append(np.average(rs1) - np.average(rs2))
        t_star_std.append(np.std(rs1) - np.std(rs2))
    t_star_avg = np.array(t_star_avg)
    t_star_std = np.array(t_star_std)
    if alternative == 'less.than':
        p_val_avg = np.mean(t_star_avg < t_avg)
        p_val_std = np.mean(t_star_std < t_std)
    elif alternative == 'greater.than':
        p_val_avg = np.mean(t_star_avg > t_avg)
        p_val_std = np.mean(t_star_std > t_std)
    else:  # two-sided ('not.equal'): count resamples more extreme in either tail
        p_val_avg = np.mean(np.abs(t_star_avg) > np.abs(t_avg))
        p_val_std = np.mean(np.abs(t_star_std) > np.abs(t_std))
    return np.round(p_val_avg, 3), np.round(p_val_std, 3)
# Use Case:
sample1 = test_samples(mu=0, var=1, n=1000)
sample2 = test_samples(mu=0.1, var=1.1, n=1000)
randomization_test(sample1, sample2, 100, 'not.equal')
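# Cross-check (assumes scipy >= 1.8, which provides stats.permutation_test):
# the two-sided p-value for the mean difference should roughly agree with
# randomization_test above, up to Monte Carlo noise.
res = stats.permutation_test(
    (sample1, sample2),
    lambda x, y: np.average(x) - np.average(y),
    permutation_type='independent',
    n_resamples=1000,
    alternative='two-sided',
)
print(f"scipy permutation p-value (mean diff): {res.pvalue:.3f}")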
def generate_sample(delta, n, partno):
    """Return two normal samples: part 1 shifts the mean by delta,
    part 2 inflates the variance by delta."""
    rs1 = np.random.normal(0, 1, n)
    if partno == 1:
        rs2 = np.random.normal(delta, 1, n)             # mean shift
    elif partno == 2:
        rs2 = np.random.normal(0, (1 + delta)**.5, n)   # variance shift
    else:
        raise ValueError("partno must be 1 or 2")
    return rs1, rs2
def construct_dataframe(partno: int) -> pd.DataFrame:
    """P-values [p_avg, p_std] for each delta over a grid of resample counts B."""
    B_list = [10 + 30 * i for i in range(34)]   # B = 10, 40, ..., 1000
    delta_list = [0, 0.5, 1, 2]
    rows = []
    for B in B_list:
        row = [B]
        for delta in delta_list:
            rs1, rs2 = generate_sample(delta, 100, partno)
            p_avg, p_std = randomization_test(rs1, rs2, B, 'not.equal')
            row.append([p_avg, p_std])
        rows.append(row)
    df = pd.DataFrame(rows)
    df.set_index(0, inplace=True)
    df.index.name = 'B'
    df.columns = ['δ=0', 'δ=0.5', 'δ=1', 'δ=2']
    return df
avg_df = construct_dataframe(partno=1)
avg_df
std_df = construct_dataframe(partno=2)
std_df
delta_list = [0, 0.5, 1, 2]
fig, ax = plt.subplots(figsize=(12,4))
for delta in delta_list:
col_name = f"δ={delta}"
ax.plot(avg_df[col_name].index, [i[0] for i in avg_df[col_name].values], label=f"avg_delta:{delta}")
plt.legend()
plt.xlabel("Number of permutations")
plt.ylabel("P-val")
plt.title("P-val for mean over different deltas and permutations")
plt.show()
delta_list = [0, 0.5, 1, 2]
fig, ax = plt.subplots(figsize=(12,4))
for delta in delta_list:
col_name = f"δ={delta}"
    ax.plot(std_df[col_name].index, [i[1] for i in std_df[col_name].values], label=f"std_delta:{delta}")  # i[1] is the p-value for the std statistic
plt.legend()
plt.xlabel("Number of permutations")
plt.ylabel("P-val")
plt.title("P-val for std over different deltas and permutations")
plt.show()
alpha = 0.05   # significance level
delta = 1      # effect size
k = 1          # sample size multiplier (n1 = k * n2)
var = 1.       # population variance
beta = 0.2     # Type II error rate
power = 1 - beta
# Testing block: z_beta for 80% power should be about 0.842
stats.norm.ppf(0.8)
def find_zscore(alternative, alpha):
    """Critical z-value for the given alternative; two-sided tests split alpha."""
    if alternative == 'not.equal':
        return stats.norm.ppf(1 - alpha / 2)
    elif alternative in ('less.than', 'greater.than'):
        return stats.norm.ppf(1 - alpha)
    raise ValueError(f"unknown alternative: {alternative}")
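# Quick check: the familiar critical values for alpha = 0.05.
print(round(find_zscore('not.equal', 0.05), 3))     # 1.96
print(round(find_zscore('greater.than', 0.05), 3))  # 1.645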
def power_analysis(alpha, delta, k, var, power, alternative):
    """Minimum sample sizes (n1, n2) to detect a mean difference delta."""
    z_alpha = find_zscore(alternative, alpha)
    z_beta = stats.norm.ppf(power)
    n2 = (1 / k + 1) * (z_alpha + z_beta)**2 * var / delta**2
    n1 = k * n2
    return int(np.ceil(n1)), int(np.ceil(n2))  # round up to guarantee power
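# Cross-check (assumes statsmodels is installed): with var = 1 and k = 1,
# delta is the standardized effect size, so statsmodels' z-test power solver
# should return roughly the same n per group as power_analysis.
from statsmodels.stats.power import zt_ind_solve_power
n_check = zt_ind_solve_power(effect_size=0.10, alpha=0.05, power=0.8,
                             ratio=1.0, alternative='larger')
print(f"statsmodels n per group: {n_check:.0f}")  # ~1237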
def power_analysis_proportions(alpha, delta, k, power, pi_1, pi_2, alternative):
    """Minimum sample sizes (n1, n2) to detect a difference in proportions."""
    z_alpha = find_zscore(alternative, alpha)
    z_beta = stats.norm.ppf(power)  # was ppf(1 - beta) with a stray global beta
    numerator = (z_alpha + z_beta)**2 * (pi_1 * (1 - pi_1) / k + pi_2 * (1 - pi_2))
    denominator = delta**2
    n2 = numerator / denominator
    n1 = k * n2
    return int(np.ceil(n1)), int(np.ceil(n2))
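# Worked example (illustrative numbers): detecting a lift from pi_1 = 0.10
# to pi_2 = 0.20 (delta = 0.10) with a two-sided test at alpha = 0.05 and
# 80% power needs roughly 197 subjects per group.
n1, n2 = power_analysis_proportions(alpha=0.05, delta=0.10, k=1, power=0.8,
                                    pi_1=0.10, pi_2=0.20,
                                    alternative='not.equal')
print(n1, n2)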
power_analysis(alpha=0.05, delta=0.10, k=1, var=1, power=0.8, alternative='greater.than')
alpha = 0.05
k = 1
var = 1
delta = 0.10
alternative = 'less.than'
def find_prob(Z, alternative):
    """Approximate power, where Z = z_alpha - |delta|/se (see calculate_power).

    Because Z is built from the magnitude of delta, all three alternatives
    reduce to the upper-tail probability; for the two-sided case this ignores
    the negligible chance of rejecting in the wrong tail.
    """
    if alternative in ('not.equal', 'less.than', 'greater.than'):
        return 1 - stats.norm.cdf(Z)
    raise ValueError(f"unknown alternative: {alternative}")
def calculate_power(alpha, n_2, k, var, delta, alternative, pi_1=None, pi_2=None):
    """Power of a two-sample z-test given n_2 (with n_1 = k * n_2)."""
    z_alpha = find_zscore(alternative, alpha)
    if pi_1 is None or pi_2 is None:
        # standard error for a difference in means
        se = np.sqrt(var * (1 / n_2 + 1 / (k * n_2)))
    else:
        # standard error for a difference in proportions
        se = (pi_1 * (1 - pi_1) / (k * n_2) + pi_2 * (1 - pi_2) / n_2)**.5
    Z = z_alpha - np.abs(delta) / se
    return find_prob(Z, alternative)
calculate_power(alpha=0.05, n_2=1236, k=1, var=1, delta=0.1, alternative='greater.than')
delta_list = [i/10 for i in range(1, 10)]
powers = []
for delta in delta_list:
power = calculate_power(alpha=0.05,
n_2=30,
k=1,
var=1,
delta=delta,
alternative='not.equal',
pi_1=0.1,
pi_2=0.1+delta)
powers.append(power)
print(f"delta: {delta} power: {power:.3f}")
# Output:
# delta: 0.1 power: 0.194
# delta: 0.2 power: 0.516
# delta: 0.3 power: 0.816
# delta: 0.4 power: 0.964
# delta: 0.5 power: 0.998
# delta: 0.6 power: 1.000
# delta: 0.7 power: 1.000
# delta: 0.8 power: 1.000
# delta: 0.9 power: 1.000
plt.plot(delta_list, powers)
plt.xlabel('delta values')
plt.ylabel('Power')
plt.title('Power vs. effect size (delta)')
plt.show()
# Benjamini-Hochberg procedure
# https://www.statisticshowto.com/benjamini-hochberg-procedure/
p_vals = [0.001, 0.008, 0.039, 0.041, 0.042, 0.06, 0.074, 0.205, 0.356, 0.444]
def benjamini_hochberg_correction(p_vals: list, Q: float):
    """
    Given a list of p-values, uses the Benjamini-Hochberg procedure to
    control the false discovery rate across the experiments.
    Algorithm:
    1) Put the p-values in ascending order
    2) Assign ranks to the p-values (smallest = 1)
    3) Find the critical value for each rank via:
       critical value = (i/m) * Q
       i = p-value rank
       m = total number of tests
       Q = false discovery rate
    4) Identify the largest p-value that is at or below its critical value
    5) Return all p-values up to and including the rank found in (4)
    """
    p_vals = np.sort(np.array(p_vals))
    m = len(p_vals)
    rank = np.arange(1, m + 1)
    bh_cv = rank / m * Q
    below = np.nonzero(p_vals <= bh_cv)[0]
    if len(below) == 0:
        return p_vals[:0]              # no test is significant
    return p_vals[:below[-1] + 1]      # reject every test up to that rank
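# Cross-check (assumes statsmodels is installed): method='fdr_bh' implements
# the same step-up rule, so the number of discoveries at alpha = Q should
# match the length of the array returned above.
from statsmodels.stats.multitest import multipletests
reject, _, _, _ = multipletests(p_vals, alpha=0.1, method='fdr_bh')
print(int(reject.sum()))  # 6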
def bonferroni_correction(p_vals: list, alpha=0.05):
    """
    Returns the Bonferroni-adjusted significance threshold, alpha / m.
    Controls the family-wise error rate, though the adjustment is
    conservative and prone to Type II errors.
    """
    return alpha / len(p_vals)
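# Usage sketch: compare each raw p-value against the Bonferroni threshold.
bonf_alpha = bonferroni_correction(p_vals)
print([p for p in p_vals if p < bonf_alpha])  # [0.001]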
print(benjamini_hochberg_correction(p_vals=p_vals, Q=0.1))
print(bonferroni_correction(p_vals=p_vals))
# Output:
# [0.001 0.008 0.039 0.041 0.042 0.06 ]
# 0.005
# Sample P-Values
pv1 = 0.05 * np.random.sample(size=30)
pv2 = 0.95 * np.random.sample(size=70) + 0.05
p_vals = np.concatenate((pv1, pv2))
def calc_power_BH_adjust(p_vals, n_2, k, var, delta, alternative='not.equal',
                         pi_1=None, pi_2=None, Q=0.25):
    """Power of each test that survives the BH correction, treating each
    significant p-value as that test's per-comparison alpha."""
    alphas = benjamini_hochberg_correction(p_vals, Q)
    power = [calculate_power(a, n_2, k, var, delta, alternative,
                             pi_1=pi_1, pi_2=pi_2)
             for a in alphas]
    return alphas, power
alphas, power = calc_power_BH_adjust(p_vals,
                                     n_2=30,
                                     var=1,
                                     k=1,
                                     delta=0.25)
plt.plot(range(len(alphas)), power)
plt.xlabel('α rank')
plt.ylabel('Power')
plt.title('Power at each BH-significant α')
plt.show()