项目实训第十天

本文最后更新于:2021年7月23日 晚上


电信日志分析

数据采集

概述

  1. 在实际过程中,流量日志并不是集中在一台服务器上而是放在了多台服务器上,此时需要考虑将数据先从多台服务器上收集过来其中到HDFS上
  2. 在收集日志的时候,可以考虑使用日志收集框架Flume、Scribe等

步骤

  1. 将三台虚拟机启动,将后两台虚拟机作为日志产生的服务器,第一台虚拟机作为日志进行汇聚的服务器
  2. 在第二台和第三台虚拟机上,上传日志文件
1
2
3
4
5
cd /opt
mkdir telecomlog
cd telecomlog/
rz
# 上传日志文件
  1. 第二台和第三台虚拟机上做日志收集
1
2
cd /opt/flume-1.9.0/data
vim telecomlog.conf

在文件中添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# 配置Source
# 如果监听的目录下产生了新的文件
# 那么需要将新的文件的内容来自动收集
a1.sources.s1.type = spooldir
# 指定要监听的目录
a1.sources.s1.spoolDir = /opt/telecomlog

# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# 配置Sink
# 需要将收集到的数据汇聚到第一个节点上
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop
a1.sinks.k1.port = 8090

# 绑定
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  1. 在第一台虚拟机上来汇聚数据到HDFS上
1
2
cd /opt/flume-1.9.0/data/
vim telecomlog.conf

在文件中添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# 配置Source
# 接收其他节点发送来的数据
a1.sources.s1.type = avro
# 指定要监听的主机
a1.sources.s1.bind = 0.0.0.0
# 指定要监听的端口
a1.sources.s1.port = 8090
# 添加一个拦截器用于标记时间戳
a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = timestamp

# 配置Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# 配置Sink
# 将数据写到HDFS上
a1.sinks.k1.type = hdfs
# 指定在HDFS上的存储位置
a1.sinks.k1.hdfs.path = hdfs://hadoop:9000/telecomlog/reporttime=%Y-%m-%d
# 指定文件在HDFS上的存储类型
a1.sinks.k1.hdfs.fileType = DataStream
# 指定文件的滚动间隔时间
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0

# 绑定
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
  1. 在第一个虚拟机上启动HDFS
1
start-dfs.sh
  1. 先启动第一个虚拟机上的Flume
1
flume-ng agent -n a1 -c $FLUME_HOME/conf -f telecomlog.conf -Dflume.root.logger=INFO,console
  1. 再启动第二个虚拟机上的Flume
1
flume-ng agent -n a1 -c $FLUME_HOME/conf -f telecomlog.conf -Dflume.root.logger=INFO,console

数据清洗

概述

  1. 将数据收集到HDFS上之后,需要对数据进行处理,但是数据的字段相对比较多,并不是所有的字段都需要处理,那么此时就需要对原始数据进行分析,这个过程称之为数据清洗
  2. 数据清洗的时候,如果过程相对简单,那么可以考虑使用SQL工具(例如Hive,Kettle等)来清洗,如果数据处理过程相对比较复杂,那么可以考虑使用计算框架(例如MapReduce,Spark,Flink等)来清洗处理

步骤

  1. 需要先开启YARN
1
start-yarn.sh
  1. 启动Hive后台进程
1
2
hive --service hiveserver2 &
hive --service metastore &
  1. 启动Hive
1
hive
  1. 在Hive建库建表
1
2
3
4
5
6
7
8
9
10
11
# 建库
create database telecom;
# 使用这个库
use telecom;
# 建表
create EXTERNAL table telecom (a1 string, a2 string, a3 string, a4 string, a5 string, a6 string, a7 string, a8 string, a9 string, a10 string, a11 string, a12 string, a13 string, a14 string, a15 string, a16 string, a17 string, a18 string, a19 string, a20 string, a21 string, a22 string, a23 string, a24 string, a25 string, a26 string, a27 string, a28 string, a29 string, a30 string, a31 string, a32 string, a33 string, a34 string, a35 string, a36 string, a37 string, a38 string, a39 string, a40 string, a41 string, a42 string, a43 string, a44 string, a45 string, a46 string, a47 string, a48 string, a49 string, a50 string, a51 string, a52 string, a53 string, a54 string, a55 string, a56 string, a57 string, a58 string, a59 string, a60 string, a61 string, a62 string, a63 string, a64 string, a65 string, a66 string, a67 string, a68 string, a69 string, a70 string, a71 string, a72 string, a73 string, a74 string, a75 string, a76 string, a77 string)
partitioned by (reporttime string) row format delimited fields terminated by '|' stored as textfile location '/telecomlog';
# 修复分区
msck repair table telecom;
# 抽样数据
select * from telecom tablesample(5 rows);
  1. 原表中有78个字段,但是对需求分析真正有用的只有23个字段,此时需要从这78个字段中将这23个字段抽取出来 - 清洗表
1
2
3
4
5
6
# 建立表
create table dataclear(reporttime string, appType bigint, appSubtype bigint, userIp string, userPort bigint, appServerIP string, appServerPort bigint, host string, cellid string, appTypeCode bigint, interruptType String, transStatus bigint, trafficUL bigint, trafficDL bigint, retranUL bigint, retranDL bigint, procdureStartTime bigint, procdureEndTime bigint)row format delimited fields terminated by '|';
# 需要从原始表中来抽取字段
insert overwrite table dataclear select concat(reporttime, ' ', '00:00:00'), a23, a24, a27, a29, a31, a33, a59, a17, a19, a68, a55, a34, a35, a40, a41, a20, a21 from telecom;
# 抽样数据
select * from dataclear tablesample(5 rows);
  1. 抽取完字段之后,需要对数据进行整理,例如需要对数据进行合并、去重、转换、补齐、舍弃等 - 对数据整理,建立一个事实表
序号 字段 字段类型 描述
0 reportTime datetime 小时 时间片 default ‘YYYY-MM-DD HH24:MI:SS’
1 appType int 应用大类
2 appSubtype int 应用小类
3 userIP varchar(20) 用户IP
4 userPort int 用户端口
5 appServerIP varchar(20) 服务器IP
6 appServerPort int 服务器端口
7 host varchar(50) 域名
8 cellid varchar(20) 小区ID
9 attempts int(20) 尝试次数
10 accepts int(20) 接受次数
11 trafficUL int(20) 上行流量
12 trafficDL int(20) 下行流量
13 retranUL int(20) 重传上行报文数
14 retranDL int(20) 重传下行报文数
15 failCount int(20) 延时失败次数
16 transDelay int(20) 传输时延*
1
2
3
4
5
6
# 构建一个事实表
create table f_http_app_host(reporttime string, appType bigint, appSubtype bigint, userIP string, userPort int, appServerIP string, appServerPort int, host string, cellid string, attempts bigint, accepts bigint, trafficUL bigint, trafficDL bigint, retranUL bigint, retranDL bigint, failCount bigint, transDelay bigint) row format delimited fields terminated by '|' stored as textfile;
# 抽取字段
insert overwrite table f_http_app_host select reporttime, appType, appSubtype, userIp, userPort, appServerIP, appServerPort, host, if(cellid == '', "000000000", cellid), if(appTypeCode == 103, 1, 0), if(appTypeCode == 103 and find_in_set(transStatus, "10,11,12,13,14,15,32,33,34,35,36,37,38,48,49,50,51,52,53,54,55,199,200,201,202,203,204,205,206,302,304,306")!=0 and interruptType == 0, 1, 0), if(apptypeCode == 103, trafficUL, 0), if(apptypeCode == 103, trafficDL, 0), if(apptypeCode == 103, retranUL, 0), if(apptypeCode == 103, retranDL, 0), if(appTypeCode == 103 and transStatus == 1 and interruptType == 0, 1, 0), if(appTypeCode == 103, procdureEndTime - procdureStartTime, 0) from dataclear;
# 数据抽样
select * from f_http_app_host tablesample(5 rows);
7. 根据不同需求将数据来抽取出来,例如分析各个APP的受欢迎程度
序号 字段 字段类型 描述
0 hourid datetime 小时时间片
1 appType int 应用大类
2 appSubtype int 应用小类
3 attempts int(20) 尝试次数
4 accepts int(20) 接受次数
5 succRatio double 尝试成功率
6 trafficUL int(20) 上行流量
7 trafficDL int(20) 下行流量
8 totalTraffic int(20) 总流量
9 retranUL int(20) 重传上行报文数
10 retranDL int(20) 重传下行报文数
11 retranTraffic int(20) 重传报文数据
12 failCount int(20) 延时失败次数
13 transDelay int(20) 传输时延
1
2
3
4
5
6
7
8
# 建立受欢迎的APP的表
create table D_H_HTTP_APPTYPE(hourid string, appType bigint, appSubtype bigint, attempts bigint, accepts bigint, succRatio double, trafficUL bigint, trafficDL bigint, totalTraffic bigint, retranUL bigint, retranDL bigint, retranTraffic bigint, failCount bigint, transDelay bigint) row format delimited fields terminated by '|' stored as textfile;
# 从事实表中来抽取字段到要分析的表中
insert overwrite table D_H_HTTP_APPTYPE select reporttime, apptype, appsubtype, sum(attempts), sum(accepts), round(sum(accepts)/sum(attempts), 2), sum(trafficUL), sum(trafficDL), sum(trafficUL)+sum(trafficDL), sum(retranUL), sum(retranDL), sum(retranUL)+sum(retranDL), sum(failCount), sum(transDelay)from f_http_app_host group by reporttime, apptype, appsubtype;
# 数据抽样
select * from D_H_HTTP_APPTYPE tablesample(5 rows);
# 获取最受欢迎的APP - 统计每一个APP的流量
select appSubtype, sum(totalTraffic) as total from D_H_HTTP_APPTYPE group by appSubtype sort by total desc limit 5;

数据导出

概述

  1. 在实际过程中,在对数据处理完成之后,一般需要对数据进行可视化操作
  2. 如果在进行可视化操作的时候使用的是一些基于Hadoop的BI工具,那么可以直接从HDFS上来读取数据;如果使用开源的可视化工具,那么此时需要需要将数据导出到数据库中

Sqoop

  1. Sqoop是Apache提供的一套用于进行数据导入导出的工具,可以在HDFS和数据库之间实现数据的导入和导出效果
  2. 安装步骤
  3. 进入/opt目录下,上传sqoop的安装包
1
2
3
cd /opt
rz
# 选择Sqoop的安装包上传
  1. 解压
1
tar -xvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz
  1. 重命名
1
mv sqoop-1.4.7.bin__hadoop-2.6.0 sqoop-1.4.7
  1. 进入Sqoop的配置目录
1
cd sqoop-1.4.7/conf/
  1. 编辑文件
1
2
3
4
5
6
7
8
9
10
# 复制文件
cp sqoop-env-template.sh sqoop-env.sh
# 编辑文件
vim sqoop-env.sh
# 在文件中添加
export HADOOP_COMMON_HOME=/opt/hadoop-3.1.3
export HADOOP_MAPRED_HOME=/opt/hadoop-3.1.3
export HIVE_HOME=/opt/hive-3.1.2
# 保存退出,重新生效
source sqoop-env.sh
  1. 进入Sqoop的lib目录,将MySQL的连接驱动包放到这个目录下
1
2
3
cd ../lib
# 复制
cp /opt/hive-3.1.2/lib/mysql-connector-java-5.1.27.jar ./
  1. 编辑环境变量
1
2
3
4
5
6
7
8
vim /etc/profile.d/sqoophome.sh
# 在文件中添加
export SQOOP_HOME=/opt/sqoop-1.4.7
export PATH=$PATH:$SQOOP_HOME/bin
# 保存退出,重新生效
source /etc/profile.d/sqoophome.sh
# 查看Sqoop的版本
sqoop version
  1. Sqoop基本命令
  2. 查看MySQL中已经存在的库
1
sqoop list-databases --connect jdbc:mysql://hadoop:3306 --username root --password root
  1. 查看MySQL指定库中的指定的表
1
sqoop list-tables --connect jdbc:mysql://hadoop:3306/hive --username root --password root
  1. 将HDFS上的数据导出到MySQL中
    1. 在MySQL中建表
    1
    create table orders(id int primary key, orderdate varchar(10), productid int, num int);
     2. 将HDFS上的数据导出到MySQL中
    
    1
    sqoop export --connect jdbc:mysql://hadoop:3306/test --username root --password root --export-dir '/txt/order.txt' --table orders -m 1 --fields-terminated-by ' ';
  2. 将MySQL中的数据导入到HDFS上
1
sqoop import --connect jdbc:mysql://hadoop:3306/test --username root --password root --table orders --target-dir '/sqoop/orders' --fields-terminated-by '\t' -m 1;

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!