import pandas as pd
import datetime as dt
# NOTE(review): notebook-style preview of the first 5 raw CSV rows; the
# returned DataFrame is discarded (only useful as rendered cell output).
pd.read_csv('data/global.csv', nrows=5)
import psycopg2
from psycopg2 import Error
from IPython import display
# PostgreSQL DB credentials
# NOTE(review): credentials are hard-coded in source — rotate them and load
# from environment variables / a secrets store before sharing this file.
postgresql_config = {
    'user': 'pnrmxfoe',
    'password': '24LD9uhKoOazLk84xAmx46ptOfzMNB1R',
    'host': 'tai.db.elephantsql.com',
    'port': '5432',
    'database': 'pnrmxfoe'
}
# Helper function to show raw file content
def get_file_content(file):
    """Return the entire text content of *file*.

    Parameters
    ----------
    file : str
        Path of the file to read.

    Returns
    -------
    str
        The file's full contents.
    """
    # Context manager guarantees the handle is closed even if read() raises;
    # the original open/read/close sequence leaked the handle on error.
    with open(file, 'r') as handle:
        return handle.read()
# Helper function to execute SQL from string
def execute_sql(query):
    """Execute one or more SQL statements against PostgreSQL.

    Connects with ``postgresql_config``, runs *query*, commits on success
    and prints a status message. Errors are printed, never raised, so the
    surrounding notebook flow continues.

    Parameters
    ----------
    query : str
        SQL statement(s) to execute.
    """
    connection = None
    cursor = None
    try:
        # connect to database
        connection = psycopg2.connect(**postgresql_config)
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            print('SQL-Befehl(e) wurden ausgeführt.')
            # Commit only when execution succeeded; the original committed
            # unconditionally (a commit on an aborted transaction silently
            # rolls back, which was misleading).
            connection.commit()
        except (Exception, Error) as error:
            print('SQL-Befehl konnte nicht ausgeführt werden: ', error)
    except (Exception, Error) as error:
        print(error)
    finally:
        # Bug fix: the original referenced ``cursor`` here even when
        # connect() failed before the cursor existed (NameError that masked
        # the real connection error).
        if cursor is not None:
            cursor.close()
        if connection is not None:
            # close connection
            connection.close()
# Create the four schema tables by echoing and then executing each DDL
# script (countries, sensors, sensors_countries, measurements).
for script_name in ('1a-create-countries-table',
                    '1b-create-sensors-table',
                    '1c-create-sensors_countries-table',
                    '1d-create-measurements-table'):
    script_path = 'sql/' + script_name + '.sql'
    print(get_file_content(script_path))
    execute_sql(get_file_content(script_path))
# only install pip packages if not installed already
# NOTE(review): ``pip._internal.main`` is not a supported public API; the
# documented approach is ``subprocess.run([sys.executable, '-m', 'pip',
# 'install', ...])`` — confirm before relying on this in other environments.
try:
    from sqlalchemy_schemadisplay import create_schema_graph
except ImportError:
    from pip._internal import main as pip
    pip(['install', 'sqlalchemy_schemadisplay'])
    from sqlalchemy_schemadisplay import create_schema_graph
try:
    from sqlalchemy import MetaData
except ImportError:
    from pip._internal import main as pip
    pip(['install', 'sqlalchemy'])
    from sqlalchemy import MetaData
# Draw an entity-relationship diagram of the live database schema.
# NOTE(review): passing the connection URL positionally to MetaData() relies
# on SQLAlchemy < 1.4 "bound metadata" behaviour; newer versions require an
# explicit engine — verify the installed version.
graph = create_schema_graph(
    metadata = MetaData(
        'postgresql://{user}:{password}@{host}/{database}'.format(user = postgresql_config['user'],
                                                                  password = postgresql_config['password'],
                                                                  host = postgresql_config['host'],
                                                                  database = postgresql_config['database'])
    )
)
graph.set_size('"8,6!"')
graph.write_png('postgres_ERD.png')
# Render the generated PNG inline in the notebook.
display.Image('postgres_ERD.png')
# Show only the CSV header (column names) — nrows=0 reads no data rows.
pd.read_csv('data/global.csv', nrows=0)
# Create the staging table that mirrors the CSV layout.
print(get_file_content('sql/2a-create-temporary-table.sql'))
execute_sql(get_file_content('sql/2a-create-temporary-table.sql'))
# Show the COPY statement that the next cell streams the CSV through.
print(get_file_content('sql/2b-import-csv-into-temporary-table.sql'))
# Bulk-load the CSV into the staging table via PostgreSQL COPY.
# NOTE(review): ``closing`` is imported but never used below.
from contextlib import closing, contextmanager
connection = psycopg2.connect(**postgresql_config)
cursor = connection.cursor()
with open('data/global.csv', 'r') as f:
    @contextmanager
    def _paused_thread():
        # Temporarily disable psycopg2's global wait callback for the
        # duration of the COPY, restoring it afterwards.
        # NOTE(review): presumably needed because copy_expert() does not
        # work with an async wait callback installed (e.g. by a notebook
        # integration) — confirm in this environment.
        try:
            thread = psycopg2.extensions.get_wait_callback()
            psycopg2.extensions.set_wait_callback(None)
            yield
        finally:
            # Reinstall whatever callback was active before.
            psycopg2.extensions.set_wait_callback(thread)
    with _paused_thread():
        # Stream the open CSV file into the server using the COPY command
        # stored in sql/2b-import-csv-into-temporary-table.sql.
        cursor.copy_expert(sql = get_file_content('sql/2b-import-csv-into-temporary-table.sql'), file = f)
connection.commit()
# Sanity check: print 5 random imported rows.
cursor.execute('SELECT * FROM imported_data ORDER BY random() LIMIT 5')
records = cursor.fetchall()
for record in records:
    print(record)
cursor.close()
connection.close()
# Empty every schema table first so the migration scripts start clean.
execute_sql('''
TRUNCATE TABLE sensors_countries CASCADE;
TRUNCATE TABLE countries CASCADE;
TRUNCATE TABLE sensors CASCADE;
TRUNCATE TABLE measurements CASCADE;
''')
# Run each migration script in order: echo it, then execute it.
for migration in ('3a-migrate-countries-from-imported_data-into-countries-table',
                  '3b-insert-sensor-types-into-sensors-table',
                  '3c-migrate-sensors-and-countries-from-imported_data-into-sensors_countries-table',
                  '3d-migrate-measurements-from-imported_data-into-measurements-table'):
    migration_path = 'sql/' + migration + '.sql'
    print(get_file_content(migration_path))
    execute_sql(get_file_content(migration_path))
# delete temporary table 'imported_data'
execute_sql('DROP TABLE imported_data;')
# Install influxdb_client on demand (same pattern as the sqlalchemy cells).
# NOTE(review): ``pip._internal.main`` is not a supported public API —
# prefer ``subprocess.run([sys.executable, '-m', 'pip', 'install', ...])``.
try:
    from influxdb_client import InfluxDBClient
except ImportError:
    from pip._internal import main as pip
    pip(['install', 'influxdb_client'])
    from influxdb_client import InfluxDBClient
# InfluxDB credentials
# NOTE(review): the API token is hard-coded in source — rotate it and load
# it from an environment variable / secrets store before sharing this file.
influxdb_config = {
    'url': 'https://eu-central-1-1.aws.cloud2.influxdata.com',
    'token': '1_rHKu5fjH2KLOONhU3FI46vusPZNXcZ1tD_m8mF_OZoI0E4L3Uyaf-KY0w57TiTMwptf0SGolErfzA3YLJaYQ==',
    'org': 'fabian.jordi@me.com',
    'bucket': 'bucket'
}
# Only url/token are given here; org and bucket are passed per call below.
client = InfluxDBClient(url = influxdb_config['url'], token = influxdb_config['token'])
# Read CSV into Pandas dataframe
df = pd.read_csv('data/global.csv')
# Parse the date column; the trailing 'T' in the format string must match
# the raw values (e.g. '1999-01-01T') — TODO confirm against data/global.csv.
df['date'] = pd.to_datetime(df['date'], format="%Y-%m-%dT")
df.set_index(['date'], inplace=True)
# One frame per measurement: co2 and temperature are written separately.
df_co2 = df.drop(columns=['temperature'])
df_temp = df.drop(columns=['co2'])
# Write CSV data into InfluxDB
write_client = client.write_api()
try:
    # Write CO2 values
    write_client.write(influxdb_config['bucket'], influxdb_config['org'],
                       record=df_co2, data_frame_measurement_name='co2',
                       data_frame_tag_columns=['country'])
    # Write temperature values
    write_client.write(influxdb_config['bucket'], influxdb_config['org'],
                       record=df_temp, data_frame_measurement_name='temperature',
                       data_frame_tag_columns=['country'])
finally:
    # Bug fix: the default write API batches asynchronously; without
    # close() pending batches can be silently dropped when the script ends.
    write_client.close()
# Helper function to execute SQL query string for PostgreSQL
def execute_sql_and_show_result(query):
    """Execute *query* against PostgreSQL and print the result rows.

    Prints the row count followed by every fetched row, then commits.
    Errors are printed, never raised, so the notebook flow continues.

    Parameters
    ----------
    query : str
        A SQL query that returns rows (SELECT).
    """
    connection = None
    cursor = None
    try:
        # connect to database
        connection = psycopg2.connect(**postgresql_config)
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
            print('Anzahl Resultate:', cursor.rowcount)
            for row in rows:
                print(row)
            # Commit only on success; the original committed even after a
            # failed statement.
            connection.commit()
        except (Exception, Error) as error:
            print('SQL-Befehl konnte nicht ausgeführt werden: ', error)
    except (Exception, Error) as error:
        print(error)
    finally:
        # Bug fix: the original referenced ``cursor`` here even when
        # connect() failed before the cursor existed (NameError that masked
        # the real connection error).
        if cursor is not None:
            cursor.close()
        if connection is not None:
            # close connection
            connection.close()
# Helper function to execute Flux query string for InfluxDB
def execute_influxql_and_show_result(query, aggregated = False):
    """Run the Flux *query* against InfluxDB and print the results.

    For each returned table, prints the number of records followed by the
    records themselves.

    Parameters
    ----------
    query : str
        Flux query string.
    aggregated : bool
        If True, print only each record's value (for sum()/count()-style
        queries); otherwise print ``(time, value, country)`` tuples.

    FluxRecord accessors used here:
    ``get_value()`` – the field value; ``get_time()`` – the record time;
    ``values.get("country")`` – the value of the 'country' tag column.
    """
    query_api = client.query_api()
    result = query_api.query(org = influxdb_config['org'], query = query)
    for table in result:
        # Bug fix: the original printed the entire ``result`` object here;
        # the label "Anzahl Resultate" (number of results) calls for the
        # per-table record count.
        print('Anzahl Resultate:', len(table.records))
        for record in table.records:
            if aggregated:
                print(record.get_value())
            else:
                print((record.get_time(), record.get_value(), record.values.get('country')))
# Query for PostgreSQL
# Temperature readings for countries whose name starts with 'Sw' between
# 1999-01-01 and 2001-01-01 (BETWEEN is inclusive on both ends).
execute_sql_and_show_result("""
SELECT
date, value, c.country
FROM
measurements AS m
JOIN sensors_countries AS sc
ON sc.id = m.sensors_countries_id
JOIN countries AS c
ON c.id = sc.country_id
JOIN sensors AS s
ON s.id = sc.sensor_id
WHERE
s.type = 'temperature'
AND
c.country LIKE 'Sw%'
AND
m.date BETWEEN '1999-01-01' AND '2001-01-01'
ORDER BY date, c.country ASC
;
""")
# Query for InfluxDB
# NOTE(review): the Flux range stops at 2001-12-31 while the SQL BETWEEN
# ends at 2001-01-01 — the two "equivalent" queries cover different time
# spans; confirm which boundary is intended.
query = '''
from(bucket: "bucket")
|> range(start: 1999-01-01, stop: 2001-12-31)
|> filter(fn:(r) => r._measurement == "temperature")
|> filter(fn:(r) => r.country =~ /^Sw/)
|> group()
|> sort(columns: ["_time", "country"])
'''
execute_influxql_and_show_result(query)
# Query for PostgreSQL
# Ratio of total CO2 after 1999-12-31 between Germany and Switzerland,
# built as a cross join (ON 1 = 1) of two single-row SUM() subqueries.
execute_sql_and_show_result('''
SELECT first.co2 / second.co2 as factor
FROM
(
SELECT SUM(value) as co2
FROM
measurements AS m
JOIN sensors_countries AS sc
ON sc.id = m.sensors_countries_id
JOIN countries AS c
ON c.id = sc.country_id
JOIN sensors AS s
ON s.id = sc.sensor_id
WHERE
s.type = 'co2'
AND
c.country = 'Germany'
AND
date > '1999-12-31'
) first
JOIN
(
SELECT SUM(value) as co2
FROM
measurements AS m
JOIN sensors_countries AS sc
ON sc.id = m.sensors_countries_id
JOIN countries AS c
ON c.id = sc.country_id
JOIN sensors AS s
ON s.id = sc.sensor_id
WHERE
s.type = 'co2'
AND
c.country = 'Switzerland'
AND
date > '1999-12-31'
) second
ON 1 = 1
''')
# Query for InfluxDB
# NOTE(review): Flux ``range(start: 1999-12-31)`` includes the start
# instant, while the SQL uses ``date > '1999-12-31'`` (exclusive) — confirm
# the boundary day is treated the same way in both stores.
query = '''
t1 = from(bucket: "bucket")
|> range(start: 1999-12-31)
|> filter(fn:(r) => r._measurement == "co2")
|> filter(fn:(r) => r.country == "Germany")
|> group()
|> sum()
|> yield(name: "1")
'''
execute_influxql_and_show_result(query, aggregated = True)
query = '''
t2 = from(bucket: "bucket")
|> range(start: 1999-12-31)
|> filter(fn:(r) => r._measurement == "co2")
|> filter(fn:(r) => r.country == "Switzerland")
|> group()
|> sum()
|> yield(name: "2")
'''
execute_influxql_and_show_result(query, aggregated = True)
# Compute the factor from the two InfluxDB sums above.
# NOTE(review): these values are hard-coded from a previous run's output —
# recompute them whenever the underlying data changes.
12033.2183 / 616.2119