import os

import pandas
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def cmd(x):
    """Run the command string *x* through the system shell.

    Returns:
        The status value reported by ``os.system`` (0 on success).

    The string is passed to the shell verbatim, so it must only ever be
    trusted, hard-coded text.
    """
    return os.system(x)
def view(x, y):
    """Expose DataFrame *x* to Spark SQL as the temporary view *y*.

    An existing temporary view with the same name is replaced.
    """
    view_name = f"{y}"
    x.createOrReplaceTempView(view_name)
def http_metod(str):
    """Extract the HTTP method from a raw request field.

    Takes the text before the first ``/``, strips surrounding whitespace and
    removes double quotes, so ``'"GET /ksc.html HTTP/1.0"'`` yields ``'GET'``.
    Registered as a Spark SQL UDF returning a string.

    Args:
        str: The request text. The name shadows the builtin but is kept for
            backward compatibility with keyword callers.

    Returns:
        The method text, or ``None`` when *str* is ``None``. Spark hands SQL
        NULLs to Python UDFs as ``None``; without this guard a NULL request
        raised ``AttributeError`` and failed the whole Spark task.
    """
    if str is None:
        return None
    return str.split("/")[0].strip().replace('"', '')
def remove_character(str):
    """Remove every double quote from *str* and strip surrounding whitespace.

    Registered as a Spark SQL UDF returning a string; it cleans the quoted
    request field, e.g. ``' "GET / HTTP/1.0" '`` -> ``'GET / HTTP/1.0'``.

    Args:
        str: The text to clean. The name shadows the builtin but is kept for
            backward compatibility with keyword callers.

    Returns:
        The cleaned text, or ``None`` when *str* is ``None``. Spark hands SQL
        NULLs to Python UDFs as ``None``; previously that raised
        ``AttributeError`` and failed the Spark task.
    """
    if str is None:
        return None
    return str.replace('"', '').strip()
# A Spark session must exist before UDFs can be registered on it. Previously
# `spark` was only created further down, so running this file top-to-bottom
# raised NameError. getOrCreate() returns this same session when it is
# requested again later.
spark = SparkSession.builder.appName("load-logs-nasa").getOrCreate()
# Make the Python helpers callable from Spark SQL; both return strings.
spark.udf.register("http_metod", http_metod, StringType())
spark.udf.register("remove_character", remove_character, StringType())
# Output:
# 22/07/13 21:35:23 WARN SimpleFunctionRegistry: The function http_metod replaced a previously registered function.
# 22/07/13 21:35:23 WARN SimpleFunctionRegistry: The function remove_character replaced a previously registered function.
# Download the NASA HTTP access log into the working directory
# (`wget -c` continues a partial download).
# NOTE(review): the ITA archive appears to publish this trace as
# NASA_access_log_Aug95.gz; "ago95" looks like a localized typo and may not
# exist on the server -- confirm the file name.
download_status = cmd("wget -c ftp://ita.ee.lbl.gov/traces/NASA_access_log_ago95.gz")
if download_status != 0:
    # Best effort: a previously downloaded copy may already be present, so
    # report the failure instead of silently ignoring it or aborting.
    print(f"WARNING: download command exited with status {download_status}")
spark = SparkSession.builder.appName("load-logs-nasa").getOrCreate()
# Read every *.gz file in the working directory as raw text: one row per log
# line, in a single column named "value".
logs = spark.read.text(os.path.join(os.getcwd(), "*.gz"))
view(logs, "table_logs")
spark.sql("SELECT * FROM table_logs").show()
# Output:
# +--------------------+
# | value|
# +--------------------+
# |199.72.81.55 - - ...|
# |unicomp6.unicomp....|
# |199.120.110.21 - ...|
# |burger.letters.co...|
# |199.120.110.21 - ...|
# |burger.letters.co...|
# |burger.letters.co...|
# |205.212.115.106 -...|
# |d104.aa.net - - [...|
# |129.94.144.152 - ...|
# |unicomp6.unicomp....|
# |unicomp6.unicomp....|
# |unicomp6.unicomp....|
# |d104.aa.net - - [...|
# |d104.aa.net - - [...|
# |d104.aa.net - - [...|
# |129.94.144.152 - ...|
# |199.120.110.21 - ...|
# |ppptky391.asahi-n...|
# |net-1-141.eden.co...|
# +--------------------+
# only showing top 20 rows
# Stage 01: first textual normalisation of the raw log lines.
#  * address: split each line on " - - ", render the resulting array as text
#    and replace every comma in it with a double quote.
#  * date: turn "[" and "]" into double quotes and collapse " -" into "-".
query_01 = """
WITH address AS
(SELECT REPLACE(CAST(SPLIT(value, ' - - ') AS STRING), ',', '"') AS value1
FROM table_logs),
date AS
(SELECT REPLACE(REPLACE(REPLACE(value1, '[', '"'), ']', '"'), ' -', '-') AS value2
FROM address)
SELECT *
FROM date
"""
transfor_01 = spark.sql(query_01)
view(x=transfor_01, y="table_transfor_01")
transfor_01.show()
# Output:
# +--------------------+
# | value2|
# +--------------------+
# |"199.72.81.55" "0...|
# |"unicomp6.unicomp...|
# |"199.120.110.21" ...|
# |"burger.letters.c...|
# |"199.120.110.21" ...|
# |"burger.letters.c...|
# |"burger.letters.c...|
# |"205.212.115.106"...|
# |"d104.aa.net" "01...|
# |"129.94.144.152" ...|
# |"unicomp6.unicomp...|
# |"unicomp6.unicomp...|
# |"unicomp6.unicomp...|
# |"d104.aa.net" "01...|
# |"d104.aa.net" "01...|
# |"d104.aa.net" "01...|
# |"129.94.144.152" ...|
# |"199.120.110.21" ...|
# |"ppptky391.asahi-...|
# |"net-1-141.eden.c...|
# +--------------------+
# only showing top 20 rows
# Stage 02 is a pass-through: it re-reads stage 01 unchanged and publishes it
# under the view name that the stage-03 query reads from.
transfor_02 = spark.sql("""SELECT * FROM table_transfor_01""")
view(x=transfor_02, y="table_transfor_02")
# Stage 03: split each normalised line into typed columns.
#
# The request is temporarily made space-free ("GET /x HTTP/1.0" becomes
# "GET_/x_HTTP/1.0") so that a plain split on " " yields
# [address, timestamp, request, status, bytes]. The underscores are then
# turned back into spaces. GET, POST and HEAD are all protected. Protecting
# only GET left POST rows shifted by one column, which showed up downstream
# as request "POST", a NULL status, and bytes equal to the status code.
#
# The timestamp (e.g. "01/Jul/1995:00:00:01-0400" once quotes are removed) is
# parsed with an explicit month-name pattern instead of hand-rewriting "/Jul/"
# and re-slicing. Any month now parses instead of only July. The trailing
# timezone offset is dropped, as before.
transfor_03 = spark.sql("""
WITH getReplace AS
  (SELECT REPLACE(REPLACE(REPLACE(REPLACE(value2, 'GET ', 'GET_'), 'POST ', 'POST_'), 'HEAD ', 'HEAD_'), ' HTTP', '_HTTP') AS value1
   FROM table_transfor_02),
splitText AS
  (SELECT SPLIT(value1, ' ')[0] AS address,
          SPLIT(value1, ' ')[1] AS TIMESTAMP,
          SPLIT(value1, ' ')[2] AS url,
          SPLIT(value1, ' ')[3] AS position3,
          SPLIT(value1, ' ')[4] AS position4
   FROM getReplace),
removeCharacter AS
  (SELECT *,
          REPLACE(position4, '"', '') AS POSITION,
          REPLACE(position3, '-', '') AS position33,
          REPLACE(REPLACE(REPLACE(REPLACE(url, 'GET_', 'GET '), 'POST_', 'POST '), 'HEAD_', 'HEAD '), '_HTTP', ' HTTP') AS url2
   FROM splitText),
selectColumns AS
  (SELECT REPLACE(address, '"', '') AS address,
          REPLACE(TIMESTAMP, '"', '') AS TIMESTAMP,
          url2 AS URL,
          CAST(REPLACE(position33, '"', '') AS Integer) AS code_error,
          CAST(POSITION AS Integer) AS bytes_returned
   FROM removeCharacter
   ORDER BY address)
SELECT address,
       to_timestamp(SUBSTRING(TIMESTAMP, 1, 20), 'dd/MMM/yyyy:HH:mm:ss') AS TIMESTAMP,
       URL AS request,
       code_error,
       bytes_returned
FROM selectColumns
""")
transfor_03.show()
# Output:
# [Stage 69:> (0 + 1) / 1]
# +--------------+-------------------+--------------------+----------+--------------+
# | address| timestamp| request|code_error|bytes_returned|
# +--------------+-------------------+--------------------+----------+--------------+
# | ***.novo.dk|1995-07-11 08:17:09|"GET /ksc.html HT...| 200| 7067|
# | ***.novo.dk|1995-07-11 08:17:11|"GET /images/kscl...| 200| 5866|
# | ***.novo.dk|1995-07-11 08:17:31|"GET /images/MOSA...| 200| 363|
# | ***.novo.dk|1995-07-11 08:17:33|"GET /images/USA-...| 200| 234|
# | ***.novo.dk|1995-07-11 08:17:34|"GET /images/NASA...| 200| 786|
# | ***.novo.dk|1995-07-11 08:17:38|"GET /images/WORL...| 200| 669|
# | ***.novo.dk|1995-07-11 08:17:48|"GET /shuttle/mis...| 200| 8678|
# | ***.novo.dk|1995-07-11 08:17:51|"GET /images/laun...| 200| 11853|
# | ***.novo.dk|1995-07-11 08:19:13|"GET /images/KSC-...| 200| 1204|
# | ***.novo.dk|1995-07-11 08:19:17|"GET /images/NASA...| 200| 786|
# | ***.novo.dk|1995-07-11 08:21:05|"GET /shuttle/mis...| 200| 12118|
# | ***.novo.dk|1995-07-11 08:21:19|"GET /shuttle/mis...| 200| 12118|
# | ***.novo.dk|1995-07-11 08:21:27|"GET /shuttle/mis...| 200| 13393|
# | ***.novo.dk|1995-07-11 08:22:03|"GET /images/laun...| 200| 1713|
# | ***.novo.dk|1995-07-11 08:22:08|"GET /history/apo...| 200| 1173|
# | ***.novo.dk|1995-07-11 08:23:01|"GET /shuttle/res...| 200| 6922|
# |007.thegap.com|1995-07-06 17:22:35|"GET /shuttle/cou...| 200| 3998|
# |007.thegap.com|1995-07-06 17:22:40|"GET /shuttle/cou...| 200| 40310|
# |007.thegap.com|1995-07-06 17:22:46|"GET /images/NASA...| 200| 786|
# |007.thegap.com|1995-07-06 17:22:47|"GET /images/KSC-...| 200| 1204|
# +--------------+-------------------+--------------------+----------+--------------+
# only showing top 20 rows
view(transfor_03, "table_transfor_03")
# Stage 04: keep successful (HTTP status 200) GET/POST requests and normalise
# the column types. The filter previously compared bytes_returned to 200, so
# it selected rows whose response size happened to be 200 instead of
# HTTP-200 responses. In the captured output those were almost all mis-split
# rows with a NULL status.
transfor_04 = spark.sql("""
WITH methods AS
  (SELECT *,
          http_metod(request) AS http_method,
          remove_character(request) AS requests
   FROM table_transfor_03),
fields AS
  (SELECT *
   FROM methods
   WHERE http_method IN ("GET", "POST")
     AND code_error = 200)
SELECT cast(address AS string) AS address,
       to_timestamp(TIMESTAMP) AS TIMESTAMP,
       cast(requests AS string) AS request,
       cast(coalesce(code_error, "0") AS int) AS code_error,
       cast(bytes_returned AS int) AS bytes_returned,
       cast(http_method AS string) AS http_methods
FROM fields
""")
transfor_04.show()
# Output:
# [Stage 127:> (0 + 1) / 1]
# +--------------+-------------------+--------------------+----------+--------------+------------+
# | address| timestamp| request|code_error|bytes_returned|http_methods|
# +--------------+-------------------+--------------------+----------+--------------+------------+
# |128.159.112.43|1995-07-09 13:51:12| GET /| 0| 200| GET|
# |128.159.146.17|1995-07-26 13:03:41| POST| 0| 200| POST|
# | 145.2.66.76|1995-07-04 05:57:42| GET /| 0| 200| GET|
# | 152.148.10.34|1995-07-26 14:40:31|GET /history/apol...| 0| 200| GET|
# | 159.233.80.21|1995-07-07 19:15:03| GET /| 0| 200| GET|
# | 159.233.80.21|1995-07-07 19:15:29| GET /| 0| 200| GET|
# | 163.205.1.45|1995-07-03 10:40:07| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:40:30| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:42:08| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:42:26| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:44:12| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:45:09| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:45:32| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:45:47| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:46:21| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:46:28| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:46:41| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:46:59| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:56:57| POST| 0| 200| POST|
# | 163.205.1.45|1995-07-03 10:57:16| POST| 0| 200| POST|
# +--------------+-------------------+--------------------+----------+--------------+------------+
# only showing top 20 rows