from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import regexp_replace, trim, col, lower
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer
import os
from bs4 import BeautifulSoup
from collections import Counter
import re
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
input1 = sc.wholeTextFiles("gs://dataproc-staging-us-central1-754837649975-l6odifqz/notebooks/jupyter/input_docs/")
# Strip the HTML markup and key each document by its file name (without extension)
rdd = input1.map(lambda x: (os.path.basename(x[0]).split(".")[0], BeautifulSoup(x[1], 'lxml').get_text()))
schema = StructType([
    StructField('id', StringType()),
    StructField('text', StringType()),
])
df = spark.createDataFrame(rdd, schema)
df.show()
+---------+--------------------+
| id| text|
+---------+--------------------+
|PageWeb-1|26-FEB-1987 15:02...|
|PageWeb-2|26-FEB-1987 15:03...|
|PageWeb-3|26-FEB-1987 15:07...|
|PageWeb-4|26-FEB-1987 15:10...|
|PageWeb-5|26-FEB-1987 15:14...|
|PageWeb-6|26-FEB-1987 15:14...|
|PageWeb-7|26-FEB-1987 15:15...|
|PageWeb-8|26-FEB-1987 15:17...|
|PageWeb-9|26-FEB-1987 15:01...|
+---------+--------------------+
# Split each document into lowercase, whitespace-delimited tokens
tokenizer = Tokenizer(inputCol="text", outputCol="array")
filterw = tokenizer.transform(df)
filterw.take(1)
# Drop common English stop words from the token arrays
remover = StopWordsRemover(inputCol="array", outputCol="arraywithoutstopwords")
filtered_final = remover.transform(filterw)
filtered_final.take(2)
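# A minimal, self-contained sketch of what Tokenizer + StopWordsRemover produce,
# using a hypothetical one-row DataFrame (the id and text here are made up):
toy_df = spark.createDataFrame([("doc-1", "The market rose ON strong Earnings")],
                               ["id", "text"])
toy_tokens = Tokenizer(inputCol="text", outputCol="array").transform(toy_df)
toy_clean = StopWordsRemover(inputCol="array",
                             outputCol="arraywithoutstopwords").transform(toy_tokens)
toy_clean.select("array", "arraywithoutstopwords").show(truncate=False)
# Tokenizer lowercases and splits on whitespace; the remover then drops English
# stop words, leaving [market, rose, strong, earnings]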
def build_inverted_index(documents):
    # MAP: emit one (word, document id) pair per word in each document
    pairs = documents.flatMap(lambda x: [(word, x[0]) for word in x[1]])
    # REDUCE: group by word and collect the ids of the documents it appears in
    grouped = pairs.groupByKey()
    index = grouped.map(lambda x: (x[0], list(x[1])))
    print("Inverted index created")
    return index
messages = filtered_final.select("id", "arraywithoutstopwords").rdd
inverted_index = build_inverted_index(messages)
inverted_index.take(4)
Inverted index created
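# A toy run of the same MAP/REDUCE pattern on hypothetical document ids,
# to make the output shape concrete (collect order may vary):
toy = sc.parallelize([("d1", ["apple", "pear"]), ("d2", ["apple"])])
toy_index = (toy.flatMap(lambda x: [(word, x[0]) for word in x[1]])
                .groupByKey()
                .map(lambda x: (x[0], list(x[1]))))
print(toy_index.collect())
# e.g. [('pear', ['d1']), ('apple', ['d1', 'd2'])]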
import requests
URL = 'https://elcomercio.pe'
def get_array_links(url):
    # Fetch the page and collect every href; root-relative links get the site domain prepended
    html_content = requests.get(url).text
    soup = BeautifulSoup(html_content, 'lxml')
    arraylinks = [a.get('href') if 'https://' in a.get('href') else (URL + a.get('href'))
                  for a in soup.find_all('a', href=True)]
    return arraylinks
url = 'https://elcomercio.pe'
arraylinks = get_array_links(url)
array_tuples = []
array_tuples.append((url,arraylinks))
array_tuples[0][0],array_tuples[0][1][:15]
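# The same extraction logic on an inline HTML snippet (no network call),
# showing how root-relative hrefs get the domain prepended; the snippet is made up:
toy_html = '<a href="https://example.com/a">A</a><a href="/seccion/b">B</a>'
toy_soup = BeautifulSoup(toy_html, 'lxml')
print([a.get('href') if 'https://' in a.get('href') else (URL + a.get('href'))
       for a in toy_soup.find_all('a', href=True)])
# ['https://example.com/a', 'https://elcomercio.pe/seccion/b']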
# Crawl one level deeper: fetch the outgoing links of every page linked from the home page
for link in arraylinks:
    arraylink = get_array_links(link)
    array_tuples.append((link, arraylink))
print('Number of tuples in the array:', len(array_tuples))
Number of tuples in the array: 332
RDDlinks = sc.parallelize(array_tuples)
print('Number of rows in the RDD:', RDDlinks.count())
RDDlinks.take(1)[0][0], RDDlinks.take(1)[0][1][:15]
Number of rows in the RDD: 332
# MAP: emit one (linked page, source page) pair per link
documents = RDDlinks.flatMap(lambda x: [(tag, x[0]) for tag in x[1]])
# REDUCE: group by page and collect the pages it was linked from
documents = documents.groupByKey()
inverted_index = documents.map(lambda x: (x[0], list(x[1])))
print('Number of rows after applying the inverted index:', inverted_index.count())
inverted_index.take(1)[0][0], inverted_index.take(1)[0][1][:15]
Number of rows after applying the inverted index: 209
N = inverted_index.count()
print(N)
209
# Initialize every page's rank to 1/N; ranks1 keeps the full key set so that
# pages receiving no contributions can be restored after each iteration
ranks = ranks1 = inverted_index.map(lambda node: (node[0], 1.0 / N))
print('Number of pages:', len(ranks.collect()))
ranks.collect()[:10]
Number of pages: 209
for i in range(2):
    # Join each page's adjacency list with its current rank
    ranks = inverted_index.join(ranks)
    # MAP: each page distributes its rank evenly across the pages in its list
    ranks = ranks.flatMap(lambda x: [(page, float(x[1][1]) / len(x[1][0])) for page in x[1][0]])
    # REDUCE: sum the contributions each page receives
    ranks = ranks.reduceByKey(lambda x, y: x + y)
    # Pages that received no contributions drop out of reduceByKey;
    # re-insert them with rank 0.0 so the key set stays complete
    array_elements_0 = ranks1.subtractByKey(ranks).collect()
    if len(array_elements_0) > 0:
        new_array_tuples = [(element[0], 0.0) for element in array_elements_0]
        RDD_elements_0 = sc.parallelize(new_array_tuples)
        ranks = ranks.union(RDD_elements_0)
print('Number of pages:', len(ranks.collect()))
ranks.sortByKey().collect()[:10]
Number of pages: 209
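# To see why the subtractByKey/union step is needed, trace one update of the
# same logic on a hypothetical 3-page graph: a -> {b, c}, b -> {c}, c -> {}.
toy_links = sc.parallelize([("a", ["b", "c"]), ("b", ["c"]), ("c", [])])
toy_ranks = toy_links.map(lambda n: (n[0], 1.0 / 3))
toy_contribs = (toy_links.join(toy_ranks)
    .flatMap(lambda x: [(page, x[1][1] / len(x[1][0])) for page in x[1][0]]))
toy_new = toy_contribs.reduceByKey(lambda x, y: x + y)
print(sorted(toy_new.collect()))
# [('b', 0.1666...), ('c', 0.5)] -- 'a' received no contributions and vanished
# from the key set, so it must be re-inserted with rank 0.0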
from google.cloud import storage
client = storage.Client()
schema = StructType([
    StructField('link', StringType()),
    StructField('Importancia', FloatType())
])
df = spark.createDataFrame(ranks, schema)
df.show()
+--------------------+------------+
| link| Importancia|
+--------------------+------------+
|https://elcomerci...|0.0031293256|
|https://elcomerci...|0.0031293256|
|https://elcomerci...|0.0029440154|
|https://elcomerci...|0.0029440154|
|https://elcomerci...| 0.009202667|
|https://elcomerci...| 0.006258651|
|https://elcomerci...|0.0031293256|
|https://elcomerci...| 0.006258651|
|https://elcomerci...|0.0029440154|
|https://elcomerci...| 0.005888031|
|https://elcomerci...|0.0029440154|
|https://elcomerci...|0.0029440154|
|https://elcomerci...|0.0029440154|
|https://elcomerci...| 0.005888031|
|https://elcomerci...| 0.011776062|
|https://elcomerci...| 0.005888031|
|https://elcomerci...| 0.006073341|
|https://elcomerci...|0.0031293256|
|https://elcomerci...| 0.006258651|
|https://elcomerci...|0.0029440154|
+--------------------+------------+
only showing top 20 rows
# Note: writing to a gs:// path from pandas relies on gcsfs/fsspec being installed on the driver
df.toPandas().to_json('gs://dataproc-staging-us-central1-754837649975-l6odifqz/notebooks/jupyter/ComercioDiarioDatosUltimo.json')
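# An alternative sketch that stays in Spark instead of round-tripping through
# pandas; it uses the GCS connector already configured on Dataproc. The output
# directory name here is hypothetical, and .json() writes a directory of part files:
df.coalesce(1).write.mode('overwrite').json(
    'gs://dataproc-staging-us-central1-754837649975-l6odifqz/notebooks/jupyter/ComercioDiarioDatosUltimo_spark_json')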