PROYECTO BIG DATA PARA LA OPTIMIZACI脫N DE RECLAMOS

#PROYECTO-BIG-DATA-PARA-LA-OPTIMIZACI脫N-DE-RECLAMOS

image.png

Integrantes:

#Integrantes:

Juan Jose Soto Cruz

#Juan-Jose-Soto-Cruz

Yuri Vanessa Ccencho Atauje

#Yuri-Vanessa-Ccencho-Atauje

Jose Berrocal Bensur

#Jose-Berrocal-Bensur

1. Caso de Negocio

#1.-Caso-de-Negocio

El caso de negocio es sobre las quejas del consumidor sobre productos y servicios financieros que se centralizan por la oficina de protecci贸n financiera del consumidor.El objetivo del proyecto es poder dise帽ar una infraestructura que soporte el uso de Big Data para el an谩lisis de informaci贸n y as铆 establecer recomendaciones a las entidades financieras para mejorar su funcionamiento y ofrecer mejores experiencias a los usuarios y a los consumidores a detectar tendencias de actos injustos o enga帽osos para evitar problemas por adelantado.

2. Justificaci贸n

#2.-Justificaci贸n

Proponer una soluci贸n para el an谩lisis de datos. Los datos por m谩s volumen, variedad que puedan ser tener, si no se analizan no resultan 煤tiles para las recomendaciones y toma de decisiones. Lo que se busca en primera instancia es centralizar esos datos para su extracci贸n desde diferentes fuentes, para realizar su almacenamiento, procesamiento y an谩lisis.

Volumen:

#Volumen:

Gran cantidad de datos generados diariamente producto de las quejas de los usuarios de todo el sistema financiero de un pais.

Velocidad:

#Velocidad:

Se requiere mejoras en el tiempo de ingesta de informaci贸n, a fin de reducir los tiempos de espera para el an谩lisis de informaci贸n oportuna.

Variedad:

#Variedad:

Se manejan diversas variables de data, as铆 como m煤ltiples fuentes input de informaci贸n.

3. Identificaci贸n y Clasificaci贸n de Fuentes

#3.-Identificaci贸n-y-Clasificaci贸n-de-Fuentes

La informaci贸n utilizada en el presente trabajo, se basa en fuente de datos externa de bancos americanos donde proporcionan las quejas de consumidores y respuesta de la empres al consumidor, as铆 como datos adicionales como empresa, estado, fecha de recepci贸n, v铆a de presentaci贸n de la queja, entre otros datos. image.png

4.Sizing de la Generaci贸n de Data

#4.Sizing-de-la-Generaci贸n-de-Data

La Oficina de Protecci贸n Financiera del Consumidor (CFPB) es la primera agencia federal enfocada exclusivamente en protecci贸n financiera de los consumidores, y las quejas de los consumidores son una parte esencial de este trabajo. En tal sentido hemos determinado el crecimiento de la data, bas谩ndonos en indicadores de medici贸n, el cual visualizamos en el siguiente cuadro.

image.png

5.Dise帽o de arquitectura conceptual

#5.Dise帽o-de-arquitectura-conceptual

El modelo conceptual considera cinco componentes funcionales conectados por interfaces: image.png

6.Dise帽o De Arquitectura Tecnol贸gica

#6.Dise帽o-De-Arquitectura-Tecnol贸gica

image.png

7.1Perfiles de Bigdata

#7.1Perfiles-de-Bigdata

Los roles determinados que participaran en las siguientes fases del proyecto, se visualiza en la siguiente imagen:

image.png

7.2 Presupuesto Del Proyecto

#7.2-Presupuesto-Del-Proyecto

image.png

COSTO TOTAL DEL PROYECTO (expresado en d贸lares americanos): USD 108,626

8Tramiento de datos en un Datalake

#8Tramiento-de-datos-en-un-Datalake

Dentro del proyecto tenemos definifos 3 capas de datalake

image.png

Comandos usados para la creaci贸n de las 2 primeras capas hdfs dfs -mkdir /todaladata/

hdfs dfs -put state.csv /todaladata/ hdfs dfs -put company.csv /todaladata/ hdfs dfs -put issues.csv /todaladata/ hdfs dfs -put complaints.csv /todaladata/

hdfs dfs -cp /todaladata/state.csv /carpeta4/state/ hdfs dfs -cp /todaladata/company.csv /carpeta4/company/ hdfs dfs -cp /todaladata/issues.csv /carpeta4/issues/ hdfs dfs -cp /todaladata/complaints.csv /carpeta4/complaints/

Las carpetas de las 2da capa fueron creados al momento de crear las tablas

Nos conectamos y creamos la Base de Datos en Hive:

-- beeline -u jdbc:hive2://10.128.0.9:10000/
-- create database trabajo4;

8.1 Crear tablas en hive (simple, bucketing, partici贸n din谩mica y est谩tica)

#8.1-Crear-tablas-en-hive-(simple,-bucketing,-partici贸n-din谩mica-y-est谩tica)

CREATE EXTERNAL TABLE IF NOT EXISTS trabajo4.issues( id_issue string COMMENT 'Id_issue', issues string COMMENT 'Issues' ) COMMENT 'Tabla Issues' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION '/carpeta4/issues' tblproperties("skip.header.line.count" = "1");

use trabajo4; CREATE EXTERNAL TABLE IF NOT EXISTS trabajo4.complaints( date_received date COMMENT 'Date_received', id_issue string COMMENT 'Id_issue', id_company string COMMENT 'Id_company', id_state string COMMENT 'Id_state', consumer_consent_provided string COMMENT 'Consumer_consent_provided', submitted_via string COMMENT 'Submitted_via', date_sent_to_company date COMMENT 'Date_sent_to_company', company_response_to_consumer string COMMENT 'Company_response_to_consumer', timely_response string COMMENT 'Timely_response' ) COMMENT 'Tabla Complaints' ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION '/carpeta4/complaints' tblproperties("skip.header.line.count" = "1");

image.png

8.2 Crear tablas en hive (compleja: array, struct, map)

#8.2-Crear-tablas-en-hive-(compleja:-array,-struct,-map)

use trabajo4; CREATE EXTERNAL TABLE IF NOT EXISTS trabajo4.company( id_company string COMMENT 'Id_company', company_name string COMMENT 'Company_name', id_company_company_name struct<id_company:STRING,company_name:string> COMMENT 'Id_company_company_name' ) COMMENT 'Tabla Company' ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LOCATION '/carpeta4/company' tblproperties("skip.header.line.count" = "1");

use trabajo4; CREATE EXTERNAL TABLE IF NOT EXISTS trabajo4.state( id_state string COMMENT 'Id_state', state string COMMENT 'State', id_state_state struct<id_state:STRING,state:string> COMMENT 'Id_state_state' ) COMMENT 'Tabla State' ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LOCATION '/carpeta4/state' tblproperties("skip.header.line.count" = "1");

image.png

8.3 Tabla est谩tica y funci贸n de Ingesta

#8.3-Tabla-est谩tica-y-funci贸n-de-Ingesta

CREATE EXTERNAL TABLE trabajo4.tabla_particion_estatica ( date_received date COMMENT 'Date_received', id_issue string COMMENT 'Id_issue', id_company string COMMENT 'Id_company', id_state string COMMENT 'Id_state', consumer_consent_provided string COMMENT 'Consumer_consent_provided', submitted_via string COMMENT 'Submitted_via', date_sent_to_company date COMMENT 'Date_sent_to_company', company_response_to_consumer string COMMENT 'Company_response_to_consumer', timely_response string COMMENT 'Timely_response' ) PARTITIONED BY (mensual STRING) STORED AS PARQUET LOCATION '/carpeta4/complaints_part_esta/tabla_particion_estatica';

insert into trabajo4.tabla_particion_estatica partition(mensual = '201211') select from_unixtime (unix_timestamp(Concat(substring(date_received,1,4),SUBSTRING(date_received,5,2), SUBSTRING(date_received,7,2)), 'yyyyMMdd'), 'yyyy-MM-dd') as date_received, id_issue, id_company, id_state, consumer_consent_provided, submitted_via, from_unixtime (unix_timestamp(Concat(substring(date_sent_to_company,1,4),SUBSTRING(date_sent_to_company,5,2), SUBSTRING(date_sent_to_company,7,2)), 'yyyyMMdd'), 'yyyy-MM-dd') as date_sent_to_company, company_response_to_consumer, timely_response from trabajo4.complaints where date_received like '2012-11-%';

image.png

8.4 Tabla D铆namica y funci贸n de Ingesta

#8.4-Tabla-D铆namica-y-funci贸n-de-Ingesta

CREATE EXTERNAL TABLE trabajo4.tabla_particion_dinamica ( id_issue string COMMENT 'Id_issue', id_company string COMMENT 'Id_company', id_state string COMMENT 'Id_state', consumer_consent_provided string COMMENT 'Consumer_consent_provided', submitted_via string COMMENT 'Submitted_via', date_sent_to_company date COMMENT 'Date_sent_to_company', company_response_to_consumer string COMMENT 'Company_response_to_consumer', timely_response string COMMENT 'Timely_response' ) PARTITIONED BY (date_received STRING) STORED AS PARQUET LOCATION '/carpeta4/complaints_part_dina/tabla_particion_dinamica';

set hive.exec.dynamic.partition.mode=nonstrict;

insert into trabajo4.tabla_particion_dinamica partition(date_received) select id_issue, id_company, id_state, consumer_consent_provided, submitted_via, from_unixtime (unix_timestamp(Concat(substring(date_sent_to_company,1,4),SUBSTRING(date_sent_to_company,5,2), SUBSTRING(date_sent_to_company,7,2)), 'yyyyMMdd'), 'yyyy-MM-dd') as date_sent_to_company, company_response_to_consumer, timely_response, from_unixtime (unix_timestamp(Concat(substring(date_received,1,4),SUBSTRING(date_received,5,2), SUBSTRING(date_received,7,2)), 'yyyyMMdd'), 'yyyy-MM-dd') as date_received from trabajo4.complaints where date_received like '2012-0%';

image.png

8.5 Bucking

#8.5-Bucking

CREATE EXTERNAL TABLE IF NOT EXISTS trabajo4.company_buckets ( id_company string COMMENT 'Id_company', company_name string COMMENT 'Company_name', id_company_company_name struct<id_company:STRING,company_name:string> COMMENT 'Id_company_company_name' ) CLUSTERED BY (company_name) INTO 2 BUCKETS ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' MAP KEYS TERMINATED BY ':' LOCATION '/carpeta4/company_buck/tabla_bucketing';

set map.reduce.tasks = 2; set hive.enforce.bucketing = true; INSERT OVERWRITE TABLE trabajo4.company_buckets SELECT * FROM trabajo4.company;

image.png

8.6 Transformaciones con Spark

#8.6-Transformaciones-con-Spark

Procedemos a pasar las tablas a data frames

val df_state = spark.read.table("trabajo4.state") df_state.show() df_state.printSchema()

val df_issues = spark.read.table("trabajo4.issues") df_issues.show() df_issues.printSchema()

val df_complaints = spark.read.table("trabajo4.complaints") df_complaints.show() df_complaints.printSchema()

val df_company = spark.read.table("trabajo4.company") df_company.show() df_company.printSchema()

image.png

DataFrame Consolidada

#DataFrame-Consolidada

val df_tablon =df_complaints.as("cot").join(df_issues.as("iss"),"id_issue").join(df_state.as("sta"),"id_state").join(df_company.as("coy"),"id_company").select("cot.date_received","iss.issues","coy.company_name","sta.state","cot.consumer_consent_provided","cot.submitted_via","cot.date_sent_to_company","cot.company_response_to_consumer","cot.timely_response")

df_tablon.show() df_tablon.printSchema()

image.png

Guardamos el dataframe en parquet y creamos una tabla con la data a analizar

df_tablon.write.format("parquet").save("/carpeta4/tablon/") df_tablon.createOrReplaceTempView("tablon"); spark.sql("create table if not exists trabajo4.tablon as select * from tablon");

La 鈥/carpeta4/tablon/" es la tercera capa del datalake que se generar谩

image.png

8.7. Transformaciones con UDFs

#8.7.-Transformaciones-con-UDFs

COPIAMOS EL CONTENIDO DE UNA TABLA Y LO CONVERTIMOS EN DATAFRAME

#COPIAMOS-EL-CONTENIDO-DE-UNA-TABLA-Y-LO-CONVERTIMOS-EN-DATAFRAME

val df_tablon_udf = spark.read.table("trabajo4.tablon");

df_tablon_udf.show(1) df_tablon_udf.printSchema()

image.png

CREAMOS LA FUNCI脫N

#CREAMOS-LA-FUNCI脫N

val upper: String => String = _.toUpperCase

PASAMOS LA FUNCI脫N A UDF

#PASAMOS-LA-FUNCI脫N-A-UDF

import org.apache.spark.sql.functions.udf val upperUDF = udf(upper)

EJECUTAMOS LA FUNCION EN EL DATAFRAME

#EJECUTAMOS-LA-FUNCION-EN-EL-DATAFRAME

val df_tablon_udf_post = df_tablon_udf.withColumn("upper", upperUDF('issues))

df_tablon_udf_post.createOrReplaceTempView("tablon_udf"); spark.sql("create table if not exists trabajo4.tablon_udf as select * from tablon_udf"); df_tablon_udf_post.write.format("parquet").save("/carpeta4/tablon_udf/")

spark.sql("""SELECT from trabajo4.tablon_udf""").show(1) spark.sql("""SELECT from trabajo4.tablon_udf""").printSchema()

image.png

8.8. Tablas en HBASE

#8.8.-Tablas-en-HBASE

image.png

9.Apache Kafka

#9.Apache-Kafka

image.png

9.1 Explicar el Funcionamiento del Flujo en Streaming

#9.1-Explicar-el-Funcionamiento-del-Flujo-en-Streaming

Apache Kafka es un sistema de mensajes distribuidos de c贸digo abierto que permite crear aplicaciones en tiempo real con datos de transmisiones. Puede enviar datos de transmisiones como secuencias de clics de sitios web, transacciones financieras y registros de aplicaci贸n a su cl煤ster de Kafka, el cual almacena los datos en b煤fer y los env铆a a aplicaciones de procesamiento de transmisiones creadas en marcos como Apache Spark Streaming, Apache Storm o Apache Samza.

image.png

10. Visualizaci贸n

#10.-Visualizaci贸n

Conexi贸n con Power BI

image.png

image.png

image.png

image.png

image.png

image.png

11. Modelo de Machine Learning en Spark

#11.-Modelo-de-Machine-Learning-en-Spark

parte_de_ML

EN CASO QUE LA EL TABLON TENGA LA DATA CORRECTA PODRIAN EJECUTARSE LOS SIGUIENTES COMANDOS

#EN-CASO-QUE-LA-EL-TABLON-TENGA-LA-DATA-CORRECTA-PODRIAN-EJECUTARSE-LOS-SIGUIENTES-COMANDOS

SE LEE LA DATA

#SE-LEE-LA-DATA

val df_ml_tablon = spark.read.table("trabajo4.tablon")

SE GENERA EL VECTOR

#SE-GENERA-EL-VECTOR

feature_columns = df_ml_tablon.columns:-1 #se quita la 煤ltima fila

from pyspark.ml.feature import VectorAssembler

assembler_ml = VectorAssembler(inputCols=feature_columns,outputCol="features") #Nombre de la columna de salida es features df_ml_tablon_2 = assembler_ml.transform(df_ml_tablon)

SE HACE LA SEPARACI脫N ENTRE TRAIN Y TEST

#SE-HACE-LA-SEPARACI脫N-ENTRE-TRAIN-Y-TEST

train, test = df_ml_tablon_2.randomSplit(0.7, 0.3)

SE ENTRENA EL ALGORITMO EN ESTA CASO ES RANDOM FOREST

#SE-ENTRENA-EL-ALGORITMO-EN-ESTA-CASO-ES-RANDOM-FOREST

from pyspark.ml.regression import randomforestregressor

SE ESPECIFICAN 2 COLUMNAS

#SE-ESPECIFICAN-2-COLUMNAS

random_forest_ml = LinearRegression(featuresCol="features", labelCol="date_sent_to_company")

ENTRENAMIENTO

#ENTRENAMIENTO

modelo_rf_ml = random_forest_ml.fit(train)

EVALUAMOS EL PERFORMANE

#EVALUAMOS-EL-PERFORMANE

evaluation_summary = modelo_rf_ml.evaluate(test) evaluation_summary.meanAbsoluteError evaluation_summary.rootMeanSquaredError evaluation_summary.r2