import os

import pandas
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def cmd(x):
    """Run the shell command string *x* and return the ``os.system`` status.

    The command is handed to the system shell verbatim, so it must come from
    a trusted source. The value returned is exactly what ``os.system``
    reports (0 when the command succeeds).
    """
    return os.system(x)
def view(x, y):
    """Expose DataFrame *x* to Spark SQL as a temporary view named *y*.

    *y* is rendered to text with an f-string, so non-string names are
    accepted. Any existing temp view with the same name is replaced.
    """
    name = f"{y}"
    x.createOrReplaceTempView(name)
def http_metod(str):
    """Return the HTTP method at the start of a request string.

    Everything before the first '/' is kept. Double quotes are then removed
    and surrounding whitespace is stripped, so '"GET /index.html HTTP/1.0"'
    gives 'GET'. Quotes are removed before stripping, so a space between the
    quote and the method is also dropped.

    This function is registered as a Spark SQL UDF, where NULL requests
    arrive as None. They are passed through as None (SQL NULL) instead of
    raising AttributeError and failing the task.

    The parameter name shadows the builtin ``str``. It is kept so the
    signature stays unchanged.
    """
    if str is None:
        return None
    head = str.split("/", 1)[0]
    return head.replace('"', '').strip()
def remove_character(str):
    """Remove every double quote from *str* and strip surrounding whitespace.

    This function is registered as a Spark SQL UDF, where NULL values arrive
    as None. They are returned as None (SQL NULL) instead of raising
    AttributeError and failing the task.

    The parameter name shadows the builtin ``str``. It is kept so the
    signature stays unchanged.
    """
    if str is None:
        return None
    return str.replace('"', '').strip()
# Remote location of the August 1995 NASA HTTP access log. The local copy
# keeps the same file name in the current working directory.
LOG_URL = "ftp://ita.ee.lbl.gov/traces/NASA_access_log_ago95.gz"
LOG_FILE = os.path.basename(LOG_URL)

# `wget -c` resumes a partial download instead of restarting it. The shell
# status alone is not decisive: a re-run against an unreachable server can
# still proceed if a complete copy is already on disk. So fail only when the
# file is actually missing, and report the status to help diagnose why.
download_status = cmd("wget -c " + LOG_URL)
if not os.path.isfile(LOG_FILE):
    raise RuntimeError(
        f"could not download {LOG_URL} (status {download_status})"
    )

spark = SparkSession.builder.appName("load-logs-nasa").getOrCreate()

# UDF registration needs an existing session, so it must follow the
# assignment of `spark`. The later SQL queries call these UDFs by name.
spark.udf.register("http_metod", http_metod, StringType())
spark.udf.register("remove_character", remove_character, StringType())

# Read only the downloaded log (not every *.gz in the directory). Each line
# becomes one row in a single string column named `value`.
logs = spark.read.text(os.path.join(os.getcwd(), LOG_FILE))
view(logs, "table_logs")
spark.sql("SELECT * FROM table_logs").show()
transfor_01 = spark.sql("""
WITH address AS
(SELECT REPLACE(CAST(SPLIT(value, ' - - ') AS STRING), ',', '"') AS value1
FROM table_logs),
date AS
(SELECT REPLACE(REPLACE(REPLACE(value1, '[', '"'), ']', '"'), ' -', '-') AS value2
FROM address)
SELECT *
FROM date
""")
view(transfor_01,"table_transfor_01")
transfor_01.show()
# Stage 2 is an identity projection. It re-reads stage 1 through its temp
# view and publishes the same rows as "table_transfor_02" for stage 3.
transfor_02 = spark.sql("SELECT * FROM table_transfor_01")
view(transfor_02, "table_transfor_02")
transfor_03 = spark.sql("""
WITH getReplace AS
(SELECT REPLACE(REPLACE(value2, 'GET ', 'GET_'), ' HTTP', '_HTTP') AS value1
FROM table_transfor_02),
splitText AS
(SELECT SPLIT(value1, ' ')[0] AS address,
SPLIT(value1, ' ')[1] AS TIMESTAMP,
SPLIT(value1, ' ')[2] AS url,
SPLIT(value1, ' ')[3] AS position3,
SPLIT(value1, ' ')[4] AS position4
FROM getReplace),
removeCharacter AS
(SELECT *,
REPLACE(position4, '"', '') AS POSITION,
REPLACE(position3, '-', '') AS position33,
REPLACE(REPLACE(url, 'GET_', 'GET '), '_HTTP', ' HTTP') AS url2
FROM splitText),
selectColumns AS
(SELECT REPLACE(address, '"', '') AS address,
REPLACE(TIMESTAMP, '"', '') AS TIMESTAMP,
url2 AS URL,
CAST(REPLACE(position33, '"', '') AS Integer) AS code_error,
CAST(POSITION AS Integer) AS bytes_returned
FROM removeCharacter
ORDER BY address),
timestampTransformer01 AS
(SELECT REPLACE(TIMESTAMP, '/Jul/', '-07-') AS timer,
*
FROM selectColumns),
timestampTransformer02 AS
(SELECT address,
concat_ws(' ', SUBSTRING(timer, 1, 10), SUBSTRING(timer, 12, 8)) AS timestamp2,
URL AS request,
code_error,
bytes_returned
FROM timestampTransformer01),
timestampTransformer03 AS
(SELECT address,
to_timestamp(concat_ws(' ', concat_ws('-', SUBSTRING(timestamp2, 7, 4), SUBSTRING(timestamp2, 4, 2), SUBSTRING(timestamp2, 1, 2)), SUBSTRING(timestamp2, 12, 8))) AS TIMESTAMP,
request,
code_error,
bytes_returned
FROM timestampTransformer02)
SELECT *
FROM timestampTransformer03
""")
transfor_03.show()
view(transfor_03,"table_transfor_03")
transfor_04 = spark.sql("""
WITH methods AS
(SELECT *,
http_metod(request) AS http_method,
remove_character(request) AS requests
FROM table_transfor_03),
fields AS
(SELECT *
FROM methods
WHERE http_method IN ("GET",
"POST")
AND bytes_returned=200 )
SELECT cast(address AS string) AS address,
to_timestamp(TIMESTAMP) AS TIMESTAMP,
cast(requests AS string) AS request,
cast(coalesce(code_error, "0") AS int) AS code_error,
cast(bytes_returned AS int) AS bytes_returned,
cast(http_method AS string) AS http_methods
FROM fields
""")
transfor_04.show()