import datetime
import json

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import probplot, shapiro
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import het_arch
from arch import arch_model
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import (mean_squared_error, mean_absolute_error,
                             mean_absolute_percentage_error)

def ts_plot(residuals, stan_residuals, lags=50):
    # Plot raw residuals, then a KDE and normal probability plot of the
    # standardized residuals, then their ACF and PACF.
    residuals.plot(title='GARCH Residuals', figsize=(15, 10))
    plt.show()
    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(20, 10))
    ax[0].set_title('GARCH Standardized Residuals KDE')
    ax[1].set_title('GARCH Standardized Residuals Probability Plot')
    residuals.plot(kind='kde', ax=ax[0])
    probplot(stan_residuals, dist='norm', plot=ax[1])
    plt.show()
    acf = plot_acf(stan_residuals, lags=lags)
    pacf = plot_pacf(stan_residuals, lags=lags, method='ywm')
    acf.suptitle('GARCH Model Standardized Residual Autocorrelation', fontsize=20)
    acf.set_figheight(5)
    acf.set_figwidth(15)
    pacf.set_figheight(5)
    pacf.set_figwidth(15)
    plt.show()
Modeling Volatility of BTC Price
def aggregate_data(df, time_interval, coin):
    # Restrict to the requested coin and bucket timestamps by hour and day.
    mask = df['ticker'] == coin
    df['hour'] = df['timestamp'].apply(lambda x: x.to_period(freq='H'))
    df['day'] = df['timestamp'].apply(lambda x: x.to_period(freq='D'))
    yesterday = datetime.datetime.utcnow() - datetime.timedelta(1)
    days = (yesterday - df[mask]['day'].min().to_timestamp()).days
    if time_interval == 'hour':
        time_index = pd.date_range(df['hour'].min().to_timestamp(), periods=days*24, freq="H").to_period(freq='H').to_timestamp()
        time_spine = pd.DataFrame(index=time_index)
        grouped = df[mask].groupby('hour').agg(min=('price_usd', 'min'),
                                mean=('price_usd', 'mean'),
                                max=('price_usd', 'max'),
                                volatility=('price_usd', 'std')
                                ).reset_index()
    elif time_interval == 'day':
        time_index = pd.date_range(df['day'].min().to_timestamp(), periods=days, freq="D").to_period(freq='D').to_timestamp()
        time_spine = pd.DataFrame(index=time_index)
        grouped = df[mask].groupby('day').agg(min=('price_usd', 'min'),
                                mean=('price_usd', 'mean'),
                                max=('price_usd', 'max'),
                                volatility=('price_usd', 'std')
                                ).reset_index()
    # Log returns in percent: 100 * (log p_t - log p_{t-1}).
    grouped['log_price'] = np.log(grouped['mean'])
    grouped['p_log_price'] = grouped['log_price'].shift(1)
    grouped['log_returns'] = (grouped['log_price'] - grouped['p_log_price']) * 100
    grouped[time_interval] = grouped[time_interval].apply(lambda x: x.to_timestamp())
    returns = grouped.set_index(time_interval)['log_returns']
    # Join onto a complete time spine so missing periods surface as NaN.
    return grouped, time_spine.join(pd.DataFrame(returns))
def gridsearch(data, p_rng, q_rng, model_type):
    # Track the lowest-AIC fit, plus any fit whose residuals show no
    # remaining ARCH effects under the LM test.
    top_score, top_results = float('inf'), None
    top_models = []
    for p in p_rng:
        for q in q_rng:
            try:
                model = arch_model(data, vol=model_type, p=p, q=q, dist='normal')
                model_fit = model.fit(disp='off')
                resid = model_fit.resid
                st_resid = np.divide(resid, model_fit.conditional_volatility)
                results = evaluate_model(resid, st_resid)
                results['AIC'] = model_fit.aic
                results['params']['p'] = p
                results['params']['q'] = q
                if results['AIC'] < top_score:
                    top_score = results['AIC']
                    top_results = results
                elif results['LM_pvalue'][1] is False:
                    top_models.append(results)
            except Exception:
                # Some (p, q) combinations fail to converge; skip them.
                continue
    top_models.append(top_results)
    return top_models
                
def evaluate_model(residuals, st_residuals, lags=50):
    results = {
        'LM_pvalue': None,
        'F_pvalue': None,
        'SW_pvalue': None,
        'AIC': None,
        'params': {'p': None, 'q': None}
    }
    # Engle's ARCH-LM test for remaining ARCH effects in the residuals and
    # Shapiro-Wilk for normality of the standardized residuals; each entry
    # stores [p-value, significant at the 5% level].
    arch_test = het_arch(residuals, nlags=lags)
    shap_test = shapiro(st_residuals)
    results['LM_pvalue'] = [arch_test[1], arch_test[1] < .05]
    results['F_pvalue'] = [arch_test[3], arch_test[3] < .05]
    results['SW_pvalue'] = [shap_test[1], shap_test[1] < .05]
    return results
def generate_rw(y0, n, sigma, ymin=None, ymax=None):
    # Gaussian random walk of length n starting at y0, reflected back inside
    # the optional [ymin, ymax] bounds.
    rw = [y0]
    for t in range(1, n):
        yt = rw[t-1] + np.random.normal(0,sigma)
        if (ymax is not None) and (yt > ymax):
            yt = rw[t-1] - abs(np.random.normal(0,sigma))
        elif (ymin is not None) and (yt < ymin):
            yt = rw[t-1] + abs(np.random.normal(0,sigma))
        rw.append(yt)
    return rw
df, price = aggregate_data(df_1, 'day', 'BTC')
interpolated_price = price.interpolate(method='from_derivatives').dropna()['log_returns']
X = interpolated_price.reset_index()['index']
y = interpolated_price.reset_index()['log_returns']
# 95/5 train/test split by position; the held-out tail starts at index `train`.
train = int(np.floor(len(price)*0.95))
test = int(np.floor(len(price)*0.05))
split_date = interpolated_price[train:]
Testing for ADF, Dickey-Fuller GLS, Phillips-Perron and KPSS
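The test cells themselves are not preserved here; below is a minimal sketch of how the four tests could be run, assuming the arch.unitroot classes and the interpolated_price series defined above.
from arch.unitroot import ADF, DFGLS, PhillipsPerron, KPSS

# ADF, DFGLS and Phillips-Perron test H0: unit root; KPSS reverses this and
# tests H0: stationary.
for test in (ADF, DFGLS, PhillipsPerron, KPSS):
    result = test(interpolated_price.dropna())
    print(f'{test.__name__}: stat={result.stat:.3f}, p-value={result.pvalue:.4f}')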
Realized Volatility and Log Returns
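No cell survives under this heading; the sketch below reconstructs the two series it presumably plotted, using the sqrt(365) annualization convention applied throughout the rest of this notebook.
# Daily log returns (percent) and 30-day rolling realized volatility, annualized.
realized_vol = interpolated_price.rolling(30).std() * np.sqrt(365)
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15, 5))
interpolated_price.plot(ax=ax[0], title='BTC Daily Log Returns (%)')
realized_vol.plot(ax=ax[1], title='30 Day Rolling Realized Volatility (Annualized)')
plt.show()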
Comparing Implied Volatility to Realized Volatility
with open('/work/btc_iv.json', 'r') as f:
    btc_iv = json.load(f)
# At-the-money 30-day implied volatility series from Genesis Volatility.
btc_iv = pd.DataFrame.from_dict(btc_iv['Series']['ATM 30']['Data'])
btc_iv['Timestamp'] = btc_iv['Timestamp'].apply(lambda x: datetime.datetime.utcfromtimestamp(x).date())
btc_iv.set_index('Timestamp', inplace=True)
bitvol_iv = pd.read_csv('bitvol.csv')
bitvol_iv['date'] = bitvol_iv['date'].apply(lambda x: datetime.datetime.strptime(x, '%d/%m/%Y %H:%M').date())
bitvol_iv = bitvol_iv.sort_values(by='date').set_index('date')
IV from Genesis Volatility and t3index
a = btc_iv.join(interpolated_price.rolling(30).std()*np.sqrt(365)).dropna()
b = bitvol_iv.join(interpolated_price.rolling(30).std()*np.sqrt(365)).dropna()
fig1 = plt.figure()
ax1 = fig1.add_subplot(1, 1, 1)
ax1.plot(a['Result'], label='Genesis Implied Volatility', linewidth=1, color='Red')
ax1.plot(b['volatility_index'], label='t3index Implied Volatility', linewidth=1, color='Black')
ax1.plot(a['log_returns'], label='Realized Volatility', linewidth=1, color='Blue')
ax1.title.set_text('BTC IV vs RV')
ax1.set_xlabel('Date')
ax1.set_ylabel('Volatility')
ax1.legend()
plt.grid(True)
plt.show()
a.corr()
b.corr()
Auto-correlation of Volatility
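The original cell is not shown; a minimal sketch, assuming squared log returns as the volatility proxy and the plot_acf helper imported above.
# Volatility clustering shows up as autocorrelation in squared returns.
fig = plot_acf(interpolated_price.dropna()**2, lags=50)
fig.suptitle('Autocorrelation of Squared Log Returns', fontsize=20)
plt.show()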
Gridsearch to find the best P and Q params
For GARCH
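The search cell is omitted; a sketch of how the gridsearch helper defined above would be invoked, with the p/q ranges here being an assumption rather than the original values.
garch_candidates = gridsearch(interpolated_price, p_rng=range(1, 10), q_rng=range(1, 10), model_type='garch')
for candidate in garch_candidates:
    print(candidate)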
For EGARCH
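Likewise for EGARCH; the EGARCH(9,1) fit further down suggests the original search covered p values up to at least 9.
egarch_candidates = gridsearch(interpolated_price, p_rng=range(1, 10), q_rng=range(1, 10), model_type='egarch')
for candidate in egarch_candidates:
    print(candidate)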
Model Fit and Diagnostics
# Annualized realized volatility over the held-out tail of the series.
vol_act = interpolated_price[train:].std()*np.sqrt(365)
Establishing Baseline
df_summary = pd.DataFrame(columns=['type', 'mean_squared_error', 'mean_absolute_error', 'mean_absolute_percentage_error'])
def log_results(summary_df, ytrue, ypred, exp_type): 
    df = pd.DataFrame(data=[exp_type], columns=['type'])
    df['mean_squared_error'] = mean_squared_error([ytrue], [ypred])
    df['mean_absolute_error'] = mean_absolute_error([ytrue], [ypred])
    df['mean_absolute_percentage_error'] = mean_absolute_percentage_error([ytrue], [ypred])*100
    merged_df = pd.concat([summary_df, df], axis=0, ignore_index=True)
    return merged_df
def calculate_error_metrics(ytrue, ypred):
    mse = mean_squared_error(ytrue, ypred)
    mae = mean_absolute_error(ytrue, ypred)
    mape = mean_absolute_percentage_error(ytrue, ypred)*100
    return mse, mae, mape
Past 7 day mean volatility
tscv = TimeSeriesSplit(n_splits=5)
mse_list=[]
mae_list=[]
mape_list=[]
for fold, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    # Forecast: annualized std of the last 7 training days; actual: the first 7 test days.
    seven_day_avg = interpolated_price.iloc[train_index[-7:]].std()*np.sqrt(365)
    act_avg = interpolated_price.iloc[test_index[:7]].std()*np.sqrt(365)
    mse, mae, mape = calculate_error_metrics(ypred=[seven_day_avg], ytrue=[act_avg])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
    print(f'Forecast : {seven_day_avg}')
    print(f'Actual : {act_avg}')
results = {
    'type': ['Seven Day Average'], 
    'mean_squared_error' : [np.mean(mse_list)], 
    'mean_absolute_error' : [np.mean(mae_list)], 
    'mean_absolute_percentage_error' : [np.mean(mape_list)]
    }
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
Random Walk
tscv = TimeSeriesSplit(n_splits=5)
mse_list=[]
mae_list=[]
mape_list=[]
for fold, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    # Start a 7-step random walk from the end of the training window and compare
    # its annualized volatility to that of the first 7 test days.
    rw_forecasted = generate_rw(interpolated_price.iloc[train_index[-7]], n=7, sigma=1)
    rw_forecasted_vol = pd.Series(rw_forecasted).std()*np.sqrt(365)
    act_avg = interpolated_price.iloc[test_index[:7]].std()*np.sqrt(365)
    mse, mae, mape = calculate_error_metrics(ypred=[rw_forecasted_vol], ytrue=[act_avg])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
    print(f'Forecast : {rw_forecasted_vol}')
    print(f'Actual : {act_avg}')
results = {
    'type': ['Random Walk'], 
    'mean_squared_error' : [np.mean(mse_list)], 
    'mean_absolute_error' : [np.mean(mae_list)], 
    'mean_absolute_percentage_error' : [np.mean(mape_list)]
    }
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
Past day volatility
tscv = TimeSeriesSplit(n_splits=5)
mse_list=[]
mae_list=[]
mape_list=[]
for fold, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    one_day_avg = interpolated_price.iloc[train_index[-2:]].std()*np.sqrt(365)
    act_avg = interpolated_price.iloc[test_index[:2]].std()*np.sqrt(365)
    mse, mae, mape = calculate_error_metrics(ypred=[one_day_avg], ytrue=[act_avg])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
    print(f'Forecast : {one_day_avg}')
    print(f'Actual : {act_avg}')
results = {
    'type': ['One Day Average'], 
    'mean_squared_error' : [np.mean(mse_list)], 
    'mean_absolute_error' : [np.mean(mae_list)], 
    'mean_absolute_percentage_error' : [np.mean(mape_list)]
    }
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
Genesis Implied Volatility
tscv = TimeSeriesSplit(n_splits=5)
mse_list=[]
mae_list=[]
mape_list=[]
for fold, (train_index, test_index) in enumerate(tscv.split(a)):
    # IV is already a forward-looking forecast, so there is nothing to fit;
    # compare it directly to realized volatility over each fold's window.
    g_iv = a['Result'].iloc[train_index]
    act_rv = a['log_returns'].iloc[train_index]
    mse, mae, mape = calculate_error_metrics(ypred=g_iv, ytrue=act_rv)
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
results = {
    'type': ['Genesis Implied Volatility'], 
    'mean_squared_error' : [np.mean(mse_list)], 
    'mean_absolute_error' : [np.mean(mae_list)], 
    'mean_absolute_percentage_error' : [np.mean(mape_list)]
    }
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
t3index Implied Volatility
tscv = TimeSeriesSplit(n_splits=5)
mse_list=[]
mae_list=[]
mape_list=[]
for fold, (train_index, test_index) in enumerate(tscv.split(b)):
    t3_iv = b['volatility_index'].iloc[train_index]
    act_rv = b['log_returns'].iloc[train_index]
    mse, mae, mape = calculate_error_metrics(ypred=t3_iv, ytrue=act_rv)
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
results = {
    'type': ['t3index Implied Volatility'], 
    'mean_squared_error' : [np.mean(mse_list)], 
    'mean_absolute_error' : [np.mean(mae_list)], 
    'mean_absolute_percentage_error' : [np.mean(mape_list)]
    }
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
GARCH(1,1) Model
garch = arch_model(interpolated_price, vol='garch', p=1, q=1, o=0, rescale=True, dist='normal')
fgarch = garch.fit(update_freq=5)
resid = fgarch.resid
st_resid = np.divide(resid, fgarch.conditional_volatility)
ts_plot(resid, st_resid, lags=40)
fgarch.summary()
# Annualize the mean 5-day-ahead variance forecast: sqrt(variance) * sqrt(365).
forecasted_vol = fgarch.forecast(horizon=5)
vol_for = np.sqrt(forecasted_vol.variance.iloc[-1].mean())*np.sqrt(365)
print(
f"""
Forecasted volatility: {vol_for}
Realised volatility: {vol_act}
"""
)
tscv = TimeSeriesSplit(n_splits=5)
mse_list=[]
mae_list=[]
mape_list=[]
for fold, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    print(f'Fold {fold}')
    print('---')
    garch = arch_model(interpolated_price, vol='garch', p=1, q=1, o=0, rescale=True, dist='normal')
    # Fit on the training window only; `last_obs` leaves the rest for out-of-sample forecasts.
    fgarch = garch.fit(update_freq=5, last_obs=max(interpolated_price.iloc[train_index].index).date().strftime("%Y-%m-%d"))
    forecasted_vol = fgarch.forecast(horizon=5)
    forecast = np.sqrt(forecasted_vol.variance['h.2'])*np.sqrt(365)
    forecast = forecast[:len(test_index)]
    actual = interpolated_price.rolling(30).std()*np.sqrt(365)
    fig1 = plt.figure()
    ax1 = fig1.add_subplot(1, 1, 1)
    ax1.plot(forecast, label='Garch(1,1) Forecast', linewidth=1, color='Red')
    ax1.plot(actual.loc[forecast.index], label='Rolling 30 Day Realized Vol', linewidth=1, color='Blue')
    ax1.title.set_text('Actual vs Predicted')
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Volatility')
    ax1.legend()
    plt.grid(True)
    plt.show()
    joined = pd.DataFrame(forecast).join(pd.DataFrame(actual)).dropna()
    mse, mae, mape = calculate_error_metrics(ypred=joined['h.2'], ytrue=joined['log_returns'])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
results = {
    'type': ['Garch (1,1)'], 
    'mean_squared_error' : [np.mean(mse_list)], 
    'mean_absolute_error' : [np.mean(mae_list)], 
    'mean_absolute_percentage_error' : [np.mean(mape_list)]
    }
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
ARCH(1) Model
Cross Validation
tscv = TimeSeriesSplit(n_splits=5)
mse_list=[]
mae_list=[]
mape_list=[]
for fold, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    print(f'Fold {fold}')
    print('---')
    garch = arch_model(interpolated_price, vol='arch', p=1, q=0, o=0, rescale=True, dist='normal')
    fgarch = garch.fit(update_freq=5, last_obs=max(interpolated_price.iloc[train_index].index).date().strftime("%Y-%m-%d"))
    forecasted_vol = fgarch.forecast(horizon=5)
    forecast = np.sqrt(forecasted_vol.variance['h.2'])*np.sqrt(365)
    forecast = forecast[:len(test_index)]
    actual = interpolated_price.rolling(30).std()*np.sqrt(365)
    fig1 = plt.figure()
    ax1 = fig1.add_subplot(1, 1, 1)
    ax1.plot(forecast, label='Arch(1) Forecast', linewidth=1, color='Red')
    ax1.plot(actual.loc[forecast.index], label='Rolling 30 Day Realized Vol', linewidth=1, color='Blue')
    ax1.title.set_text('Actual vs Predicted')
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Volatility')
    ax1.legend()
    plt.grid(True)
    plt.show()
    joined = pd.DataFrame(forecast).join(pd.DataFrame(actual)).dropna()
    mse, mae, mape = calculate_error_metrics(ypred=joined['h.2'], ytrue=joined['log_returns'])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
results = {
    'type': ['Arch (1)'], 
    'mean_squared_error' : [np.mean(mse_list)], 
    'mean_absolute_error' : [np.mean(mae_list)], 
    'mean_absolute_percentage_error' : [np.mean(mape_list)]
    }
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
EGARCH(9,1)
Cross Validation
tscv = TimeSeriesSplit(n_splits=5)
mse_list=[]
mae_list=[]
mape_list=[]
for fold, (train_index, test_index) in enumerate(tscv.split(interpolated_price)):
    print(f'Fold {fold}')
    print('---')
    garch = arch_model(
                    interpolated_price,
                    vol='egarch',
                    p=9, q=1, o=0,
                    rescale=True,
                    dist='normal')
    fgarch = garch.fit(update_freq=5, last_obs=max(interpolated_price.iloc[train_index].index).date().strftime("%Y-%m-%d"))
    forecasted_vol = fgarch.forecast(horizon=1)
    forecast = np.sqrt(forecasted_vol.variance['h.1'])*np.sqrt(365)
    forecast = forecast[:len(test_index)]
    actual = interpolated_price.rolling(30).std()*np.sqrt(365)
    fig1 = plt.figure()
    ax1 = fig1.add_subplot(1, 1, 1)
    ax1.plot(forecast, label='Egarch(9,1) Forecast', linewidth=1, color='Red')
    ax1.plot(actual.loc[forecast.index], label='Rolling 30 Day Realized Vol', linewidth=1, color='Blue')
    ax1.title.set_text('Actual vs Predicted')
    ax1.set_xlabel('Date')
    ax1.set_ylabel('Volatility')
    ax1.legend()
    plt.grid(True)
    plt.show()
    joined = pd.DataFrame(forecast).join(pd.DataFrame(actual)).dropna()
    mse, mae, mape = calculate_error_metrics(ypred=joined['h.1'], ytrue=joined['log_returns'])
    mse_list.append(mse)
    mae_list.append(mae)
    mape_list.append(mape)
results = {
    'type': ['Egarch (9,1)'], 
    'mean_squared_error' : [np.mean(mse_list)], 
    'mean_absolute_error' : [np.mean(mae_list)], 
    'mean_absolute_percentage_error' : [np.mean(mape_list)]
    }
df_summary = pd.concat([df_summary, pd.DataFrame(results)], axis=0, ignore_index=True)
Conclusion
Additional: Simulation of Volatility Forecasts
forecasted_vol = fgarch.forecast(horizon=5, method='simulation', reindex=False)
vol_for = forecasted_vol.variance.iloc[-1].mean()
x = np.arange(1, 6)
# Plot every 5th simulated path, annualized, against the expected volatility path.
lines = plt.plot(x, np.sqrt(forecasted_vol.simulations.residual_variances[-1, ::5].T)*np.sqrt(365), color="#9cb2d6", alpha=0.5, linewidth=0.8)
lines[0].set_label("Simulated path")
line = plt.plot(x, np.sqrt(forecasted_vol.variance.iloc[-1].values)*np.sqrt(365), color="#002868")
line[0].set_label("Expected volatility")
plt.gca().set_xticks(x)
plt.gca().set_xlim(1, 5)
legend = plt.legend()