• Caso de negocio

Los los aeropuertos son sin duda lugares que cada d铆a tienen m谩s tr谩fico a nivel mundial, como ejemplo el aeropuerto de Sidney, con m谩s de 37MM de pasajeros y 300k vuelos anuales. Este alto tr谩fico desencadena varias situaciones que impactan en la insatisfacci贸n y frustaci贸n de una experiencia de calma y fluidez para el pasajero. Es asi que que se pretende usar la ingente cantidad de datos disponibles de viajes cancelados por vuelos retrasados, problemas t茅cnicos, etc. en una aplicaci贸n de Big Data con el objetivo de usar esos recursos eficientemente, evitar reclamos y tener fluidez en los vuelos.

  • Justificaci贸n del caso

Para este caso, vamos a considerar la informaci贸n de vuelos totales (entre cancelados y no cancelados) o que han tenido retrasos del a帽o 2015 que tiene aprox. de 5MM de registros.

Nuestro proyecto analizar谩 principalmente, destinos m谩s cancelados, almacenamiento de esta informaci贸n en un cluster de google debido al tama帽o y la velocidad con la que esta informaci贸n se genera.

Volumen de datos: Debido a que si tomamos en cuenta toda la informaci贸n generada por un solo pa铆s (Como en este caso USA) y tenemos la cantidad de 5 millones, es de suponerse que si se quiere escalar esto se va a necesitar una mayor cantidad de almacenamiento de datos.

Variedad de datos: La informaci贸n que tenemos actualmente proviene de un CSV ya con la informaci贸n empaquetada, pero mucha de esta informaci贸n proviene de sensores del mismo avi贸n, de base datos tradicionales, es por ello que este punto es justificable.

Velocidad de datos: La velocidad determina que todo el volumen y variedad de informaci贸n sea procesada en tiempo real, permitiendo una visi贸n perif茅rica del momento y el entorno. En ese sentido, si damos un vistazo general a todos los aviones y toda la informaci贸n que estos generan por segundo, es de suponer que esta variable ser谩 muy importante.

1.JPG

  • Identificaci贸n de datos

Fuente: La Oficina de Estad铆sticas de Transporte del Departamento de Transporte de los Estados Unidos (DOT). Descripci贸n: DOT hace el raestro de desempe帽o a tiempo de los vuelos nacionales operados por grandes compa帽铆as a茅reas con la siguiente informaci贸n:

  • vuelos a tiempo
  • vuelos retrasados
  • vuelos cancelados
  • vuelos desviados

Data: https://www.kaggle.com/usdot/flight-delays#airlines.csv

  • Clasificaci贸n de la Data

2.JPG

  • Sizing de la data

La informaci贸n que tenemos corresponde a todos los vuelos solamente dentro de estados unidos, teniendo como sizing :

Mensualmente :

3.JPG

Diariamente : (sacando un promedio con Spark)

4.JPG

  • Arquitectura del Cluster

5.JPG

Donde los nodos distribuidos comparten las mismas especificaciones t茅cnicas:

Procesador: Intel Haswell (2 cores) Memoria Ram: 7.5 GB Disco Duro: 60 GB Versi贸n OS: Debian GNU Linux 9

  • Arquitectura conceptual

6.JPG

RDA : Raw Data Area, donde se realiza el almacenamiento en el datalake de los datos en su formato de origen

Aqui estan los archivos :

1
2
3
##flights.csv (contiene todos los eventos de los vuelos)
##airlines.csv (tabla descriptiva de aereolineas)
##airports.csv (tabla descriptva de aereopuertos)

UDA: Universal Data Area, en donde se realiza proceso de transformaci贸n y/o calidad de datos a los archivos.

Aqui se crearon las tablas modeladas y con particiones:

uda.flights uda.airlines uda.airports

DDA: Dimensional Data Area, es la capa accesible por el usuario y donde se suele desarrollar reportes, datamarts, tableros, etc.

Aqui se cre贸 la tabla dda.flights_cube , donde tiene la tabla y todos los cruces necesarios, adem谩s desde aqu铆 se van a generar todos los tableros para Tableau

  • Tecnolog铆as usadas

7.jpeg

  • Justificaci贸n de perfiles:

Data Engineer El ingeniero de datos es alguien que desarrolla, construye, prueba y mantiene arquitecturas, como bases de datos y sistemas de procesamiento a gran escala. Este perfil ser谩 necesario para la construcci贸n de los procesos de ingesta de data de vuelos, as铆 como de los pipelines necesarios.

Data Scientist Debido a la gran cantidad de informaci贸n almacenada en el datalake, ser谩 necesario el perfil con las habilidades para descubrir los insights detr谩s de esta data, predecir con ayuda de algoritmos los futuros escenarios y con los conocimientos necesarios en ciencias matematicas y/o estad铆stica.

Data Analyst Esta persona con la ayuda de herramientas de visualizaci贸n de datos como Tableau, Power BI , tendr谩 la repsonsabilidad de presentar lo resultados obtenidos de una manera clara y concisa.

8.png

  • Ingesta de datos:

Creamos las carpetas del hdfs:

hdfs dfs -mkdir /datalake hdfs dfs -mkdir /datalake/rda hdfs dfs -mkdir /datalake/rda/flights hdfs dfs -mkdir /datalake/rda/airlines hdfs dfs -mkdir /datalake/rda/airports hdfs dfs -mkdir /datalake/uda/ hdfs dfs -mkdir /datalake/uda/flights hdfs dfs -mkdir /datalake/uda/flights/no_part hdfs dfs -mkdir /datalake/uda/airlines hdfs dfs -mkdir /datalake/uda/airports hdfs dfs -mkdir /datalake/dda

Se cargaron los archivos al cluster CentOS

9.JPG

Luego se realiz贸 la ingesta hacia hadoop:

10.JPG

Validamos los resultados

11.JPG

  • Preparaci贸n de la data en Spark:

Leemos los archivos a variables dataframes en spark:

var dfflights_1 = spark.read.format("csv").option("header","true").option("sep",",").schema(schemaFlights).load("/datalake/rda/flights") val dfflights = dfflights_1.withColumn("date",struct(col("year"),col("month"),col("day"),col("day_of_week"))) var dfairlines = spark.read.format("csv").option("header","true").option("sep",",").schema(schemaAirlines).load("/datalake/rda/airlines") var dfairports_1 = spark.read.format("csv").option("header","true").option("sep",",").schema(schemaAirports).load("/datalake/rda/airports") val dfairports = dfairports_1.withColumn("geolocalization",struct(col("latitude"),col("longitude")))

#####################joins en spark###############################

var dfflightxairlines = dfflights.join(dfairlines.selectExpr("iata_code as iata_code_airline","airline as airline_desc"),@@0@@"iata_code_airline", "left") var dfflightxairlinesxairports = dfflightxairlines.join(dfairports.selectExpr("iata_code as iata_code_origin","airport as airport_origin","city as city_origin","state as state_origin","country as country_origin","latitude as latitude_origin","longitude as longitude_origin"),@@1@@"iata_code_origin", "left") var dffxalxai = dfflightxairlinesxairports.join(dfairports.selectExpr("iata_code as iata_code_destination","airport as airport_destination","city as city_destination","state as state_destination","country as country_destination","latitude as latitude_destination","longitude as longitude_destination"),@@2@@"iata_code_destination", "left")

###guardamos en formato parquet dfflights.write.format("parquet").option("mode", "OVERWRITE").option("path", "/datalake/uda/flights/flights.parquet").save() dfairlines.write.format("parquet").option("mode", "OVERWRITE").option("path", "/datalake/uda/airlines/airlines.parquet").save() dfairports.write.format("parquet").option("mode", "OVERWRITE").option("path", "/datalake/uda/airports/airports.parquet").save() dffxalxai.write.format("parquet").option("mode", "OVERWRITE").option("path", "/datalake/uda/flights/flights_all.parquet").save()

  • Creacion de las tablas en Hive:

Tabla normales:

CREATE EXTERNAL TABLE uda.airlines ( iata_code String, airline String ) STORED AS PARQUET LOCATION '/datalake/uda/airlines/';

12.JPG

CREATE EXTERNAL TABLE uda.airports ( iata_code String, airport String, city String, state String, country String, latitude Double, longitude Double ) STORED AS PARQUET LOCATION '/datalake/uda/airports/';

13.JPG

CREATE EXTERNAL TABLE uda.flights ( year Integer , month Integer , day Integer , day_of_week Integer , airline String , flight_number Integer , tail_number String , origin_airport String , destination_airport String , scheduled_departure Integer , departure_time Integer , departure_delay Integer , taxi_out Integer , wheels_off Integer , scheduled_time Integer , elapsed_time Integer , air_time Integer , distance Integer , wheels_on Integer , taxi_in Integer , scheduled_arrival Integer , arrival_time Integer , arrival_delay Integer , diverted Integer , cancelled Integer , cancellation_reason String , air_system_delay Integer , security_delay Integer , airline_delay Integer , late_aircraft_delay Integer , weather_delay Integer ) STORED AS PARQUET LOCATION '/datalake/uda/flights/flights.parquet';

14.JPG

Tabla con particion est谩tica:

CREATE EXTERNAL TABLE uda.flights_part ( day Integer , flight_number Integer , tail_number String , origin_airport String , origin_city String, destination_airport String, destination_city String ) PARTITIONED BY (month Integer) STORED AS PARQUET LOCATION '/datalake/uda/flights/partition';

insert into uda.flights_part partition(month ='201501') select a.day, a.flight_number, a.tail_number, a.origin_airport, b.city, a.destination_airport, c.city from uda.flights a left join uda.airports b on a.origin_airport=b.iata_code left join uda.airports c on a.destination_airport=c.iata_code where a.month =1 and a.year=2015;

15.JPG

###PARTICION DINAMICA

set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict;

CREATE TABLE uda.flights_part_dynamic ( day Integer , origin_airport String , destination_airport String , scheduled_time Integer , elapsed_time Integer ) PARTITIONED BY (month string) STORED AS PARQUET LOCATION '/datalake/uda/flights/partition_d';

insert into uda.flights_part_dynamic partition(month) select day , origin_airport, destination_airport, scheduled_time, elapsed_time, month from uda.flights;

16.JPG

#tablas con buckets

CREATE EXTERNAL TABLE uda.flights_part_bucket ( day Integer , origin_airport String , destination_airport String , scheduled_time Integer , elapsed_time Integer, airline String ) CLUSTERED BY (airline) INTO 4 BUCKETS STORED AS PARQUET LOCATION '/datalake/uda/flights/partition_b';

set hive.enforce.bucketing = true;

insert into uda.flights_part_bucket
select day, origin_airport , destination_airport, scheduled_time, elapsed_time, airline from uda.flights;

  • Presentaci贸n de resultados en Tableau

tableau3.JPGtableau2.JPGtableau1.JPG

  • Desarrollo de fujo b谩sico en Apache Kafka

14.1 Creaci贸n del tema Mytopic

kafka1.JPG

14.2 Creaci贸n de la suscripci贸n mySub

kafka2.JPG

14.3 Creaci贸n del mensaje:鈥滺ello Word鈥

kafka3.JPG

14.4 Extraer el mensaje de la subscripci贸n

kafka4.JPG

  • Justificaci贸n de un flujo de Kafka en el proyecto:

Apache Kafka es un sistema de mensajes distribuidos de c贸digo abierto que permite el procesamiento de flujos de datos en tiempo real. Kafka toma un flujo continuo de datos de un topic de entradas, lo transforma en un flujo discreto llamado DStream, lo procesa como una secuencia de RDD鈥檚 (Resilient Distributed Data), la unidad de informaci贸n que el motor de Spark utiliza, y produce un flujo de datos de salida. Puede trabajar datos de transmisiones como secuencias de clics de sitios web, transacciones financieras, etc.

a_kafka1.JPG

  • Desarrollo de un Modelo de Machine Learning en DataBricks:

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1735837945562005/4021292648634437/5868886568007952/latest.html

#Leer data data = spark.read.option("multiline","true").parquet("/FileStore/tables/part_00001_7f9bc5a0_a93d_4d4e_b82a_de14d7e48eb0_c000_snappy-5356a.parquet") #Transformacion data.show()

#Volviendo los datos numericos #airline from pyspark.sql import functions as F data = data.withColumn("airlinenum",F.when(F.col("airline")=="UA", 1).when(F.col("airline")=="AA", 2).when(F.col("airline")=="US", 3).when(F.col("airline")=="F9", 4).when(F.col("airline")=="B6", 5).when(F.col("airline")=="OO", 6).when(F.col("airline")=="AS", 7).when(F.col("airline")=="NK", 8).when(F.col("airline")=="WN", 9).when(F.col("airline")=="DL", 10).when(F.col("airline")=="EV", 11).when(F.col("airline")=="HA", 12).when(F.col("airline")=="MQ", 13).when(F.col("airline")=="VX", 14).otherwise(None)) data.select("airline","airlinenum").show()

bricks1.JPG

#Se selecciona las variables a utilizar data2 = data.select("month","day","airlinenum","origin_airportnum","destination_airportnum","scheduled_departure","departure_time","departure_delay","elapsed_time","air_time"\ ,"distance", "arrival_time", "arrival_delay","cancelled") data2.show(5)

bricks_2.JPG

#Sustituir nulos data3 = data2.fillna({"month": 0,"day": 0,"airlinenum": 0,"origin_airportnum": 0,"destination_airportnum": 0,"scheduled_departure": 0, "departure_time": 0, "departure_delay": 0, "elapsed_time": 0, "arrival_delay": 0,"air_time": 0,"distance": 0,"arrival_time": 0,"month": 0})

bricks7.JPGbricks6.JPGbricks5.JPGbricks4.JPGbricks3.JPG