import pandas as pd
import datetime as dt
# preview the first five rows of the raw CSV
pd.read_csv('data/global.csv', nrows=5)
import psycopg2
from psycopg2 import Error
from IPython import display
# PostgreSQL DB credentials
postgresql_config = {
  'user': 'pnrmxfoe',
  'password': '24LD9uhKoOazLk84xAmx46ptOfzMNB1R',
  'host': 'tai.db.elephantsql.com',
  'port': '5432',
  'database': 'pnrmxfoe'
}
# Helper function to read raw file content as a string
def get_file_content(file):
    with open(file, 'r') as f:
        return f.read()
# Helper function to execute SQL from a string
def execute_sql(query):
    connection = None
    cursor = None
    try:
        # connect to database
        connection = psycopg2.connect(**postgresql_config)
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            print('SQL statement(s) executed.')
        except (Exception, Error) as error:
            print('SQL statement could not be executed:', error)
        connection.commit()
    except (Exception, Error) as error:
        print(error)
    finally:
        # close cursor and connection
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()
print(get_file_content('sql/1a-create-countries-table.sql'))
execute_sql(get_file_content('sql/1a-create-countries-table.sql'))
print(get_file_content('sql/1b-create-sensors-table.sql'))
execute_sql(get_file_content('sql/1b-create-sensors-table.sql'))
print(get_file_content('sql/1c-create-sensors_countries-table.sql'))
execute_sql(get_file_content('sql/1c-create-sensors_countries-table.sql'))
print(get_file_content('sql/1d-create-measurements-table.sql'))
execute_sql(get_file_content('sql/1d-create-measurements-table.sql'))
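# Optional sanity check (added sketch, not part of the original scripts): list the
# tables that should now exist in the public schema after running the four CREATE
# scripts above.
conn = psycopg2.connect(**postgresql_config)
cur = conn.cursor()
cur.execute("""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
    ORDER BY table_name
""")
for (table_name,) in cur.fetchall():
    print(table_name)
cur.close()
conn.close()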
# only install pip packages if not installed already
try: from sqlalchemy_schemadisplay import create_schema_graph
except ImportError:
    from pip._internal import main as pip
    pip(['install', 'sqlalchemy_schemadisplay'])
    from sqlalchemy_schemadisplay import create_schema_graph
try: from sqlalchemy import MetaData
except ImportError:
    from pip._internal import main as pip
    pip(['install', 'sqlalchemy'])
    from sqlalchemy import MetaData
graph = create_schema_graph(
    metadata = MetaData(
        'postgresql://{user}:{password}@{host}/{database}'.format(
            user = postgresql_config['user'],
            password = postgresql_config['password'],
            host = postgresql_config['host'],
            database = postgresql_config['database']
        )
    )
)
graph.set_size('"8,6!"')
graph.write_png('postgres_ERD.png')
display.Image('postgres_ERD.png')
# show only the column names of the CSV (no data rows)
pd.read_csv('data/global.csv', nrows=0)
print(get_file_content('sql/2a-create-temporary-table.sql'))
execute_sql(get_file_content('sql/2a-create-temporary-table.sql'))
print(get_file_content('sql/2b-import-csv-into-temporary-table.sql'))
from contextlib import contextmanager

connection = psycopg2.connect(**postgresql_config)
cursor = connection.cursor()

# context manager that temporarily removes any registered psycopg2 wait callback,
# since copy_expert can conflict with a wait callback (coroutine/green mode)
@contextmanager
def _paused_wait_callback():
    try:
        callback = psycopg2.extensions.get_wait_callback()
        psycopg2.extensions.set_wait_callback(None)
        yield
    finally:
        psycopg2.extensions.set_wait_callback(callback)

with open('data/global.csv', 'r') as f:
    with _paused_wait_callback():
        # stream the CSV into the temporary table via COPY
        cursor.copy_expert(sql = get_file_content('sql/2b-import-csv-into-temporary-table.sql'), file = f)
connection.commit()
# show a random sample of the imported rows
cursor.execute('SELECT * FROM imported_data ORDER BY random() LIMIT 5')
records = cursor.fetchall()
for record in records:
    print(record)
cursor.close()
connection.close()
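# Optional check (added sketch, not in the original workflow): compare the number of
# rows now in imported_data with the number of data rows in the CSV file.
conn = psycopg2.connect(**postgresql_config)
cur = conn.cursor()
cur.execute('SELECT COUNT(*) FROM imported_data')
print('rows in imported_data:', cur.fetchone()[0])
print('rows in CSV:', len(pd.read_csv('data/global.csv')))
cur.close()
conn.close()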
# empty the target tables before (re-)running the migration
execute_sql('''
TRUNCATE TABLE sensors_countries CASCADE;
TRUNCATE TABLE countries CASCADE;
TRUNCATE TABLE sensors CASCADE;
TRUNCATE TABLE measurements CASCADE;
''')
print(get_file_content('sql/3a-migrate-countries-from-imported_data-into-countries-table.sql'))
execute_sql(get_file_content('sql/3a-migrate-countries-from-imported_data-into-countries-table.sql'))
print(get_file_content('sql/3b-insert-sensor-types-into-sensors-table.sql'))
execute_sql(get_file_content('sql/3b-insert-sensor-types-into-sensors-table.sql'))
print(get_file_content('sql/3c-migrate-sensors-and-countries-from-imported_data-into-sensors_countries-table.sql'))
execute_sql(get_file_content('sql/3c-migrate-sensors-and-countries-from-imported_data-into-sensors_countries-table.sql'))
print(get_file_content('sql/3d-migrate-measurements-from-imported_data-into-measurements-table.sql'))
execute_sql(get_file_content('sql/3d-migrate-measurements-from-imported_data-into-measurements-table.sql'))
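# Optional check (added sketch, not in the original workflow): print the row counts
# of the four target tables after the migration, before the temporary table is dropped.
conn = psycopg2.connect(**postgresql_config)
cur = conn.cursor()
for table in ('countries', 'sensors', 'sensors_countries', 'measurements'):
    cur.execute('SELECT COUNT(*) FROM ' + table)
    print(table, cur.fetchone()[0])
cur.close()
conn.close()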
# delete temporary table 'imported_data'
execute_sql('DROP TABLE imported_data;')
try: from influxdb_client import InfluxDBClient
except ImportError:
    from pip._internal import main as pip
    pip(['install', 'influxdb_client'])
    from influxdb_client import InfluxDBClient
# InfluxDB credentials
influxdb_config = {
  'url': 'https://eu-central-1-1.aws.cloud2.influxdata.com',
  'token': '1_rHKu5fjH2KLOONhU3FI46vusPZNXcZ1tD_m8mF_OZoI0E4L3Uyaf-KY0w57TiTMwptf0SGolErfzA3YLJaYQ==',
  'org': 'fabian.jordi@me.com',
  'bucket': 'bucket'
}
client = InfluxDBClient(url = influxdb_config['url'], token = influxdb_config['token'])
# Read CSV into Pandas dataframe
df = pd.read_csv('data/global.csv') 
df['date'] = pd.to_datetime(df['date'], format="%Y-%m-%dT")
df.set_index(['date'], inplace=True)
df_co2 = df.drop(columns = ['temperature'])
df_temp = df.drop(columns = ['co2'])
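# Quick inspection of the two dataframes before writing them (added for illustration only)
print(df_co2.head(3))
print(df_temp.head(3))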
# Write CSV data into InfluxDB
from influxdb_client.client.write_api import SYNCHRONOUS
# use synchronous writes so the dataframes are flushed immediately
# (the default batching API would require an explicit close/flush)
write_client = client.write_api(write_options = SYNCHRONOUS)
# Write CO2 values
write_client.write(influxdb_config['bucket'], influxdb_config['org'], record = df_co2, data_frame_measurement_name = 'co2', data_frame_tag_columns = ['country'])
# Write temperature values
write_client.write(influxdb_config['bucket'], influxdb_config['org'], record = df_temp, data_frame_measurement_name = 'temperature', data_frame_tag_columns = ['country'])
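# Optional check (added sketch, not in the original workflow): read back a few of the
# points that were just written, to confirm the data arrived in InfluxDB.
verify_query = '''
from(bucket: "bucket")
|> range(start: 0)
|> filter(fn:(r) => r._measurement == "co2")
|> limit(n: 3)
'''
for table in client.query_api().query(org = influxdb_config['org'], query = verify_query):
    for record in table.records:
        print(record.get_time(), record.get_value(), record.values.get('country'))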
# Helper function to execute a SQL query string against PostgreSQL and print the result
def execute_sql_and_show_result(query):
    connection = None
    cursor = None
    try:
        # connect to database
        connection = psycopg2.connect(**postgresql_config)
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
            print('Number of results:', cursor.rowcount)
            for row in rows:
                print(row)
        except (Exception, Error) as error:
            print('SQL statement could not be executed:', error)
        connection.commit()
    except (Exception, Error) as error:
        print(error)
    finally:
        # close cursor and connection
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()
# Helper function to execute a Flux query string against InfluxDB and print the result
def execute_influxql_and_show_result(query, aggregated = False):
    # FluxRecord accessors used below:
    #   get_measurement(): returns the measurement name of the record
    #   get_field(): returns the field name
    #   get_value(): returns the actual field value
    #   values: returns a map of column values
    #   values.get("<your tag>"): returns a value from the record for the given column
    #   get_time(): returns the time of the record
    #   get_start(): returns the inclusive lower time bound of all records in the current table
    #   get_stop(): returns the exclusive upper time bound of all records in the current table
    query_api = client.query_api()
    result = query_api.query(org = influxdb_config['org'], query = query)
    for table in result:
        print('Number of results:', len(table.records))
        for record in table.records:
            if aggregated:
                print(record.get_value())
            else:
                print((record.get_time(), record.get_value(), record.values.get('country')))
# Query for PostgreSQL
execute_sql_and_show_result("""
SELECT
    date, value, c.country
FROM 
    measurements AS m
JOIN sensors_countries AS sc
    ON sc.id = m.sensors_countries_id
JOIN countries AS c
    ON c.id = sc.country_id
JOIN sensors AS s
    ON s.id = sc.sensor_id
WHERE
    s.type = 'temperature'
    AND
        c.country LIKE 'Sw%'
    AND 
        m.date BETWEEN '1999-01-01' AND '2001-01-01'
ORDER BY date, c.country ASC
;
""")
# Query for InfluxDB
query = '''
from(bucket: "bucket")
|> range(start: 1999-01-01, stop: 2001-12-31)
|> filter(fn:(r) => r._measurement == "temperature")
|> filter(fn:(r) => r.country =~ /^Sw/)
|> group()
|> sort(columns: ["_time", "country"])
'''
execute_influxql_and_show_result(query)
# Query for PostgreSQL
execute_sql_and_show_result('''
SELECT  first.co2 / second.co2 as factor
FROM
    (
        SELECT SUM(value) as co2
        FROM 
            measurements AS m
        JOIN sensors_countries AS sc
            ON sc.id = m.sensors_countries_id
        JOIN countries AS c
            ON c.id = sc.country_id
        JOIN sensors AS s
            ON s.id = sc.sensor_id
        WHERE
            s.type = 'co2'
            AND 
            c.country = 'Germany'
            AND 
            date > '1999-12-31'
    ) first
    JOIN
    (
        SELECT SUM(value) as co2
        FROM 
            measurements AS m
        JOIN sensors_countries AS sc
            ON sc.id = m.sensors_countries_id
        JOIN countries AS c
            ON c.id = sc.country_id
        JOIN sensors AS s
            ON s.id = sc.sensor_id
        WHERE
            s.type = 'co2'
            AND 
            c.country = 'Switzerland'
            AND 
            date > '1999-12-31'
    ) second
        ON 1 = 1
''')
# Query for InfluxDB
query = '''
t1 = from(bucket: "bucket")
|> range(start: 1999-12-31)
|> filter(fn:(r) => r._measurement == "co2")
|> filter(fn:(r) => r.country == "Germany")
|> group()
|> sum()
|> yield(name: "1")
'''
execute_influxql_and_show_result(query, aggregated = True)
query = '''
t2 = from(bucket: "bucket")
|> range(start: 1999-12-31)
|> filter(fn:(r) => r._measurement == "co2")
|> filter(fn:(r) => r.country == "Switzerland")
|> group()
|> sum()
|> yield(name: "2")
'''
execute_influxql_and_show_result(query, aggregated = True)
# Compute the factor from the two sums above
12033.2183 / 616.2119
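# Optional sketch (not in the original notebook): compute the same factor directly from
# the two Flux sums instead of typing the query results in by hand. The helper
# _flux_sum is hypothetical and simply reuses the CO2 sum query from above.
def _flux_sum(country):
    q = '''
    from(bucket: "bucket")
    |> range(start: 1999-12-31)
    |> filter(fn:(r) => r._measurement == "co2")
    |> filter(fn:(r) => r.country == "''' + country + '''")
    |> group()
    |> sum()
    '''
    tables = client.query_api().query(org = influxdb_config['org'], query = q)
    return tables[0].records[0].get_value()

print('Factor:', _flux_sum('Germany') / _flux_sum('Switzerland'))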