Apuntes del curso en Notion
Importando Spark
Creando la Session
Una vez creada la session podemos realizar todas las operaciones que spark permite como por ejemplo crear RDD, Dataframe, importar data externa, etc.
Terminar la session o context para que no siga en memoria consumiendo recursos del Cluster con .stop()
Creando Context
SparkContext Permite crear un session para comenzar a utilizar spark
Podemos pasar Context a Session de la siguiente manera
RDD
Transformaciones y acciones
Collect
Podemos verificar la data del RDD con la siguiente acción
Podemos cargar csv en rdd. Recordar que si tenemos algún error solo sera posible ver cuando se ejecute un action
Take
.take() para ver los registros del rdd
Exploramos un Rdd con .map() que recibe una función lambda por ejemplo y aplica dicha función a cada elemento en el rdd y devuelve un nuevo rdd, vamos a ver un ejemplo en la siguiente celda con un conteo de los distintos elemento de la columna 'sigla'. [ x[2] ] -> Procesando como una lista. ( x[2] ) ->Procesando como una tupla
GroupBy
Vamos a utilizar .groupBy() para agrupar la data y .map() para seleccionar primero la columna key por la que necesitamos la agrupación y segundo la columna que vamos a realizar el count. .mapValues(len or list or xfunction) -> retorna la data en base a lo que recibe como parámetro
Filter
Podemos realizar filtros de data en rdd con .filter()
Count
Si crees que tienes un rdd inmenso puedes realizar un .countApprox(20) y enviarle como param el tiempo en milisegundo para que solo cuente hasta ese momento
Union
Vamos a realizar uniones de Rdd con el .union()
Join
Podemos utilizar .join() para realizar cruces en rdd. .top() -> Nos permite observar las dos primeras registros del dataset
Vamos a ordenar las columnas del rdd de tal manera que quede al inicio el campo por el cual se va a hacer join
Uniremos mediante join las dos celdas anteriores y utilizamos .takeSample() para elegir una muestra aleatorias del rdd y se le envía como parámetros True o False para repetidos, la cantidad de registros que quiero ver y por ultimo un numero aleatorio o semilla que va a mostrar los mismos registros siempre
Creamos un nuevo rdd para los ganadores de medallas olimpicas
Vamos a realizar una agrupación y conteo del campo medalla para ver los distintos valores que tiene
Como vemos que son pocos registros tranquilamente podemos usar un .collect() para agrupar
Filtramos la data no queremos los NA
Reto: Haremos un join del deportistas_equipos_rdd que contiene los equipos y los deportista que pertenecen a cada uno de ellos con los ganadores_rdd que tienen solo los que ganaron medallas
Necesitamos solo el campo deportista_id y las siglas del pais al que pertenecen para esto utilizamos slices
Hacemos el join el primer dato es el campo 'deportista_id' de ambos rdd
Cuando hacemos join se crean sub tuplas y para hacer slices para acceder a esos valores es de la siguiente manera
add
Vamos a crear un diccionarios con el puntaje que le asignan a cada pais por medalla
Utilizamos el map del rdd donde tenemos el pais y la medalla ganada y Aplicamos el diccionario donde tenemos mapeados los puntajes de las medallas
Por ultimo aplicamos una sumatoria por paises y ordenamos para obtener el top 5 de los mas ganadores
Dataframe
/root/venv/lib/python3.7/site-packages/pyspark/sql/context.py:79: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
FutureWarning
,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina
Load
Vamos a crear el esquema para el dataframe
+--------+-----------+---------+------+
|juego_id| anio|Temporada|Ciudad|
+--------+-----------+---------+------+
| 1|1896 Verano| 1896|Verano|
| 2|1900 Verano| 1900|Verano|
| 3|1904 Verano| 1904|Verano|
| 4|1906 Verano| 1906|Verano|
| 5|1908 Verano| 1908|Verano|
+--------+-----------+---------+------+
only showing top 5 rows
Otra manera de cargar dataframe en las siguientes celdas
+---+--------------------+-----+
| id| equipo|sigla|
+---+--------------------+-----+
| 1| 30. Februar| AUT|
| 2|A North American ...| MEX|
| 3| Acipactli| MEX|
| 4| Acturus| ARG|
| 5| Afghanistan| AFG|
+---+--------------------+-----+
only showing top 5 rows
+-------------+---------+------+----+------+----+---------+
|deportista_id| nombre|genero|edad|altura|peso|equipo_id|
+-------------+---------+------+----+------+----+---------+
| 1|A Dijiang| 1| 24| 180| 80| 199|
| 2| A Lamusi| 1| 23| 170| 60| 199|
+-------------+---------+------+----+------+----+---------+
only showing top 2 rows
+-----+----------+---+---+---+---+---+
| _c0| _c1|_c2|_c3|_c4|_c5|_c6|
+-----+----------+---+---+---+---+---+
|67787|Lee BongJu| 1| 27|167| 56|970|
|67788| Lee BuTi| 1| 23|164| 54|203|
+-----+----------+---+---+---+---+---+
only showing top 2 rows
+-------------+--------------------+------+----+------+----+---------+
|deportista_id| nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
| 1| A Dijiang| 1| 24| 180| 80| 199|
| 2| A Lamusi| 1| 23| 170| 60| 199|
| 3| Gunnar Nielsen Aaby| 1| 24| 0| 0| 273|
| 4|Edgar Lindenau Aabye| 1| 34| 0| 0| 278|
| 5|Christine Jacoba ...| 2| 21| 185| 82| 705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows
+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
| 1| NA| 1| 39| 1|
| 2| NA| 2| 49| 2|
| 3| NA| 3| 7| 3|
| 4| Gold| 4| 2| 4|
| 5| NA| 5| 36| 5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows
+---------+--------------------+----------+
|evento_id| evento|deporte_id|
+---------+--------------------+----------+
| 1|Basketball Men's ...| 1|
| 2|Judo Men's Extra-...| 2|
| 3|Football Men's Fo...| 3|
| 4|Tug-Of-War Men's ...| 4|
| 5|Speed Skating Wom...| 5|
+---------+--------------------+----------+
only showing top 5 rows
Operaciones con df
root
|-- deportista_id: string (nullable = true)
|-- nombre: string (nullable = true)
|-- genero: string (nullable = true)
|-- edad: string (nullable = true)
|-- altura: string (nullable = true)
|-- peso: string (nullable = true)
|-- equipo_id: string (nullable = true)
En la siguiente celdas podemos ver como renombrar y eliminar columnas del df
root
|-- deportista_id: string (nullable = true)
|-- nombre: string (nullable = true)
|-- sexo: string (nullable = true)
|-- edad: string (nullable = true)
|-- peso: string (nullable = true)
|-- equipo_id: string (nullable = true)
Filter
Filtrando y ordenando en df
+-------------+--------------------+----+----+----+---------+
|deportista_id| nombre|sexo|edad|peso|equipo_id|
+-------------+--------------------+----+----+----+---------+
| 128719|John Quincy Adams...| 1| 97| 0| 1096|
| 49663| Winslow Homer| 1| 96| 0| 1096|
| 31173|Thomas Cowperthwa...| 1| 88| 0| 1096|
| 118789| Louis Tauzin| 1| 81| 0| 362|
| 69729| Max Liebermann| 1| 80| 0| 399|
+-------------+--------------------+----+----+----+---------+
only showing top 5 rows
Select
Select en df y para utilizar col().alias() debemos importar la siguiente librería
root
|-- deportista_id: string (nullable = true)
|-- nombre: string (nullable = true)
|-- sexo: string (nullable = true)
|-- edad_al_jugar: string (nullable = true)
|-- peso: string (nullable = true)
|-- equipo_id: string (nullable = true)
Joins
root
|-- deportista_id: string (nullable = true)
|-- nombre: string (nullable = true)
|-- sexo: string (nullable = true)
|-- edad_al_jugar: string (nullable = true)
|-- peso: string (nullable = true)
|-- equipo_id: string (nullable = true)
root
|-- resultado_id: string (nullable = true)
|-- medalla: string (nullable = true)
|-- deportista_id: string (nullable = true)
|-- juego_id: string (nullable = true)
|-- evento_id: string (nullable = true)
root
|-- juego_id: integer (nullable = true)
|-- anio: string (nullable = true)
|-- Temporada: string (nullable = true)
|-- Ciudad: string (nullable = true)
root
|-- evento_id: string (nullable = true)
|-- evento: string (nullable = true)
|-- deporte_id: string (nullable = true)
root
|-- id: string (nullable = true)
|-- equipo: string (nullable = true)
|-- sigla: string (nullable = true)
Teniendo definidos los esquemas podemos empezar a realizar joins
+-----------------+-------------+-----+-------+-----------+--------------------+
| nombre|edad_al_jugar|sigla|medalla| anio| evento|
+-----------------+-------------+-----+-------+-----------+--------------------+
|Nstor Abad Sanjun| 23| ESP| NA|2016 Verano|Gymnastics Men's ...|
|Giovanni Abagnale| 21| ITA| Bronze|2016 Verano|Rowing Men's Coxl...|
|Nstor Abad Sanjun| 23| ESP| NA|2016 Verano|Gymnastics Men's ...|
|Nstor Abad Sanjun| 23| ESP| NA|2016 Verano|Gymnastics Men's ...|
|Nstor Abad Sanjun| 23| ESP| NA|2016 Verano|Gymnastics Men's ...|
|Nstor Abad Sanjun| 23| ESP| NA|2016 Verano|Gymnastics Men's ...|
+-----------------+-------------+-----+-------+-----------+--------------------+
only showing top 6 rows
Reto: Hacer Joins de las medallas y los equipos que las ganaron, no mostrar los equipos que no ganaron medallas
+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
| 4| Gold| 4| 2| 4|
| 38| Bronze| 15| 7| 19|
| 39| Bronze| 15| 7| 20|
| 41| Bronze| 16| 50| 14|
| 42| Bronze| 17| 17| 21|
| 43| Gold| 17| 17| 22|
| 45| Gold| 17| 17| 24|
| 49| Gold| 17| 17| 28|
| 51| Bronze| 17| 19| 22|
| 61| Gold| 20| 38| 32|
| 62| Bronze| 20| 38| 33|
| 64| Silver| 20| 40| 31|
| 65| Bronze| 20| 40| 32|
| 68| Silver| 20| 40| 35|
| 74| Gold| 20| 44| 32|
| 77| Gold| 20| 44| 35|
| 79| Gold| 20| 46| 32|
| 80| Gold| 21| 47| 36|
| 87| Silver| 25| 7| 41|
| 92| Bronze| 29| 37| 45|
+------------+-------+-------------+--------+---------+
only showing top 20 rows
Tenemos que realizar un join de los deportistas con los equipos o selecciones a la que pertenecen y por ultimo otro join con el df de solo ganadores
+-----+-------+
|sigla|medalla|
+-----+-------+
| SWE| Gold|
| FIN| Bronze|
| FIN| Bronze|
| FIN| Bronze|
| FIN| Bronze|
+-----+-------+
only showing top 5 rows
GroupBy
root
|-- sigla: string (nullable = true)
|-- medalla: string (nullable = true)
|-- count: long (nullable = false)
+-----+--------------+
|sigla|Total medallas|
+-----+--------------+
| USA| 5643|
| URS| 2660|
| GBR| 2045|
| GER| 2045|
| FRA| 1785|
| ITA| 1627|
| SWE| 1537|
| CAN| 1347|
| AUS| 1314|
| HUN| 1131|
| GDR| 1084|
| NED| 1039|
| NOR| 1033|
| RUS| 1003|
| CHN| 991|
| JPN| 913|
| FIN| 900|
| SUI| 690|
| ROU| 658|
| KOR| 642|
+-----+--------------+
only showing top 20 rows
Sql
Convertimos los df en tablas temporales dentro de spark pero el verdadero poder de spark se ve en usar las funciones nativas de los df
+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
| 1| NA| 1| 39| 1|
+------------+-------+-------------+--------+---------+
only showing top 1 row
+-------------+---------+----+-------------+----+---------+
|deportista_id| nombre|sexo|edad_al_jugar|peso|equipo_id|
+-------------+---------+----+-------------+----+---------+
| 1|A Dijiang| 1| 24| 80| 199|
+-------------+---------+----+-------------+----+---------+
only showing top 1 row
+---+-----------+-----+
| id| equipo|sigla|
+---+-----------+-----+
| 1|30. Februar| AUT|
+---+-----------+-----+
only showing top 1 row
+-----+-------+--------+
|sigla|medalla|count(1)|
+-----+-------+--------+
| USA| Gold| 2639|
| USA| Silver| 1648|
| USA| Bronze| 1356|
| URS| Gold| 1137|
| URS| Silver| 783|
| URS| Bronze| 740|
| GBR| Silver| 720|
| GER| Bronze| 709|
| GER| Gold| 690|
| GBR| Gold| 675|
| FRA| Bronze| 668|
| GBR| Bronze| 650|
| GER| Silver| 646|
| FRA| Silver| 613|
| ITA| Gold| 574|
| SWE| Bronze| 535|
| ITA| Bronze| 528|
| ITA| Silver| 525|
| SWE| Silver| 522|
| AUS| Bronze| 511|
+-----+-------+--------+
only showing top 20 rows
Udf
UDF (User Defined Functions) son las funciones de usuario, y son sistemas para definir nuevos métodos SQL que operan sobre las columnas de un DataFrame
+-------------+--------------------+------+----+------+----+---------+
|deportista_id| nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
| 1| A Dijiang| 1| 24| 180| 80| 199|
| 2| A Lamusi| 1| 23| 170| 60| 199|
| 3| Gunnar Nielsen Aaby| 1| 24| null|null| 273|
| 4|Edgar Lindenau Aabye| 1| 34| null|null| 278|
| 5|Christine Jacoba ...| 2| 21| 185| 82| 705|
| 6| Per Knut Aaland| 1| 31| 188| 75| 1096|
| 7| John Aalberg| 1| 31| 183| 72| 1096|
| 8|"Cornelia ""Cor""...| 2| 18| 168|null| 705|
| 9| Antti Sami Aalto| 1| 26| 186| 96| 350|
| 10|"Einar Ferdinand ...| 1| 26| null|null| 350|
| 11| Jorma Ilmari Aalto| 1| 22| 182|76.5| 350|
| 12| Jyri Tapani Aalto| 1| 31| 172| 70| 350|
| 13| Minna Maarit Aalto| 2| 30| 159|55.5| 350|
| 14|Pirjo Hannele Aal...| 2| 32| 171| 65| 350|
| 15|Arvo Ossian Aaltonen| 1| 22| null|null| 350|
| 16|Juhamatti Tapio A...| 1| 28| 184| 85| 350|
| 17|Paavo Johannes Aa...| 1| 28| 175| 64| 350|
| 18|Timo Antero Aaltonen| 1| 31| 189| 130| 350|
| 19|Win Valdemar Aalt...| 1| 54| null|null| 350|
| 20| Kjetil Andr Aamodt| 1| 20| 176| 85| 742|
+-------------+--------------------+------+----+------+----+---------+
only showing top 20 rows
Creamos una función normal
Registramos la udf en spark con las siguientes líneas
+----------------+
|<lambda>(altura)|
+----------------+
| 180|
| 170|
| 0|
| 0|
| 185|
| 188|
| 183|
| 168|
| 186|
| 0|
| 182|
| 172|
| 159|
| 171|
| 0|
| 184|
| 175|
| 189|
| 0|
| 176|
+----------------+
only showing top 20 rows
Persistencia
Vamos a validar si un df esta almacenado en cache
Para mantener el df en cache podemos ejecutar la siguiente propiedad
Para saber cual es el tipo de persistencia que tiene ejecutamos la siguiente sentencia y podemos validar en el link que significa su salida
Vamos a bajar de la persistencia en cache el df y aplicar una persistencia en disco y cache memoria
Vamos agregar un numero mas de replicación al df