# Install packages
!pip install gspread
import numpy as np
from datetime import date, datetime, timedelta
import pytz
import requests
import io
import pandas as pd
import gspread
# Google Drive file path
ROOT = '/datasets/tims-google-drive/'
PROJECT = 'Leisure & Travel/Skiing/Snow Monitoring/'
# Service-account credentials file passed to gspread (ROOT + CREDENTIALS).
CREDENTIALS = 'GoogleDriveAccess.json'
# Google Sheets to create. We will fill these with data later.
# Each entry names a worksheet tab and the workbook (spreadsheet) it lives in.
sheets = [
{'title': 'Weather', 'workbook_id': '1svxkaCDCtXuV-EQhA9vdQkQvplp507uxg6MIIhCw3Ts'},
{'title': 'Metadata', 'workbook_id': '1svxkaCDCtXuV-EQhA9vdQkQvplp507uxg6MIIhCw3Ts'},
{'title': 'Models', 'workbook_id': '1wj8Jmt8uhjA3dEtuKqCBL3KQyyt9OwZ6jbKsgD83ODk'},
{'title': 'Fit', 'workbook_id': '1wj8Jmt8uhjA3dEtuKqCBL3KQyyt9OwZ6jbKsgD83ODk'},
{'title': 'Criteria', 'workbook_id': '1wj8Jmt8uhjA3dEtuKqCBL3KQyyt9OwZ6jbKsgD83ODk'}
]
# NWAC data portal
BASE_URL = 'https://nwac.us/data-portal/csv'
# Name of the NWAC data download timestamp column.
TIMESTAMP = 'Date/Time (PST)'
# Relative date range in days. NWAC permits downloads of up to 365 days of data at a time.
DATE_RANGE = 30
# NWAC measurement areas. Each area contains one or more weather stations.
AREAS = [
'crystal', 'snoqualmie-pass', 'stevens-pass', 'mt-baker-ski-area',
'washington-pass', 'mt-rainier', 'mt-st-helens',
'white-pass-ski-area', 'lake-wenatchee', 'hurricane-ridge', 'chinook-pass'
]
# NWAC measures. Include interpolation criteria for those that require correction.
# Interpolation keys (consumed by interpolate()):
#   Trigger   - largest absolute hour-over-hour change accepted before a value is treated as suspect
#   Increment - how far to widen the plausible bounds for each suspect hour
#   Floor     - clamp for the lower bound (e.g. snowfall/depth cannot be negative)
MEASURES = [
{'Name': 'temperature', 'Units': 'degrees F', 'Interpolation': {'Trigger': 20, 'Increment': 10}},
{'Name': 'precipitation', 'Units': 'inches SWE'},
{'Name': 'snowfall_24_hour', 'Units': 'inches', 'Interpolation': {'Trigger': 10, 'Increment': 2, 'Floor': 0}},
{'Name': 'snow_depth', 'Units': 'inches', 'Interpolation': {'Trigger': 10, 'Increment': 2, 'Floor': 0}},
{'Name': 'wind_speed_minimum', 'Units': 'mph'},
{'Name': 'wind_speed_average', 'Units': 'mph'},
{'Name': 'wind_speed_maximum', 'Units': 'mph'}
]
# Set the number of interpolation models to run and select the best fit from.
INTERPOLATIONS = 3
# Set an interpolation rate above which to discard data entirely.
JUNK = 0.35
# Return the current time as a datetime in Pacific time
def now_pt():
    """Return the current US/Pacific wall-clock time as a naive datetime."""
    pacific = pytz.timezone('US/Pacific')
    return datetime.now(tz=pytz.utc).astimezone(pacific).replace(tzinfo=None)
# Return time formatted as string
def hour_str(ts=None):
    """Format a timestamp as e.g. '1/2/23 3PM'.

    Args:
        ts: datetime to format; defaults to the current Pacific time.
            A None sentinel is used because the original default of
            ``now_pt()`` was evaluated once at definition time, so every
            no-argument call returned the stale import-time timestamp.

    Returns:
        str: formatted timestamp. Note '%-m'/'%-d'/'%-I' (no zero padding)
        are glibc extensions and are not available on Windows.
    """
    if ts is None:
        ts = now_pt()
    return ts.strftime('%-m/%-d/%y %-I%p')
# Upper bound: the most recent completed hour, allowing ~20 minutes of data lag
lagged_now = now_pt() - timedelta(minutes=20)
period_end = datetime(lagged_now.year, lagged_now.month, lagged_now.day, lagged_now.hour)
# Lower bound: DATE_RANGE days back from the upper bound, truncated to midnight
range_start = period_end - timedelta(days=DATE_RANGE - 1)
period_start = datetime(range_start.year, range_start.month, range_start.day)
# Progress report
print('Will retrieve data from {t} through {e}'.format(t=hour_str(period_start), e=hour_str(period_end)))
# Build one row per hour in the period, plus each hour's calendar date
calendar_df = pd.DataFrame({
    'Calendar Timestamp': pd.date_range(start=period_start, end=period_end, freq='1h')
})
calendar_df['Calendar Date'] = calendar_df['Calendar Timestamp'].dt.date.astype('datetime64[ns]')
# Select and sort columns
calendar_df = calendar_df[['Calendar Date', 'Calendar Timestamp']]
# Preview results
calendar_df.tail()
# Create a friendly measure alias, e.g. 'snow_depth' -> 'Snow Depth'
for entry in MEASURES:
    entry['Alias'] = entry['Name'].replace('_', ' ').title()
# Flatten the measure dicts: nested 'Interpolation' keys become
# 'Interpolation Trigger' / 'Interpolation Increment' / 'Interpolation Floor' columns
measures_df = pd.json_normalize(MEASURES, sep=' ').rename(columns={
    'Name': 'Measure Name',
    'Alias': 'Measure Alias',
    'Units': 'Measure Units'
})
# Preview results
measures_df.tail()
def get_data(area, measure, start_date, end_date):
    """Get hourly data for a single measure for a single area.

    Args:
        area: NWAC area slug, e.g. 'stevens-pass'.
        measure: measure dict with at least 'Name' and 'Alias' keys (see MEASURES).
        start_date: datetime lower bound of the download period.
        end_date: datetime upper bound of the download period.

    Returns:
        DataFrame with columns ['Calendar Timestamp', 'Area', 'Station',
        'Elevation', 'Measure Value'], aggregated to one row per
        station-hour.

    Raises:
        Exception: if no measurements exist in the period, or the station
            detail headers cannot be parsed.
    """
    # Define period
    start_date = start_date.strftime('%Y-%m-%d')
    end_date = end_date.strftime('%Y-%m-%d')
    # Compose request URL
    url = '{base}/{area}/sensortype/{measure}/start-date/{start}/end-date/{end}'.format(
        base = BASE_URL,
        area = area,
        measure = measure['Name'],
        start = start_date,
        end = end_date
    )
    # Get request data
    page = requests.get(url)
    measure_df = pd.read_csv(io.StringIO(page.text))
    measure_df['Area'] = area.replace('-', ' ').title()
    measure_df.rename(columns={TIMESTAMP: 'Calendar Timestamp'}, inplace=True)
    # Unpivot stations: one CSV column per station -> one row per station-hour
    measure_df = pd.melt(
        measure_df,
        id_vars = ['Area', 'Calendar Timestamp'],
        var_name = 'Station Detail',
        value_name = 'Measure Value'
    )
    # Drop null values
    measure_df = measure_df.dropna(how='any')
    # Raise error if empty
    if measure_df.empty:
        error = 'No {measure} measurements between {start} and {end}.'.format(
            measure = measure['Alias'],
            start = start_date,
            end = end_date
        )
        raise Exception(error)
    # Parse station detail headers (elevation in feet, location, station name).
    # Fixed: patterns are now raw strings -- '\(' etc. in plain strings emit
    # invalid-escape warnings on modern Python.
    try:
        measure_df['Elevation'] = measure_df['Station Detail'].str.extract(pat=r"\((.*)'\)")
        measure_df['Elevation'] = measure_df['Elevation'].str.strip().astype(int)
        measure_df['Location'] = measure_df['Station Detail'].str.extract(pat=r' - (.*)')
        measure_df['Location'] = measure_df['Location'].str.split(pat='(', n=1, expand=True)[0]
        measure_df['Station'] = measure_df['Station Detail'].str.extract(pat=r'\)(.*)')
        measure_df['Station'] = measure_df['Location'].fillna(measure_df['Station']).str.strip()
        measure_df['Station'] = (
            measure_df['Elevation']
            .map('{:,}'.format)
            .astype(str) + "' " + measure_df['Station']
        )
    except Exception as exc:
        # Fixed: was a bare `except:` that discarded the underlying error and
        # also caught SystemExit/KeyboardInterrupt. Chain the cause instead.
        error = 'Unable to parse station detail for {area}'.format(area=area)
        raise Exception(error) from exc
    # Force data types (unparseable values become NaT/NaN rather than raising)
    measure_df['Calendar Timestamp'] = pd.to_datetime(
        measure_df['Calendar Timestamp'].str.strip(),
        errors = 'coerce'
    )
    measure_df['Measure Value'] = pd.to_numeric(
        measure_df['Measure Value'],
        errors = 'coerce'
    )
    # Aggregate for safety in case grain is unexpected
    dim = ['Calendar Timestamp', 'Area', 'Station', 'Elevation']
    measure_df = measure_df.groupby(dim, dropna=False, as_index=False)['Measure Value'].mean()
    return measure_df
# Union each area/measure download into one long dataframe
nwac_df = pd.DataFrame()
for area in AREAS:
    # Progress update
    print('\n{a}'.format(a=area.upper()))
    for measure in MEASURES:
        # Get data from NWAC; a failed measure is reported and skipped
        try:
            measure_df = get_data(area, measure, period_start, period_end)
            print(measure_df)
        except Exception as x:
            print(x)
        else:
            # Tag rows with the measure name
            measure_df['Measure Name'] = measure['Name']
            # Select and sort colums
            columns = [
                'Calendar Timestamp', 'Area', 'Station', 'Elevation',
                'Measure Name', 'Measure Value'
            ]
            # Append to results
            nwac_df = pd.concat([nwac_df, measure_df[columns]])
            # Progress update
            print('Retrieved {measure} data from {start} to {end}'.format(
                measure = measure['Alias'],
                start = period_start,
                end = period_end
            ))
# Preview results
nwac_df.tail()
# Cross join calendar hours x stations x measures to build a dense grid
# (every station gets a row for every hour and measure, measured or not)
stations = nwac_df[['Area', 'Station', 'Elevation']].dropna(how='any').drop_duplicates()
measure_cols = ['Measure Name', 'Measure Alias', 'Interpolation Trigger', 'Interpolation Increment', 'Interpolation Floor']
dense_df = (
    calendar_df[['Calendar Date', 'Calendar Timestamp']]
    .merge(stations, how='cross')
    .merge(measures_df[measure_cols], how='cross')
)
# Join the dense grid with actual measurements; missing hours stay NaN
join_keys = ['Calendar Timestamp', 'Area', 'Station', 'Measure Name']
dense_df = dense_df.merge(
    nwac_df[join_keys + ['Measure Value']],
    how = 'left',
    on = join_keys
)
# Select and sort columns
dense_df = dense_df[[
    'Calendar Date', 'Calendar Timestamp', 'Area', 'Station', 'Elevation',
    'Measure Name', 'Measure Alias',
    'Measure Value', 'Interpolation Trigger', 'Interpolation Increment', 'Interpolation Floor'
]]
# Preview results
dense_df.tail()
# Measures that carry interpolation criteria are the ones we will correct
interpolated_measures = [m['Name'] for m in MEASURES if 'Interpolation' in m]
# Restrict the dense grid to those measures, keeping only the columns
# the interpolation model needs
model_columns = [
    'Calendar Date', 'Calendar Timestamp', 'Area', 'Station', 'Measure Name',
    'Measure Value', 'Interpolation Trigger', 'Interpolation Increment', 'Interpolation Floor'
]
remeasure_df = (
    dense_df.loc[dense_df['Measure Name'].isin(interpolated_measures), model_columns]
    .reset_index(drop=True)
)
# Preview results
remeasure_df.tail()
# Build INTERPOLATIONS anchor timestamps spread evenly through the period,
# truncated to the hour and de-duplicated while preserving order
anchor_dates = []
for stamp in pd.date_range(start=period_start, end=period_end, periods=INTERPOLATIONS):
    truncated = datetime(year=stamp.year, month=stamp.month, day=stamp.day, hour=stamp.hour)
    if truncated not in anchor_dates:
        anchor_dates.append(truncated)
# Progress update
print('Will anchor corrections to {d}'.format(d=', '.join(hour_str(d) for d in anchor_dates)))
def interpolate(target_df, baseline_df, target_date, offset=1):
    """Correct one hour of measurements against the adjacent, already-corrected hour.

    Args:
        target_df: dense measurements to draw the target hour from.
        baseline_df: previously corrected rows; must supply 'Upper Bound',
            'Lower Bound' and 'Corrected Value' for target_date - offset hours.
        target_date: timestamp of the hour being corrected.
        offset: +1 when sweeping forward in time (baseline is the prior hour),
            -1 when sweeping backward (baseline is the following hour).

    Returns:
        DataFrame for the target hour with plausibility bounds, the
        'Is Missing'/'Is Corrected' flags and a 'Corrected Value' column.
        Empty when either side of the inner join has no rows for the
        relevant hour (callers use this to stop sweeping).
    """
    # Get target measurements
    target_dim = ['Calendar Date', 'Calendar Timestamp', 'Area', 'Station', 'Measure Name']
    target_fct = ['Measure Value', 'Interpolation Trigger', 'Interpolation Increment', 'Interpolation Floor']
    target_df = target_df[target_df['Calendar Timestamp'] == target_date][target_dim + target_fct].copy()
    # Get baseline measurements
    baseline_dim = ['Area', 'Station', 'Measure Name']
    baseline_fct = ['Upper Bound', 'Lower Bound', 'Corrected Value']
    baseline_date = target_date - timedelta(hours=offset)
    baseline_df = baseline_df[baseline_df['Calendar Timestamp']==baseline_date][baseline_dim + baseline_fct].copy()
    # Join target measurements with baseline measurements for comparison
    # (inner join: stations/measures absent from the baseline drop out)
    df = pd.merge(
        target_df,
        baseline_df,
        how = 'inner',
        on = baseline_dim
    )
    # Get hourly change relative to the baseline's corrected value
    # (NaN when the raw value is missing or the baseline was never corrected)
    df['Hourly Change'] = df['Measure Value'] - df['Corrected Value']
    # Get upper bound: accept the raw value when the change is within the
    # trigger; otherwise widen the previous bound by the increment
    df['Upper Bound'] = np.where(
        df['Hourly Change'].abs() <= df['Interpolation Trigger'],
        df['Measure Value'],
        df['Upper Bound'] + df['Interpolation Increment']
    )
    # Get lower bound (same rule, widening downward)
    df['Lower Bound'] = np.where(
        df['Hourly Change'].abs() <= df['Interpolation Trigger'],
        df['Measure Value'],
        df['Lower Bound'] - df['Interpolation Increment']
    )
    # Clamp the lower bound to the measure's floor when one is defined
    df['Lower Bound'] = np.where(
        df['Interpolation Floor'].isna(),
        df['Lower Bound'],
        df[['Lower Bound', 'Interpolation Floor']].max(axis=1)
    )
    # Get a Boolean indicator for missing values
    df['Is Missing'] = df['Measure Value'].isna()
    # Get a Boolean indicator for corrections: a present value outside the
    # plausible bounds is treated as a sensor glitch
    df['Is Corrected'] = (
        ~df['Measure Value'].isna() &
        ~df['Measure Value'].between(df['Lower Bound'], df['Upper Bound'])
    )
    # Get corrected measure: carry the baseline's corrected value forward for
    # glitched rows, otherwise accept the raw value. NOTE(review): missing raw
    # values leave 'Corrected Value' as NaN here -- presumably intentional so
    # gaps stay blank; confirm before changing.
    df['Corrected Value'] = np.where(
        df['Is Corrected'],
        df['Corrected Value'],
        df['Measure Value']
    )
    return df
# Build one correction model per anchor date. Each model trusts the anchor
# hour's raw values and sweeps outward hour by hour in both directions,
# correcting each hour against the previously corrected one.
models_df = pd.DataFrame()
for anchor_date in anchor_dates:
    # Progress update
    message = 'Anchoring corrections to {d}'.format(d=hour_str(anchor_date))
    print(message)
    # Get initial anchor records (only rows with a real measurement)
    anchor_df = remeasure_df[
        (remeasure_df['Calendar Timestamp']==anchor_date) &
        ~(remeasure_df['Measure Value'].isna())
    ].copy()
    # Seed the model: anchor-hour values are taken as ground truth, so the
    # bounds collapse to the value itself and no correction is flagged
    anchor_df['Hourly Change'] = 0
    anchor_df['Lower Bound'] = anchor_df['Measure Value']
    anchor_df['Upper Bound'] = anchor_df['Measure Value']
    anchor_df['Corrected Value'] = anchor_df['Measure Value']
    anchor_df['Is Missing'] = False
    anchor_df['Is Corrected'] = False
    # Select and sort columns
    dim = ['Calendar Date', 'Calendar Timestamp', 'Area', 'Station', 'Measure Name']
    fct = ['Measure Value', 'Hourly Change', 'Interpolation Trigger', 'Interpolation Increment', 'Interpolation Floor']
    cor = ['Lower Bound', 'Upper Bound', 'Is Missing', 'Is Corrected', 'Corrected Value']
    anchor_df = anchor_df[dim + fct + cor]
    # Initialize reference dates for filtering
    forward_date = anchor_date + timedelta(hours=1)
    backward_date = anchor_date - timedelta(hours=1)
    # Sweep forward from the anchor. anchor_df doubles as the growing baseline:
    # each corrected hour is appended so the next hour can compare against it.
    while True:
        # Interpolate
        forward_df = interpolate(remeasure_df, anchor_df, forward_date, offset=1)
        # interpolate() returns an empty frame once we run past the period edge
        if forward_df.empty:
            break
        # Append to results
        anchor_df = pd.concat([anchor_df, forward_df])
        # Advance to next timestamp
        forward_date = forward_date + timedelta(hours=1)
    # Sweep backward from the anchor (offset=-1: baseline is the later hour)
    while True:
        # Interpolate
        backward_df = interpolate(remeasure_df, anchor_df, backward_date, offset=-1)
        if backward_df.empty:
            break
        # Append to results
        anchor_df = pd.concat([anchor_df, backward_df])
        # Advance to next timestamp
        backward_date = backward_date - timedelta(hours=1)
    # Tag this model's rows with its anchor and append to the union of models
    anchor_df['Anchor Timestamp'] = anchor_date
    models_df = pd.concat([models_df, anchor_df])
# Select and sort columns
dim = ['Calendar Date', 'Calendar Timestamp', 'Area', 'Station', 'Anchor Timestamp', 'Measure Name']
fct = ['Measure Value', 'Hourly Change', 'Lower Bound', 'Upper Bound']
cor = ['Is Missing', 'Is Corrected', 'Corrected Value']
models_df = models_df[dim + fct + cor].reset_index(drop=True)
# Preview results
models_df.tail()
# Score each model: per station/measure/anchor, count records and how many
# were missing or corrected (booleans sum to their True counts)
dim = ['Area', 'Station', 'Measure Name', 'Anchor Timestamp']
fits_df = models_df.groupby(dim, as_index=False).agg({
    'Calendar Timestamp': 'count',
    'Is Missing': 'sum',
    'Is Corrected': 'sum'
}).rename(columns={
    'Calendar Timestamp': 'Record Count',
    'Is Missing': 'Missing Count',
    'Is Corrected': 'Correction Count'
})
# Rank models within each station/measure: fewest corrections wins.
# The sort_values before rank matters -- rank(method='first') breaks ties by
# row order, so ties go to the earliest Anchor Timestamp.
dim = ['Area', 'Station', 'Measure Name']
fits_df['Model Rank'] = (
    fits_df
    .sort_values(['Correction Count', 'Anchor Timestamp'])
    .groupby(dim, as_index=False)['Correction Count']
    .rank(method='first')
)
# Get missing/correction rates (correction rate is relative to present values)
fits_df['Missing Rate'] = fits_df['Missing Count'] / fits_df['Record Count']
fits_df['Correction Rate'] = fits_df['Correction Count'] / (fits_df['Record Count'] - fits_df['Missing Count'])
# Apply only the best-ranked model, and only if its correction rate is below
# the JUNK threshold; otherwise the raw data is kept uncorrected
fits_df['Apply Model'] = (fits_df['Model Rank'] == 1) & (fits_df['Correction Rate'] < JUNK)
# Select and sort columns
dim = ['Area', 'Station', 'Measure Name', 'Anchor Timestamp']
fct = ['Missing Rate', 'Correction Rate', 'Apply Model']
fits_df = fits_df[dim + fct]
# Preview results
fits_df.tail()
# Keep only the corrections produced by each station/measure's winning model
model_cols = ['Calendar Timestamp', 'Area', 'Station', 'Measure Name', 'Anchor Timestamp', 'Corrected Value']
winner_keys = ['Area', 'Station', 'Measure Name', 'Anchor Timestamp']
winners = fits_df.loc[fits_df['Apply Model'], winner_keys]
best_fit_df = (
    models_df[model_cols]
    .merge(winners, how='inner', on=winner_keys)
    .drop(columns='Anchor Timestamp')
)
# Preview results
best_fit_df.tail()
# Attach the winning corrections to the dense grid
corrected_df = dense_df.merge(
    best_fit_df,
    how = 'left',
    on = ['Calendar Timestamp', 'Area', 'Station', 'Measure Name']
)
# Apply correction: prefer the corrected value wherever one exists
corrected_df['Measure Value'] = corrected_df['Corrected Value'].fillna(corrected_df['Measure Value'])
# Pivot to one column per measure alias
corrected_df = corrected_df.pivot(
    index = ['Calendar Date', 'Calendar Timestamp', 'Area', 'Station', 'Elevation'],
    columns = 'Measure Alias',
    values = 'Measure Value'
).reset_index()
# Preview results
corrected_df.tail()
# Flag each area-hour where any station recorded precipitation
area_keys = ['Calendar Timestamp', 'Area']
precip_df = corrected_df.groupby(area_keys, as_index=False)['Precipitation'].sum()
precip_df['Precipitation Indicator'] = precip_df['Precipitation'].fillna(0) > 0
# Select and sort columns
precip_df = precip_df[area_keys + ['Precipitation Indicator']]
# Preview results
precip_df.tail()
# Attach the area-level indicator to every station row
nwac_df = corrected_df.merge(precip_df, how='inner', on=area_keys)
# Select and sort columns
ordered_columns = ['Calendar Date', 'Calendar Timestamp', 'Area', 'Station', 'Elevation', 'Precipitation Indicator']
ordered_columns += [m['Alias'] for m in MEASURES]
nwac_df = nwac_df[ordered_columns]
# Preview results
nwac_df.tail()
# Record run metadata (period bounds and run time) for the Metadata sheet
metadata_df = pd.DataFrame([{
    'Date Range Start': period_start,
    'Date Range End': period_end,
    'Date Range Days': DATE_RANGE,
    'Job Run Date': now_pt()
}])
# Preview result
metadata_df.tail()
# Get copy of measures
criteria_df = measures_df.copy()
# Keep only measures that define both interpolation criteria
for criterion in ['Interpolation Trigger', 'Interpolation Increment']:
    criteria_df = criteria_df[~criteria_df[criterion].isna()]
# Record the interpolation settings used for this run
criteria_df['Model Count'] = INTERPOLATIONS
# Fixed: this output column header was misspelled 'Discard Theshold'
criteria_df['Discard Threshold'] = JUNK
# Preview results
criteria_df.tail()
# Google Sheets
# Connect to Google Sheets using the service-account credentials
gsheets = gspread.service_account(filename = ROOT + CREDENTIALS)
# Assign data frames to sheets
sheets_data = {
    'Weather': nwac_df,
    'Metadata': metadata_df,
    'Models': models_df,
    'Fit': fits_df,
    'Criteria': criteria_df
}
for sheet in sheets:
    # Open parent workbook
    workbook = gsheets.open_by_key(sheet['workbook_id'])
    # Work on a copy: the conversions below previously stringified the shared
    # analysis dataframes (nwac_df, models_df, ...) in place.
    sheet_data = sheets_data[sheet['title']].copy()
    # Convert datetimes to strings because Google Sheets does not like datetimes.
    # 'datetime' selects all datetime64 columns; select_dtypes(np.datetime64)
    # is deprecated in recent pandas.
    for column in sheet_data.select_dtypes(include='datetime').columns:
        sheet_data[column] = sheet_data[column].astype(str)
    # Fill nulls because Google Sheets does not like NaN values
    for column in sheet_data.columns:
        sheet_data[column] = sheet_data[column].fillna('')
    # Create new worksheet if it does not exist yet
    if sheet['title'] not in [gsheet.title for gsheet in workbook.worksheets()]:
        gsheet = workbook.add_worksheet(title=sheet['title'], rows=0, cols=0)
        message = 'Created a new sheet named "{t}"'.format(t=gsheet.title)
        print(message)
    # Clear existing worksheet
    else:
        gsheet = workbook.worksheet(sheet['title'])
        gsheet.clear()
        message = 'Cleared the "{t}" sheet'.format(t=gsheet.title)
        print(message)
    # Insert header row plus data into the worksheet
    headers = [sheet_data.columns.values.tolist()]
    contents = sheet_data.values.tolist()
    gsheet.update(headers + contents)
    message = 'Inserted {r} records into the "{t}" sheet'.format(r=len(sheet_data.index), t=gsheet.title)
    print(message)