# Imports used throughout this section (they may already be loaded earlier in the notebook)
import datetime
import json

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import probplot, shapiro
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import het_arch
from arch import arch_model
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error

def ts_plot(residuals, stan_residuals, lags=50):
    # Raw residuals over time
    residuals.plot(title='GARCH Residuals', figsize=(15, 10))
    plt.show()
    # KDE and normal probability plot of the standardized residuals
    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(20, 10))
    ax[0].set_title('GARCH Standardized Residuals KDE')
    ax[1].set_title('GARCH Standardized Residuals Probability Plot')
    stan_residuals.plot(kind='kde', ax=ax[0])
    probplot(stan_residuals, dist='norm', plot=ax[1])
    plt.show()
    # ACF/PACF of the standardized residuals
    acf = plot_acf(stan_residuals, lags=lags)
    pacf = plot_pacf(stan_residuals, lags=lags, method='ywm')
    acf.suptitle('GARCH Model Standardized Residual Autocorrelation', fontsize=20)
    acf.set_figheight(5)
    acf.set_figwidth(15)
    pacf.set_figheight(5)
    pacf.set_figwidth(15)
    plt.show()
Modeling Volatility of BTC Price
def aggregate_data(df, time_interval, coin):
    # Restrict to the requested coin and bucket timestamps by hour/day
    mask = df['ticker'] == coin
    df['hour'] = df['timestamp'].dt.to_period(freq='H')
    df['day'] = df['timestamp'].dt.to_period(freq='D')
    yesterday = datetime.datetime.utcnow() - datetime.timedelta(1)
    days = (yesterday - df[mask]['day'].min().to_timestamp()).days
    if time_interval == 'hour':
        time_index = pd.date_range(df['hour'].min().to_timestamp(), periods=days * 24, freq='H').to_period(freq='H').to_timestamp()
        time_spine = pd.DataFrame(index=time_index)
        grouped = df[mask].groupby('hour').agg(min=('price_usd', 'min'),
                                               mean=('price_usd', 'mean'),
                                               max=('price_usd', 'max'),
                                               volatility=('price_usd', 'std')
                                               ).reset_index()
    elif time_interval == 'day':
        time_index = pd.date_range(df['day'].min().to_timestamp(), periods=days, freq='D').to_period(freq='D').to_timestamp()
        time_spine = pd.DataFrame(index=time_index)
        grouped = df[mask].groupby('day').agg(min=('price_usd', 'min'),
                                              mean=('price_usd', 'mean'),
                                              max=('price_usd', 'max'),
                                              volatility=('price_usd', 'std')
                                              ).reset_index()
    # Log returns in percent
    grouped['log_price'] = np.log(grouped['mean'])
    grouped['p_log_price'] = grouped['log_price'].shift(1)
    grouped['log_returns'] = (grouped['log_price'] - grouped['p_log_price']) * 100
    grouped[time_interval] = grouped[time_interval].apply(lambda x: x.to_timestamp())
    returns = grouped.set_index(time_interval)['log_returns']
    # Join onto a complete time spine so gaps show up as NaN
    return grouped, time_spine.join(pd.DataFrame(returns))
def gridsearch(data, p_rng, q_rng, model_type):
    # Fit every (p, q) combination and track the best model by AIC
    top_score, top_results = float('inf'), None
    top_models = []
    for p in p_rng:
        for q in q_rng:
            try:
                model = arch_model(data, vol=model_type, p=p, q=q, dist='normal')
                model_fit = model.fit(disp='off')
                resid = model_fit.resid
                st_resid = np.divide(resid, model_fit.conditional_volatility)
                results = evaluate_model(resid, st_resid)
                results['AIC'] = model_fit.aic
                results['params']['p'] = p
                results['params']['q'] = q
                if results['AIC'] < top_score:
                    top_score = results['AIC']
                    top_results = results
                elif results['LM_pvalue'][1] is False:
                    # No remaining ARCH effects: keep as a candidate even if AIC is worse
                    top_models.append(results)
            except Exception:
                # Skip (p, q) combinations that fail to converge
                continue
    top_models.append(top_results)
    return top_models
def evaluate_model(residuals, st_residuals, lags=50):
    results = {
        'LM_pvalue': None,
        'F_pvalue': None,
        'SW_pvalue': None,
        'AIC': None,
        'params': {'p': None, 'q': None}
    }
    # Engle's ARCH-LM test for remaining ARCH effects; Shapiro-Wilk for normality
    arch_test = het_arch(residuals, nlags=lags)
    shap_test = shapiro(st_residuals)
    # Store [p-value, significant at 5%?] for each test
    results['LM_pvalue'] = [arch_test[1], arch_test[1] < .05]
    results['F_pvalue'] = [arch_test[3], arch_test[3] < .05]
    results['SW_pvalue'] = [shap_test[1], shap_test[1] < .05]
    return results
def generate_rw(y0, n, sigma, ymin=None, ymax=None):
    # Gaussian random walk, reflected at the optional ymin/ymax bounds
    rw = [y0]
    for t in range(1, n):
        yt = rw[t-1] + np.random.normal(0, sigma)
        if (ymax is not None) and (yt > ymax):
            yt = rw[t-1] - abs(np.random.normal(0, sigma))
        elif (ymin is not None) and (yt < ymin):
            yt = rw[t-1] + abs(np.random.normal(0, sigma))
        rw.append(yt)
    return rw
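As a quick sanity check on the bounds logic, a minimal sketch of the walk (the seed and bound values here are arbitrary illustration choices):
np.random.seed(42)  # arbitrary seed, for illustration only
walk = generate_rw(y0=50, n=100, sigma=2, ymin=0, ymax=100)
pd.Series(walk).plot(title='Bounded Random Walk', figsize=(10, 4))
plt.show()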
df, price = aggregate_data(df_1, 'day', 'BTC')
# Interpolate missing days on the time spine, then drop the leading NaN
interpolated_price = price.interpolate(method='from_derivatives').dropna()['log_returns']
X = interpolated_price.reset_index()['index']
y = interpolated_price.reset_index()['log_returns']
train = int(np.floor(len(price) * 0.95))
test = int(np.floor(len(price) * 0.05))
# Date where the 95/5 train/test split falls
split_date = interpolated_price.index[train]
Testing for ADF, Dickey-Fuller GLS, Phillips-Perron and KPSS
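The test cells are not preserved here; a minimal sketch of the four stationarity tests on the log returns, using the unit-root classes from the arch package (assuming the interpolated_price series defined above):
from arch.unitroot import ADF, DFGLS, PhillipsPerron, KPSS

# ADF/DFGLS/PP test the null of a unit root; KPSS tests the null of stationarity
for test in (ADF, DFGLS, PhillipsPerron, KPSS):
    result = test(interpolated_price)
    print(f'{test.__name__}: stat={result.stat:.3f}, p-value={result.pvalue:.4f}')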
Realized Volatility and Log Returns
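Realized volatility throughout this section is the annualized 30-day rolling standard deviation of the daily log returns; a minimal sketch of the two series side by side:
# Annualized 30-day rolling realized volatility of the daily log returns
realized_vol = interpolated_price.rolling(30).std() * np.sqrt(365)
fig, ax = plt.subplots(figsize=(15, 5))
ax.plot(interpolated_price, label='Log Returns (%)', linewidth=0.5, color='Grey', alpha=0.6)
ax.plot(realized_vol, label='30 Day Realized Volatility', linewidth=1, color='Blue')
ax.legend()
plt.show()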
Comparing Implied Volatility to Realized Volatility
# Genesis Volatility ATM 30-day implied volatility
with open('/work/btc_iv.json', 'r') as f:
    btc_iv = json.load(f)
btc_iv = pd.DataFrame.from_dict(btc_iv['Series']['ATM 30']['Data'])
btc_iv['Timestamp'] = btc_iv['Timestamp'].apply(lambda x: datetime.datetime.utcfromtimestamp(x).date())
btc_iv.set_index('Timestamp', inplace=True)

# t3index BitVol implied volatility index
bitvol_iv = pd.read_csv('bitvol.csv')
bitvol_iv['date'] = bitvol_iv['date'].apply(lambda x: datetime.datetime.strptime(x, '%d/%m/%Y %H:%M').date())
bitvol_iv = bitvol_iv.sort_values(by='date').set_index('date')
IV from Genesis Volatility and t3index
# Join each IV series against annualized 30-day realized volatility
a = btc_iv.join(interpolated_price.rolling(30).std() * np.sqrt(365)).dropna()
b = bitvol_iv.join(interpolated_price.rolling(30).std() * np.sqrt(365)).dropna()
fig1 = plt.figure()
ax1 = fig1.add_subplot(1, 1, 1)
ax1.plot(a['Result'], label='Genesis Implied Volatility', linewidth=1, color='Red')
ax1.plot(b['volatility_index'], label='t3index Implied Volatility', linewidth=1, color='Black')
ax1.plot(a['log_returns'], label='Realized Volatility', linewidth=1, color='Blue')
ax1.title.set_text('BTC IV vs RV')
ax1.set_xlabel('Date')
ax1.set_ylabel('Volatility')
ax1.legend()
plt.grid(True)
plt.show()
a.corr()
b.corr()
Auto-correlation of Volatility
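No cell survives under this heading; a minimal sketch of the likely diagnostic, using squared log returns as the volatility proxy:
# Persistent autocorrelation in squared returns motivates the (G)ARCH family
plot_acf(interpolated_price ** 2, lags=50)
plt.show()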
Gridsearch to find the best P and Q params
For GARCH
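The original call is not preserved; a minimal sketch using the gridsearch helper above (the p/q ranges are assumptions):
# Search GARCH(p, q) orders; the best model by AIC is appended last
garch_candidates = gridsearch(interpolated_price, p_rng=range(1, 6), q_rng=range(1, 6), model_type='garch')
garch_candidates[-1]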
For EGARCH
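And the EGARCH counterpart (ranges again assumed; the EGARCH(9,1) fit below suggests the search extended to at least p=9):
egarch_candidates = gridsearch(interpolated_price, p_rng=range(1, 10), q_rng=range(1, 10), model_type='egarch')
egarch_candidates[-1]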
Model Fit and Diagnostics
# Annualized realized volatility over the held-out (final 5%) window
vol_act = interpolated_price.iloc[train:].std() * np.sqrt(365)
Establishing Baseline
df_summary = pd.DataFrame(columns=['type', 'mean_squared_error', 'mean_absolute_error', 'mean_absolute_percentage_error'])

def log_results(summary_df, ytrue, ypred, exp_type):
    # Append a single-row experiment record to the running summary
    df = pd.DataFrame(data=[exp_type], columns=['type'])
    df['mean_squared_error'] = mean_squared_error([ytrue], [ypred])
    df['mean_absolute_error'] = mean_absolute_error([ytrue], [ypred])
    df['mean_absolute_percentage_error'] = mean_absolute_percentage_error([ytrue], [ypred]) * 100
    merged_df = pd.concat([summary_df, df], axis=0, ignore_index=True)
    return merged_df

def calculate_error_metrics(ytrue, ypred):
    mse = mean_squared_error(ytrue, ypred)
    mae = mean_absolute_error(ytrue, ypred)
    mape = mean_absolute_percentage_error(ytrue, ypred) * 100
    return mse, mae, mape
Past 7 day volatility
tscv = TimeSeriesSplit(n_splits=5)
mse_list = []
mae_list = []
mape_list = []
for x, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    # Forecast: annualized vol of the last 7 training days; actual: first 7 test days
    seven_day_avg = interpolated_price.iloc[train_index[-7:]].std() * np.sqrt(365)
    act_avg = interpolated_price.iloc[test_index[:7]].std() * np.sqrt(365)
    mse, mae, mape = calculate_error_metrics(ypred=[seven_day_avg], ytrue=[act_avg])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
    print(f'Forecast : {seven_day_avg}')
    print(f'Actual : {act_avg}')
results = {
    'type': ['Seven Day Average'],
    'mean_squared_error': [np.mean(mse_list)],
    'mean_absolute_error': [np.mean(mae_list)],
    'mean_absolute_percentage_error': [np.mean(mape_list)]
}
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
Random Walk
tscv = TimeSeriesSplit(n_splits=5)
mse_list = []
mae_list = []
mape_list = []
for x, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    # Simulate a 7-step random walk starting from the last week of training returns
    rw_forecasted = generate_rw(interpolated_price.iloc[train_index[-7]], n=7, sigma=1)
    rw_forecasted_vol = pd.Series(rw_forecasted).std() * np.sqrt(365)
    # Actual: annualized vol of the first 7 test days, mirroring the 7-day baseline
    act_avg = interpolated_price.iloc[test_index[:7]].std() * np.sqrt(365)
    mse, mae, mape = calculate_error_metrics(ypred=[rw_forecasted_vol], ytrue=[act_avg])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
    print(f'Forecast : {rw_forecasted_vol}')
    print(f'Actual : {act_avg}')
results = {
    'type': ['Random Walk'],
    'mean_squared_error': [np.mean(mse_list)],
    'mean_absolute_error': [np.mean(mae_list)],
    'mean_absolute_percentage_error': [np.mean(mape_list)]
}
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
Past day volatility
tscv = TimeSeriesSplit(n_splits=5)
mse_list = []
mae_list = []
mape_list = []
for x, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    # std needs at least two points, so use the last two training days
    one_day_avg = interpolated_price.iloc[train_index[-2:]].std() * np.sqrt(365)
    act_avg = interpolated_price.iloc[test_index[:2]].std() * np.sqrt(365)
    mse, mae, mape = calculate_error_metrics(ypred=[one_day_avg], ytrue=[act_avg])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
    print(f'Forecast : {one_day_avg}')
    print(f'Actual : {act_avg}')
results = {
    'type': ['One Day Average'],
    'mean_squared_error': [np.mean(mse_list)],
    'mean_absolute_error': [np.mean(mae_list)],
    'mean_absolute_percentage_error': [np.mean(mape_list)]
}
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
Genesis Implied Volatility
tscv = TimeSeriesSplit(n_splits=5)
mse_list = []
mae_list = []
mape_list = []
for x, (train_index, test_index) in enumerate(tscv.split(a)):
    # IV is itself a forward-looking forecast, so score it pointwise against RV
    g_iv = a['Result'].iloc[train_index]
    act_rv = a['log_returns'].iloc[train_index]
    mse, mae, mape = calculate_error_metrics(ypred=g_iv, ytrue=act_rv)
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
results = {
    'type': ['Genesis Implied Volatility'],
    'mean_squared_error': [np.mean(mse_list)],
    'mean_absolute_error': [np.mean(mae_list)],
    'mean_absolute_percentage_error': [np.mean(mape_list)]
}
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
t3index Implied Volatility
tscv = TimeSeriesSplit(n_splits=5)
mse_list = []
mae_list = []
mape_list = []
for x, (train_index, test_index) in enumerate(tscv.split(b)):
    t3_iv = b['volatility_index'].iloc[train_index]
    act_rv = b['log_returns'].iloc[train_index]
    mse, mae, mape = calculate_error_metrics(ypred=t3_iv, ytrue=act_rv)
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
results = {
    'type': ['t3index Implied Volatility'],
    'mean_squared_error': [np.mean(mse_list)],
    'mean_absolute_error': [np.mean(mae_list)],
    'mean_absolute_percentage_error': [np.mean(mape_list)]
}
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
GARCH(1,1) Model
garch = arch_model(interpolated_price, vol='garch', p=1, q=1, o=0, rescale=True, dist='normal')
fgarch = garch.fit(update_freq=5)
resid = fgarch.resid
st_resid = np.divide(resid, fgarch.conditional_volatility)
ts_plot(resid, st_resid, lags=40)
fgarch.summary()
# 5-step-ahead variance forecast, annualized
forecasted_vol = fgarch.forecast(horizon=5)
vol_for = np.sqrt(forecasted_vol.variance.iloc[-1].mean()) * np.sqrt(365)
print(
    f"""
    Forecasted volatility: {vol_for}
    Realized volatility: {vol_act}
    """
)
tscv = TimeSeriesSplit(n_splits=5)
mse_list = []
mae_list = []
mape_list = []
for fold, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    print(f'{fold} Fold')
    print('---')
    garch = arch_model(interpolated_price, vol='garch', p=1, q=1, o=0, rescale=True, dist='normal')
    # Fit only on data up to the end of the training fold
    last_train_date = interpolated_price.index[train_index[-1]].strftime('%Y-%m-%d')
    fgarch = garch.fit(update_freq=5, last_obs=last_train_date)
    forecasted_vol = fgarch.forecast(horizon=5)
    # Two-step-ahead conditional volatility, annualized
    forecast = np.sqrt(forecasted_vol.variance['h.2']) * np.sqrt(365)
    forecast = forecast[:len(test_index)]
    actual = interpolated_price.rolling(30).std() * np.sqrt(365)
    fig1 = plt.figure()
    ax1 = fig1.add_subplot(1, 1, 1)
    ax1.plot(forecast, label='Garch(1,1) Forecast', linewidth=1, color='Red')
    ax1.plot(actual.loc[forecast.index], label='Rolling 30 Day Realized Vol', linewidth=1, color='Blue')
    ax1.title.set_text('Actual vs Predicted')
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Volatility')
    ax1.legend()
    plt.grid(True)
    plt.show()
    joined = pd.DataFrame(forecast).join(pd.DataFrame(actual)).dropna()
    mse, mae, mape = calculate_error_metrics(ypred=joined['h.2'], ytrue=joined['log_returns'])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
results = {
    'type': ['Garch (1,1)'],
    'mean_squared_error': [np.mean(mse_list)],
    'mean_absolute_error': [np.mean(mae_list)],
    'mean_absolute_percentage_error': [np.mean(mape_list)]
}
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
ARCH(1) Model
Cross Validation
tscv = TimeSeriesSplit(n_splits=5)
mse_list = []
mae_list = []
mape_list = []
for fold, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    print(f'{fold} Fold')
    print('---')
    garch = arch_model(interpolated_price, vol='arch', p=1, q=0, o=0, rescale=True, dist='normal')
    # Fit only on data up to the end of the training fold
    last_train_date = interpolated_price.index[train_index[-1]].strftime('%Y-%m-%d')
    fgarch = garch.fit(update_freq=5, last_obs=last_train_date)
    forecasted_vol = fgarch.forecast(horizon=5)
    # Two-step-ahead conditional volatility, annualized
    forecast = np.sqrt(forecasted_vol.variance['h.2']) * np.sqrt(365)
    forecast = forecast[:len(test_index)]
    actual = interpolated_price.rolling(30).std() * np.sqrt(365)
    fig1 = plt.figure()
    ax1 = fig1.add_subplot(1, 1, 1)
    ax1.plot(forecast, label='Arch(1) Forecast', linewidth=1, color='Red')
    ax1.plot(actual.loc[forecast.index], label='Rolling 30 Day Realized Vol', linewidth=1, color='Blue')
    ax1.title.set_text('Actual vs Predicted')
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Volatility')
    ax1.legend()
    plt.grid(True)
    plt.show()
    joined = pd.DataFrame(forecast).join(pd.DataFrame(actual)).dropna()
    mse, mae, mape = calculate_error_metrics(ypred=joined['h.2'], ytrue=joined['log_returns'])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
results = {
    'type': ['Arch (1)'],
    'mean_squared_error': [np.mean(mse_list)],
    'mean_absolute_error': [np.mean(mae_list)],
    'mean_absolute_percentage_error': [np.mean(mape_list)]
}
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
EGARCH(9,1)
Cross Validation
tscv = TimeSeriesSplit(n_splits=5)
mse_list = []
mae_list = []
mape_list = []
for fold, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    print(f'{fold} Fold')
    print('---')
    garch = arch_model(
        interpolated_price,
        vol='egarch',
        p=9, q=1, o=0,
        rescale=True,
        dist='normal')
    # Fit only on data up to the end of the training fold
    last_train_date = interpolated_price.index[train_index[-1]].strftime('%Y-%m-%d')
    fgarch = garch.fit(update_freq=5, last_obs=last_train_date)
    forecasted_vol = fgarch.forecast(horizon=1)
    # One-step-ahead conditional volatility, annualized
    forecast = np.sqrt(forecasted_vol.variance['h.1']) * np.sqrt(365)
    forecast = forecast[:len(test_index)]
    actual = interpolated_price.rolling(30).std() * np.sqrt(365)
    fig1 = plt.figure()
    ax1 = fig1.add_subplot(1, 1, 1)
    ax1.plot(forecast, label='Egarch(9,1) Forecast', linewidth=1, color='Red')
    ax1.plot(actual.loc[forecast.index], label='Rolling 30 Day Realized Vol', linewidth=1, color='Blue')
    ax1.title.set_text('Actual vs Predicted')
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Volatility')
    ax1.legend()
    plt.grid(True)
    plt.show()
    joined = pd.DataFrame(forecast).join(pd.DataFrame(actual)).dropna()
    mse, mae, mape = calculate_error_metrics(ypred=joined['h.1'], ytrue=joined['log_returns'])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
results = {
    'type': ['Egarch (9,1)'],
    'mean_squared_error': [np.mean(mse_list)],
    'mean_absolute_error': [np.mean(mae_list)],
    'mean_absolute_percentage_error': [np.mean(mape_list)]
}
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
Conclusion
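The closing cell is not preserved; a minimal sketch for ranking the logged experiments:
# Compare all baselines and GARCH-family models on the averaged CV metrics
df_summary.sort_values('mean_absolute_percentage_error')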
Additional: Simulation of Volatility Forecasts
forecasted_vol = fgarch.forecast(horizon=5, method='simulation', reindex=False)
vol_for = np.sqrt(forecasted_vol.variance.iloc[-1].mean()) * np.sqrt(365)
x = np.arange(1, 6)
# Every 5th simulated path, annualized
lines = plt.plot(x, np.sqrt(forecasted_vol.simulations.residual_variances[-1, ::5].T) * np.sqrt(365), color="#9cb2d6", alpha=0.5, linewidth=0.8)
lines[0].set_label("Simulated path")
line = plt.plot(x, np.sqrt(forecasted_vol.variance.iloc[-1].values) * np.sqrt(365), color="#002868")
line[0].set_label("Expected volatility")
plt.gca().set_xticks(x)
plt.gca().set_xlim(1, 5)
legend = plt.legend()
plt.show()