import pandas as pd
import datetime as dt

# Preview the first 5 rows of the raw CSV to inspect its structure
pd.read_csv('data/global.csv', nrows=5)
dateobject
countryobject
0
1961-01-01
Afghanistan
1
1962-01-01
Afghanistan
2
1963-01-01
Afghanistan
3
1964-01-01
Afghanistan
4
1965-01-01
Afghanistan
import psycopg2
from psycopg2 import Error
from IPython import display

# PostgreSQL DB credentials
# NOTE(review): credentials are hard-coded in the notebook. Anyone with read
# access to this file can connect to the database — move them to environment
# variables or an untracked config file, and rotate this password.
postgresql_config = {
'user': 'pnrmxfoe',
'password': '24LD9uhKoOazLk84xAmx46ptOfzMNB1R',
'host': 'tai.db.elephantsql.com',
'port': '5432',
'database': 'pnrmxfoe'
}
# Helper function to show raw file content
def get_file_content(file):
    """Return the entire text content of a file.

    Parameters
    ----------
    file : str
        Path of the file to read.

    Returns
    -------
    str
        The file's complete contents.
    """
    # 'with' guarantees the handle is closed even if read() raises
    # (the original leaked the handle on error); an explicit encoding
    # avoids platform-dependent decoding of the SQL files.
    with open(file, 'r', encoding='utf-8') as handle:
        return handle.read()
# Helper function to execute SQL from string
def execute_sql(query):
    """Execute one or more SQL statements against the PostgreSQL database.

    Opens a fresh connection from the module-level ``postgresql_config``,
    runs *query*, commits, and always releases cursor and connection.
    Errors are printed rather than raised.
    """
    connection = None
    # bug fix: 'cursor' was unbound in 'finally' if connection.cursor()
    # raised after a successful connect, causing a NameError there.
    cursor = None
    try:
        # connect to database
        connection = psycopg2.connect(**postgresql_config)
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            print('SQL-Befehl(e) wurden ausgeführt.')
        except (Exception, Error) as error:
            print('SQL-Befehl konnte nicht ausgeführt werden: ', error)
        # commit unconditionally after the inner try, matching the
        # original control flow (a failed statement still commits
        # whatever succeeded before it)
        connection.commit()
    except (Exception, Error) as error:
        print(error)
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            # close connection
            connection.close()
print(get_file_content('sql/1a-create-countries-table.sql'))
-- Create countries table
DROP TABLE IF EXISTS countries CASCADE;
CREATE TABLE countries (
id SERIAL PRIMARY KEY NOT NULL,
country VARCHAR(255) NOT NULL,
UNIQUE (country)
);
execute_sql(get_file_content('sql/1a-create-countries-table.sql'))
SQL-Befehl(e) wurden ausgeführt.
print(get_file_content('sql/1b-create-sensors-table.sql'))
-- Create sensors table
DROP TABLE IF EXISTS sensors CASCADE;
CREATE TABLE sensors (
id SERIAL PRIMARY KEY NOT NULL,
type VARCHAR(50),
UNIQUE (type)
);
execute_sql(get_file_content('sql/1b-create-sensors-table.sql'))
SQL-Befehl(e) wurden ausgeführt.
print(get_file_content('sql/1c-create-sensors_countries-table.sql'))
-- Create sensors_countries table
DROP TABLE IF EXISTS sensors_countries CASCADE;
CREATE TABLE sensors_countries (
id SERIAL PRIMARY KEY NOT NULL,
country_id INTEGER REFERENCES countries(id) NOT NULL,
sensor_id INTEGER REFERENCES sensors(id) NOT NULL,
UNIQUE (country_id, sensor_id)
);
execute_sql(get_file_content('sql/1c-create-sensors_countries-table.sql'))
SQL-Befehl(e) wurden ausgeführt.
print(get_file_content('sql/1d-create-measurements-table.sql'))
-- Create measurements table
DROP TABLE IF EXISTS measurements CASCADE;
CREATE TABLE measurements (
id SERIAL PRIMARY KEY NOT NULL,
date DATE NOT NULL,
value NUMERIC NOT NULL,
sensors_countries_id INTEGER REFERENCES sensors_countries(id) NOT NULL
);
execute_sql(get_file_content('sql/1d-create-measurements-table.sql'))
SQL-Befehl(e) wurden ausgeführt.
# only install pip packages if not installed already
# NOTE(review): calling pip via pip._internal is unsupported API; prefer
# `%pip install ...` in a notebook or `python -m pip` in a subprocess.
try: from sqlalchemy_schemadisplay import create_schema_graph
except ImportError:
    from pip._internal import main as pip
    pip(['install', 'sqlalchemy_schemadisplay'])
    from sqlalchemy_schemadisplay import create_schema_graph
try: from sqlalchemy import MetaData
except ImportError:
    from pip._internal import main as pip
    pip(['install', 'sqlalchemy'])
    from sqlalchemy import MetaData

# Reflect the live schema and render an entity-relationship diagram.
# NOTE(review): passing a URL positionally to MetaData() binds it to an
# implicit engine — this API was removed in SQLAlchemy 1.4+; confirm the
# pinned SQLAlchemy version. The port from postgresql_config is not put
# into the URL, so the driver's default (5432) is used.
graph = create_schema_graph(
    metadata = MetaData(
        'postgresql://{user}:{password}@{host}/{database}'.format(user = postgresql_config['user'],
            password = postgresql_config['password'],
            host = postgresql_config['host'],
            database = postgresql_config['database'])
    )
)
graph.set_size('"8,6!"')
graph.write_png('postgres_ERD.png')
# Show the rendered diagram inline in the notebook
display.Image('postgres_ERD.png')
# Read only the CSV header row (no data) to recall the column names
pd.read_csv('data/global.csv', nrows=0)
dateobject
countryobject
print(get_file_content('sql/2a-create-temporary-table.sql'))
-- Create table to hold raw imported data
DROP TABLE IF EXISTS imported_data;
CREATE TABLE imported_data(
date DATE,
country VARCHAR,
co2 NUMERIC,
temperature NUMERIC
);
execute_sql(get_file_content('sql/2a-create-temporary-table.sql'))
SQL-Befehl(e) wurden ausgeführt.
print(get_file_content('sql/2b-import-csv-into-temporary-table.sql'))
-- Import data
COPY imported_data FROM STDIN WITH CSV HEADER DELIMITER as ',';
# Bulk-load the CSV into the imported_data table via COPY ... FROM STDIN.
from contextlib import closing, contextmanager
connection = psycopg2.connect(**postgresql_config)
cursor = connection.cursor()
with open('data/global.csv', 'r') as f:
    @contextmanager
    def _paused_thread():
        # Temporarily remove psycopg2's wait callback while streaming.
        # NOTE(review): presumably copy_expert() is incompatible with the
        # callback-based (green/async) mode installed in this environment —
        # confirm why the callback must be paused here.
        try:
            thread = psycopg2.extensions.get_wait_callback()
            psycopg2.extensions.set_wait_callback(None)
            yield
        finally:
            # always restore the previous callback, even on error
            psycopg2.extensions.set_wait_callback(thread)
    with _paused_thread():
        # stream the CSV file straight into the server-side COPY command
        cursor.copy_expert(sql = get_file_content('sql/2b-import-csv-into-temporary-table.sql'), file = f)
connection.commit()
# Spot-check: print 5 random imported rows
cursor.execute('SELECT * FROM imported_data ORDER BY random() LIMIT 5')
records = cursor.fetchall()
for record in records:
    print(record)
cursor.close()
connection.close()
(datetime.date(1965, 1, 1), 'Ghana', Decimal('1.7038'), Decimal('-0.342'))
(datetime.date(1999, 1, 1), 'Bosnia and Herzegovina', Decimal('10.3339'), Decimal('0.882'))
(datetime.date(1986, 1, 1), 'Taiwan', Decimal('93.411'), Decimal('-0.765'))
(datetime.date(1976, 1, 1), 'Zimbabwe', Decimal('10.8549'), Decimal('-0.555'))
(datetime.date(2011, 1, 1), 'Burkina Faso', Decimal('2.1302'), Decimal('0.237'))
# Empty all target tables before the migration below; CASCADE also clears
# rows in tables that reference them via foreign keys.
execute_sql('''
TRUNCATE TABLE sensors_countries CASCADE;
TRUNCATE TABLE countries CASCADE;
TRUNCATE TABLE sensors CASCADE;
TRUNCATE TABLE measurements CASCADE;
''')
SQL-Befehl(e) wurden ausgeführt.
print(get_file_content('sql/3a-migrate-countries-from-imported_data-into-countries-table.sql'))
-- insert countries into countries table
INSERT INTO countries (country)
SELECT
i.country AS country
FROM (SELECT DISTINCT country FROM imported_data) i
;
execute_sql(get_file_content('sql/3a-migrate-countries-from-imported_data-into-countries-table.sql'))
SQL-Befehl(e) wurden ausgeführt.
print(get_file_content('sql/3b-insert-sensor-types-into-sensors-table.sql'))
-- fill co2 and temperature values into sensors table
INSERT INTO
sensors (type)
VALUES
('co2'),
('temperature')
;
execute_sql(get_file_content('sql/3b-insert-sensor-types-into-sensors-table.sql'))
SQL-Befehl(e) wurden ausgeführt.
print(get_file_content('sql/3c-migrate-sensors-and-countries-from-imported_data-into-sensors_countries-table.sql'))
-- fill intermediate table sensors_countries
INSERT INTO
sensors_countries (country_id, sensor_id)
SELECT
countries.id AS country_id, sensors.id AS sensor_id
FROM
countries
CROSS JOIN
sensors
;
execute_sql(get_file_content('sql/3c-migrate-sensors-and-countries-from-imported_data-into-sensors_countries-table.sql'))
SQL-Befehl(e) wurden ausgeführt.
print(get_file_content('sql/3d-migrate-measurements-from-imported_data-into-measurements-table.sql'))
-- migrate measurements from imported_data table into measurements table
TRUNCATE measurements;
-- CO2 values
INSERT INTO
measurements (date, value, sensors_countries_id)
SELECT i.date, ii.co2, sc.id
FROM imported_data AS i
INNER JOIN countries AS c
ON c.country = i.country
INNER JOIN sensors_countries AS sc
ON sc.country_id = c.id
LEFT JOIN sensors s
ON s.id = sc.sensor_id
LEFT JOIN
(SELECT date, country, co2
FROM imported_data) as ii
ON ii.date = i.date AND ii.country = c.country
WHERE s.type = 'co2' AND ii.co2 IS NOT NULL;
-- temperature values
INSERT INTO
measurements (date, value, sensors_countries_id)
SELECT i.date, ii.temperature, sc.id
FROM imported_data AS i
INNER JOIN countries AS c
ON c.country = i.country
INNER JOIN sensors_countries AS sc
ON sc.country_id = c.id
LEFT JOIN sensors s
ON s.id = sc.sensor_id
LEFT JOIN
(SELECT date, country, temperature
FROM imported_data) as ii
ON ii.date = i.date AND ii.country = c.country
WHERE s.type = 'temperature' AND ii.temperature IS NOT NULL;
execute_sql(get_file_content('sql/3d-migrate-measurements-from-imported_data-into-measurements-table.sql'))
SQL-Befehl(e) wurden ausgeführt.
# delete temporary table 'imported_data'
execute_sql('DROP TABLE imported_data;')
SQL-Befehl(e) wurden ausgeführt.
try: from influxdb_client import InfluxDBClient
except ImportError:
    from pip._internal import main as pip
    pip(['install', 'influxdb_client'])
    from influxdb_client import InfluxDBClient

# InfluxDB credentials
# NOTE(review): the token is hard-coded — anyone with this file can write to
# the bucket; move it to an environment variable and rotate it.
influxdb_config = {
'url': 'https://eu-central-1-1.aws.cloud2.influxdata.com',
'token': '1_rHKu5fjH2KLOONhU3FI46vusPZNXcZ1tD_m8mF_OZoI0E4L3Uyaf-KY0w57TiTMwptf0SGolErfzA3YLJaYQ==',
'org': 'fabian.jordi@me.com',
'bucket': 'bucket'
}
client = InfluxDBClient(url = influxdb_config['url'], token = influxdb_config['token'])

# Read CSV into Pandas dataframe
df = pd.read_csv('data/global.csv')
# NOTE(review): the trailing 'T' in the format string looks wrong for dates
# like '1961-01-01' — confirm the actual date format in the CSV.
df['date'] = pd.to_datetime(df['date'], format="%Y-%m-%dT")
df.set_index(['date'], inplace=True)
# One frame per InfluxDB measurement: co2 and temperature
df_co2 = df.drop(columns = ['temperature'])
df_temp = df.drop(columns = ['co2'])
# Write CSV data into InfluxDB
# NOTE(review): the default write_api is batching/asynchronous; without
# closing the client (or using synchronous write options) buffered points
# may be lost when the notebook kernel exits — verify all rows arrived.
write_client = client.write_api()
# Write CO2 values
write_client.write(influxdb_config['bucket'], influxdb_config['org'], record = df_co2, data_frame_measurement_name = 'co2', data_frame_tag_columns = ['country'])
# Write temperature values
write_client.write(influxdb_config['bucket'], influxdb_config['org'], record = df_temp, data_frame_measurement_name = 'temperature', data_frame_tag_columns = ['country'])
# Helper function to execute SQL query string for PostgreSQL
def execute_sql_and_show_result(query):
    """Run *query* against PostgreSQL and print the row count and every row.

    Connection parameters come from the module-level ``postgresql_config``.
    Errors are printed rather than raised; resources are always released.
    """
    connection = None
    # bug fix: 'cursor' was unbound in 'finally' if connection.cursor()
    # raised after a successful connect, causing a NameError there.
    cursor = None
    try:
        # connect to database
        connection = psycopg2.connect(**postgresql_config)
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
            print('Anzahl Resultate:', cursor.rowcount)
            for row in rows:
                print(row)
        except (Exception, Error) as error:
            print('SQL-Befehl konnte nicht ausgeführt werden: ', error)
        connection.commit()
    except (Exception, Error) as error:
        print(error)
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            # close connection
            connection.close()
# Helper function to execute a Flux query string for InfluxDB
# (the name says "influxql" but the queries used here are Flux; the name is
# kept so existing callers keep working)
def execute_influxql_and_show_result(query, aggregated = False):
    """Run a Flux *query* via the module-level ``client`` and print results.

    When *aggregated* is True only the scalar record values are printed,
    otherwise (time, value, country) tuples.
    """
    query_api = client.query_api()
    result = query_api.query(org = influxdb_config['org'], query = query)
    # bug fix: the original printed the raw result object (once per table)
    # instead of the number of records
    print('Anzahl Resultate:', sum(len(table.records) for table in result))
    for table in result:
        for record in table.records:
            if aggregated:
                print(record.get_value())
            else:
                print((record.get_time(), record.get_value(), record.values.get('country')))
"""
get_measurement(): Returns the measurement name of the record.
get_field(): Returns the field name.
get_value(): Returns the actual field value.
values: Returns a map of column values.
values.get("<your tag>"): Returns a value from the record for given column.
get_time(): Returns the time of the record.
get_start(): Returns the inclusive lower time bound of all records in the current table.
get_stop(): Returns the exclusive upper time bound of all records in the current table.
"""
# Query for PostgreSQL: temperature measurements for all countries whose
# name starts with 'Sw', for the dates 1999-01-01 through 2001-01-01
execute_sql_and_show_result("""
SELECT
date, value, c.country
FROM
measurements AS m
JOIN sensors_countries AS sc
ON sc.id = m.sensors_countries_id
JOIN countries AS c
ON c.id = sc.country_id
JOIN sensors AS s
ON s.id = sc.sensor_id
WHERE
s.type = 'temperature'
AND
c.country LIKE 'Sw%'
AND
m.date BETWEEN '1999-01-01' AND '2001-01-01'
ORDER BY date, c.country ASC
;
""")
Anzahl Resultate: 9
(datetime.date(1999, 1, 1), Decimal('0.399'), 'Swaziland')
(datetime.date(1999, 1, 1), Decimal('0.502'), 'Sweden')
(datetime.date(1999, 1, 1), Decimal('2.273'), 'Switzerland')
(datetime.date(2000, 1, 1), Decimal('-1.356'), 'Swaziland')
(datetime.date(2000, 1, 1), Decimal('3.748'), 'Sweden')
(datetime.date(2000, 1, 1), Decimal('0.22'), 'Switzerland')
(datetime.date(2001, 1, 1), Decimal('-0.075'), 'Swaziland')
(datetime.date(2001, 1, 1), Decimal('4.042'), 'Sweden')
(datetime.date(2001, 1, 1), Decimal('1.781'), 'Switzerland')
# Query for InfluxDB: the equivalent Flux query — temperature for countries
# matching /^Sw/ in the same date range, merged into one table and sorted
# to mirror the SQL ORDER BY
query = '''
from(bucket: "bucket")
|> range(start: 1999-01-01, stop: 2001-12-31)
|> filter(fn:(r) => r._measurement == "temperature")
|> filter(fn:(r) => r.country =~ /^Sw/)
|> group()
|> sort(columns: ["_time", "country"])
'''
execute_influxql_and_show_result(query)
Anzahl Resultate: [<FluxTable: 9 columns, 9 records>]
(datetime.datetime(1999, 1, 1, 0, 0, tzinfo=tzlocal()), 0.399, 'Swaziland')
(datetime.datetime(1999, 1, 1, 0, 0, tzinfo=tzlocal()), 0.502, 'Sweden')
(datetime.datetime(1999, 1, 1, 0, 0, tzinfo=tzlocal()), 2.273, 'Switzerland')
(datetime.datetime(2000, 1, 1, 0, 0, tzinfo=tzlocal()), -1.356, 'Swaziland')
(datetime.datetime(2000, 1, 1, 0, 0, tzinfo=tzlocal()), 3.748, 'Sweden')
(datetime.datetime(2000, 1, 1, 0, 0, tzinfo=tzlocal()), 0.22, 'Switzerland')
(datetime.datetime(2001, 1, 1, 0, 0, tzinfo=tzlocal()), -0.075, 'Swaziland')
(datetime.datetime(2001, 1, 1, 0, 0, tzinfo=tzlocal()), 4.042, 'Sweden')
(datetime.datetime(2001, 1, 1, 0, 0, tzinfo=tzlocal()), 1.781, 'Switzerland')
# Query for PostgreSQL: ratio of summed CO2 values (Germany / Switzerland)
# for dates after 1999-12-31, computed via a cross join of two one-row
# aggregate subqueries (ON 1 = 1)
execute_sql_and_show_result('''
SELECT first.co2 / second.co2 as factor
FROM
(
SELECT SUM(value) as co2
FROM
measurements AS m
JOIN sensors_countries AS sc
ON sc.id = m.sensors_countries_id
JOIN countries AS c
ON c.id = sc.country_id
JOIN sensors AS s
ON s.id = sc.sensor_id
WHERE
s.type = 'co2'
AND
c.country = 'Germany'
AND
date > '1999-12-31'
) first
JOIN
(
SELECT SUM(value) as co2
FROM
measurements AS m
JOIN sensors_countries AS sc
ON sc.id = m.sensors_countries_id
JOIN countries AS c
ON c.id = sc.country_id
JOIN sensors AS s
ON s.id = sc.sensor_id
WHERE
s.type = 'co2'
AND
c.country = 'Switzerland'
AND
date > '1999-12-31'
) second
ON 1 = 1
''')
Anzahl Resultate: 1
(Decimal('19.5277278806202866'),)
# Query for InfluxDB: sum of all CO2 values for Germany since 1999-12-31.
# Note: the Flux variable binding 't1 =' is unused — yield() alone returns
# the result to the client.
query = '''
t1 = from(bucket: "bucket")
|> range(start: 1999-12-31)
|> filter(fn:(r) => r._measurement == "co2")
|> filter(fn:(r) => r.country == "Germany")
|> group()
|> sum()
|> yield(name: "1")
'''
execute_influxql_and_show_result(query, aggregated = True)
Anzahl Resultate: [<FluxTable: 5 columns, 1 records>]
12033.2183
# Same aggregation for Switzerland (the 't2 =' binding is likewise unused)
query = '''
t2 = from(bucket: "bucket")
|> range(start: 1999-12-31)
|> filter(fn:(r) => r._measurement == "co2")
|> filter(fn:(r) => r.country == "Switzerland")
|> group()
|> sum()
|> yield(name: "2")
'''
execute_influxql_and_show_result(query, aggregated = True)
Anzahl Resultate: [<FluxTable: 5 columns, 1 records>]
616.2119000000001
# Factor computation: Germany / Switzerland CO2 sums, using the two values
# printed by the InfluxDB queries above (manually copied, unlike the SQL
# version which computes the ratio server-side)
12033.2183 / 616.2119