Telecom Customer Service Project Notes

Posted by 程序员小憨


Telecom customer service case notes

Project source code: https://gitee.com/fanggaolei/learning-notes-warehouse

1.0 Hadoop Setup

1.1 Clone virtual machines 102, 103, and 104

(1) Set a static IP on each cloned VM

vim /etc/sysconfig/network-scripts/ifcfg-ens33

(2) Set the hostname

vim /etc/hostname

(3) Connect with Xshell and Xftp

1.2 Pseudo-distributed test

(1) Place the following six archives in /opt/software:

apache-flume-1.9.0-bin.tar.gz
apache-zookeeper-3.5.7-bin.tar.gz
hadoop-3.1.3.tar.gz
hbase-2.4.11-bin.tar.gz
jdk-8u212-linux-x64.tar.gz
kafka_2.12-3.0.0.tgz

(2) Extract the JDK and Hadoop archives

tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
tar -zxvf hadoop-3.1.3.tar.gz -C /opt/module/

(3) Add environment variables for the JDK and Hadoop

sudo vim /etc/profile.d/my_env.sh
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

Finally, apply the changes:
source /etc/profile

(4) Reboot the VM and verify that Hadoop and the JDK are installed:

java -version
hadoop version

You should see:

Hadoop 3.1.3  
Source code repository https://gitbox.apache.org/repos/asf/hadoop.git -r ba631c436b806728f8ec2f54ab1e289526c90579
Compiled by ztang on 2019-09-12T02:47Z
Compiled with protoc 2.5.0
From source with checksum ec785077c385118ac91aadde5ec9799
This command was run using /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-common-3.1.3.jar

java version "1.8.0_212"
Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)

The pseudo-distributed startup is skipped here.

1.3 Fully distributed setup

(1) First set up passwordless SSH login.

Generate the public/private key pair:
[atguigu@hadoop102 .ssh]$ pwd
/home/atguigu/.ssh

[atguigu@hadoop102 .ssh]$ ssh-keygen -t rsa
Press Enter three times; two files are generated: id_rsa (private key) and id_rsa.pub (public key).

Copy the public key to every machine you want passwordless access to (run all three commands on each machine):
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop102
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop103
[atguigu@hadoop102 .ssh]$ ssh-copy-id hadoop104

(2) Once that works, set up xsync

(a) Create an xsync script in /home/atguigu/bin
[atguigu@hadoop102 opt]$ cd /home/atguigu
[atguigu@hadoop102 ~]$ mkdir bin
[atguigu@hadoop102 ~]$ cd bin
[atguigu@hadoop102 bin]$ vim xsync
Put the following code in the file:

#!/bin/bash

#1. Check the argument count
if [ $# -lt 1 ]
then
    echo Not Enough Arguments!
    exit;
fi

#2. Loop over every machine in the cluster
for host in hadoop102 hadoop103 hadoop104
do
    echo ====================  $host  ====================
    #3. Loop over all files/directories and send them one by one

    for file in $@
    do
        #4. Check whether the file exists
        if [ -e $file ]
            then
                #5. Get the parent directory
                pdir=$(cd -P $(dirname $file); pwd)

                #6. Get the file name
                fname=$(basename $file)
                ssh $host "mkdir -p $pdir"
                rsync -av $pdir/$fname $host:$pdir
            else
                echo $file does not exist!
        fi
    done
done

(b) Give the xsync script execute permission
[atguigu@hadoop102 bin]$ chmod +x xsync

(c) Test the script
[atguigu@hadoop102 ~]$ xsync /home/atguigu/bin

(d) Copy the script to /bin so it can be called globally
[atguigu@hadoop102 bin]$ sudo cp xsync /bin/

(e) Sync the environment variable config (owned by root)
[atguigu@hadoop102 ~]$ sudo ./bin/xsync /etc/profile.d/my_env.sh
Note: when using sudo, you must give xsync its full path.

Apply the environment variables:
[atguigu@hadoop103 bin]$ source /etc/profile
[atguigu@hadoop104 opt]$ source /etc/profile

(3) Check that the environment variables took effect on 103 and 104; if not, reboot the VMs and check again:

java -version
hadoop version

1.4 Edit the configuration files

(1) Core configuration file

[atguigu@hadoop102 ~]$ cd $HADOOP_HOME/etc/hadoop
[atguigu@hadoop102 hadoop]$ vim core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- NameNode address -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:8020</value>
    </property>

    <!-- Hadoop data storage directory -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-3.1.3/data</value>
    </property>

    <!-- Static user for HDFS web UI login -->
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>atguigu</value>  <!-- remember to replace with your own username -->
    </property>
</configuration>

(2) HDFS configuration file

[atguigu@hadoop102 hadoop]$ vim hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- NameNode web UI address -->
	<property>
        <name>dfs.namenode.http-address</name>
        <value>hadoop102:9870</value>
    </property>
    <!-- Secondary NameNode web UI address -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop104:9868</value>
    </property>
</configuration>

(3) YARN configuration file

[atguigu@hadoop102 hadoop]$ vim yarn-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- Use the shuffle auxiliary service for MapReduce -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>

    <!-- ResourceManager address -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop103</value>
    </property>

    <!-- Environment variable inheritance -->
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>
<!-- Enable log aggregation -->
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
<!-- Log aggregation server address -->
<property>  
    <name>yarn.log.server.url</name>  
    <value>http://hadoop102:19888/jobhistory/logs</value>
</property>
<!-- Keep aggregated logs for 7 days -->
<property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>604800</value>
</property>
</configuration>

(4) MapReduce configuration file

[atguigu@hadoop102 hadoop]$ vim mapred-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- Run MapReduce on YARN -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
<!-- History server address -->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop102:10020</value>
</property>

<!-- History server web UI address -->
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop102:19888</value>
</property>
</configuration>

(5) Configure workers

[atguigu@hadoop102 hadoop]$ vim /opt/module/hadoop-3.1.3/etc/hadoop/workers

hadoop102
hadoop103
hadoop104

Sync all the configuration files:
[atguigu@hadoop102 hadoop]$ xsync /opt/module/hadoop-3.1.3/etc

(6) Format the NameNode

[atguigu@hadoop102 hadoop-3.1.3]$ hdfs namenode -format

(7) Write the myhadoop.sh script

[atguigu@hadoop102 ~]$ cd /home/atguigu/bin
[atguigu@hadoop102 bin]$ vim myhadoop.sh

#!/bin/bash

if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi

case $1 in
"start")
        echo " =================== Starting the Hadoop cluster ==================="

        echo " --------------- starting hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
        echo " --------------- starting yarn ---------------"
        ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
        echo " --------------- starting historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")
        echo " =================== Stopping the Hadoop cluster ==================="

        echo " --------------- stopping historyserver ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
        echo " --------------- stopping yarn ---------------"
        ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
        echo " --------------- stopping hdfs ---------------"
        ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
    echo "Input Args Error..."
;;
esac

Remember to add execute permission:
[atguigu@hadoop102 bin]$ chmod +x myhadoop.sh 

(8) Write the jpsall script

[atguigu@hadoop102 ~]$ cd /home/atguigu/bin
[atguigu@hadoop102 bin]$ vim jpsall

#!/bin/bash

for host in hadoop102 hadoop103 hadoop104
do
        echo =============== $host ===============
        ssh $host jps 
done

Add execute permission and distribute the script:
[atguigu@hadoop102 bin]$ chmod +x jpsall
[atguigu@hadoop102 ~]$ xsync /home/atguigu/bin/
Then add execute permission on 103 and 104 as well.

(9) Start the cluster

myhadoop.sh start    # start the cluster

jpsall               # check whether every node's daemons are up

(10) Check that these three URLs are reachable:

http://hadoop102:9870
http://hadoop103:8088/
http://hadoop102:19888/jobhistory

If all daemons start normally but the pages cannot be reached, check whether the hosts file was configured; if it is correct, restart the cluster and test again.

2.0 Zookeeper Setup

2.1 Extract and install

Extract:
[atguigu@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/

Rename:
[atguigu@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7

2.2 Configuration

Configure the server ID.

(1) Create a zkData directory under /opt/module/zookeeper-3.5.7/

[atguigu@hadoop102 zookeeper-3.5.7]$ mkdir zkData

(2) Create a myid file in /opt/module/zookeeper-3.5.7/zkData

[atguigu@hadoop102 zkData]$ vi myid

Put the server's ID in the file (note: no blank lines above or below, and no spaces around the number):

2

[atguigu@hadoop102 module]$ xsync zookeeper-3.5.7

Then edit the myid file on hadoop103 and hadoop104 and change its value to 3 and 4 respectively:
vim /opt/module/zookeeper-3.5.7/zkData/myid

(3) Configure the zoo.cfg file

(1) In /opt/module/zookeeper-3.5.7/conf, rename zoo_sample.cfg to zoo.cfg
[atguigu@hadoop102 conf]$ mv zoo_sample.cfg zoo.cfg

(2) Open the zoo.cfg file
[atguigu@hadoop102 conf]$ vim zoo.cfg

#Change the data storage path
dataDir=/opt/module/zookeeper-3.5.7/zkData
#Add the following configuration
#######################cluster##########################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888

(3) Sync the zoo.cfg configuration file
[atguigu@hadoop102 conf]$ xsync zoo.cfg

(4) Write a start/stop script

1) Create the script in /home/atguigu/bin on hadoop102
[atguigu@hadoop102 bin]$ vim zk.sh

#!/bin/bash
case $1 in
"start")
	for i in hadoop102 hadoop103 hadoop104
	do
        echo ---------- zookeeper $i start ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
	done
;;
"stop")
	for i in hadoop102 hadoop103 hadoop104
	do
        echo ---------- zookeeper $i stop ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
	done
;;
"status")
	for i in hadoop102 hadoop103 hadoop104
	do
        echo ---------- zookeeper $i status ------------
		ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
	done
;;
esac

2) Add execute permission to the script
[atguigu@hadoop102 bin]$ chmod u+x zk.sh

3) Start the Zookeeper cluster
[atguigu@hadoop102 module]$ zk.sh start

4) Stop the Zookeeper cluster
[atguigu@hadoop102 module]$ zk.sh stop

Run jpsall to confirm that everything started.

(5) Zookeeper operations

Start the Zookeeper client:
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop102:2181

List the Zookeeper znodes:
[zk: hadoop102:2181(CONNECTED) 0] ls /

3.0 HBase Setup

HBase uses Zookeeper for master high availability, for recording RegionServer deployment information, and for storing the location of the meta table.

For data reads and writes, HBase clients access Zookeeper directly. Version 2.3 introduced Master Registry mode, which lets clients contact the master directly instead; using it increases the load on the master and reduces the load on Zookeeper.

3.1 Extract, install, and set environment variables

1) Extract HBase to the target directory
[atguigu@hadoop102 software]$ tar -zxvf hbase-2.4.11-bin.tar.gz -C /opt/module/
[atguigu@hadoop102 software]$ mv /opt/module/hbase-2.4.11 /opt/module/hbase

2) Configure the environment variables
[atguigu@hadoop102 ~]$ sudo vim /etc/profile.d/my_env.sh

Add:

#HBASE_HOME
export HBASE_HOME=/opt/module/hbase
export PATH=$PATH:$HBASE_HOME/bin

3) Use source to apply the environment variables
[atguigu@hadoop102 module]$ source /etc/profile.d/my_env.sh

[fang@hadoop102 hbase]$ sudo /home/fang/bin/xsync /etc/profile.d/my_env.sh 

3.2 Configuration

1) hbase-env.sh: append the following at the end.

In the hbase/conf directory:

[fang@hadoop102 conf]$ vim hbase-env.sh

Add this line at the very end:
export HBASE_MANAGES_ZK=false

2) hbase-site.xml: delete everything inside the original <configuration> tags and replace it with the following.

<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>hadoop102,hadoop103,hadoop104</value>
  <description>The Zookeeper quorum servers.</description>
</property>

<!-- <property>-->
<!-- <name>hbase.zookeeper.property.dataDir</name>-->
<!-- <value>/export/zookeeper</value>-->
<!-- <description>Remember to change the ZK config file as well;-->
<!-- ZK data must not live in a temporary directory.-->
<!-- </description>-->
<!-- </property>-->

<property>
  <name>hbase.rootdir</name>
  <value>hdfs://hadoop102:8020/hbase</value>
  <description>The directory shared by RegionServers.</description>
</property>

3) Edit regionservers

hadoop102 
hadoop103 
hadoop104 

4) Work around the log4j compatibility issue between HBase and Hadoop: sideline HBase's slf4j jar so Hadoop's is used.

[atguigu@hadoop102 hbase]$ mv /opt/module/hbase/lib/client-facing-thirdparty/slf4j-reload4j-1.7.33.jar /opt/module/hbase/lib/client-facing-thirdparty/slf4j-reload4j-1.7.33.jar.bak

Distribute HBase to the other nodes:
xsync hbase/

5) Start/stop HBase

[atguigu@hadoop102 hbase]$ bin/start-hbase.sh
[atguigu@hadoop102 hbase]$ bin/stop-hbase.sh

4.0 Flume Setup

4.1 Extract and install

Extract:
[atguigu@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

Rename:
[atguigu@hadoop102 module]$ mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume

Delete guava-11.0.2.jar from the lib directory for compatibility with Hadoop 3.1.3:
[atguigu@hadoop102 lib]$ rm /opt/module/flume/lib/guava-11.0.2.jar

4.2 Configuration

(1) Install the netcat tool
[atguigu@hadoop102 software]$ sudo yum install -y nc

(2) Check whether port 44444 is already in use
[atguigu@hadoop102 flume-telnet]$ sudo netstat -nlp | grep 44444

(3) Create a Flume agent configuration file, flume-netcat-logger.conf, as follows.
(4) Create a job directory under the flume directory and enter it.
[atguigu@hadoop102 flume]$ mkdir job
[atguigu@hadoop102 flume]$ cd job/

(5) Create the flume-netcat-logger.conf file in the job directory.
[atguigu@hadoop102 job]$ vim flume-netcat-logger.conf

(6) Add the following content to flume-netcat-logger.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(7) Start Flume listening on the port first
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

Then open a second session on hadoop102.

(8) Use netcat to send data to local port 44444
[atguigu@hadoop102 ~]$ nc localhost 44444

That is enough for now; configuration continues later in the project.

5.0 Kafka Setup

5.1 Extract, install, and configure environment variables

Extract:
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

Rename:
[atguigu@hadoop102 module]$ mv kafka_2.12-3.0.0/ kafka

Go to /opt/module/kafka and edit the configuration file:
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vim server.properties

Change three places; make sure you find the right locations:

#Globally unique broker ID; must not repeat, digits only
broker.id=0

#Path where Kafka stores its run logs (i.e. the data); it does not need to exist in advance,
#Kafka creates it automatically. Multiple disk paths can be configured, separated by ","
log.dirs=/opt/module/kafka/datas

#How often to check for expired data; the default is every 5 minutes
log.retention.check.interval.ms=300000
#Zookeeper connection string (a /kafka chroot is created under the ZK root for easier management)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

Distribute:
[atguigu@hadoop102 module]$ xsync kafka/

On hadoop103 and hadoop104, edit the configuration file:
[fang@hadoop102 flume]$ vim /opt/module/kafka/config/server.properties
and set broker.id=1 and broker.id=2 respectively.

Add the Kafka environment variables to /etc/profile.d/my_env.sh
[atguigu@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
Refresh:
[atguigu@hadoop102 module]$ source /etc/profile

Distribute and refresh:
[atguigu@hadoop102 module]$ sudo /home/atguigu/bin/xsync /etc/profile.d/my_env.sh
[atguigu@hadoop103 module]$ source /etc/profile
[atguigu@hadoop104 module]$ source /etc/profile

5.2 Cluster startup

(1) Start the Zookeeper cluster first, then Kafka.
[atguigu@hadoop102 kafka]$ zk.sh start

(2) Cluster script
1) Create the kf.sh script file in /home/atguigu/bin
[atguigu@hadoop102 bin]$ vim kf.sh


#! /bin/bash
case $1 in
        "start" )
        for i in hadoop102 hadoop103 hadoop104;
        do
                echo "==============$i start=============="
                ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
        done
;;
        "stop" )
        for i in hadoop102 hadoop103 hadoop104;
        do
                echo "==============$i stop=============="
                ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
        done
;;
esac


2) Add execute permission
[atguigu@hadoop102 bin]$ chmod +x kf.sh
3) Start the cluster
[atguigu@hadoop102 ~]$ kf.sh start
4) Stop the cluster
[atguigu@hadoop102 ~]$ kf.sh stop


6.0 Redis Environment Setup

1. Upload the package to /opt/software and extract it to /opt/module

[root@linux softwere]# tar -zxvf redis-3.0.0.tar.gz -C /opt/module/

2. Enter the redis directory and run make

[root@linux redis-3.0.0]# make

If this errors out because gcc was not found, install gcc:

[root@linux module]# yum -y install gcc

After installation, verify it:

[root@linux module]# gcc -v

3. If running make again still fails, execute:

#Clean up leftover build files
make distclean

#Run make again
make

This time it succeeds.

4. Run:

make install

5. Check the install directory

[root@linux redis-3.0.0]# cd /usr/local/bin/

6. Go back to the Redis directory and back up the config file

#Create a directory at the filesystem root
[root@linux /]# mkdir myredis

#Copy redis.conf into that directory
[root@linux redis-3.0.0]# cp redis.conf /myredis/

#Edit the file in the myredis directory (e.g. set daemonize yes so Redis runs in the background)
[root@linux myredis]# vi redis.conf

#Switch directories
[root@linux myredis]# cd /usr/local/bin/

#Start redis
[root@linux bin]# redis-server /myredis/redis.conf

#Open the client
[root@linux bin]# redis-cli -p 6379

Test

Check that redis is running in the background:

[root@linux bin]# ps -ef|grep redis

Cluster overview

Running jpsall at this point should show the following daemons on each node:

Component         hadoop102          hadoop103          hadoop104
Hadoop HDFS       NameNode           DataNode           SecondaryNameNode
                  DataNode                              DataNode
Hadoop YARN       NodeManager        ResourceManager    NodeManager
                                     NodeManager
History server    JobHistoryServer
Zookeeper         QuorumPeerMain     QuorumPeerMain     QuorumPeerMain
Kafka             Kafka              Kafka              Kafka
HBase             HMaster            HRegionServer      HRegionServer
                  HRegionServer
                  Jps                Jps                Jps

Cluster start/stop scripts:

Hadoop start/stop:
myhadoop.sh start
myhadoop.sh stop

HBase start/stop:
bin/start-hbase.sh
bin/stop-hbase.sh

Zookeeper start/stop (start Zookeeper before Kafka; stop it after Kafka):
zk.sh start
zk.sh stop

Kafka start/stop:
kf.sh start
kf.sh stop

Web UIs:
http://hadoop102:9870
http://hadoop103:8088/
http://hadoop102:19888/jobhistory
http://hadoop102:16010/master-status

Note:

Before starting HBase, start Zookeeper first and then Hadoop.

Always stop Kafka before stopping Zookeeper.

7.0 Business Workflow

7.1 Module 1: Data production, collection, and consumption (P1-P13)

7.1.1 Create the producer

P6 尚硅谷_数据生产 - Creating the producer object (04:25)

public class Bootstrap {
    public static void main(String[] args) throws IOException {

        if (args.length < 2) {
            System.out.println("Invalid arguments; please pass them in the required format");
            System.exit(1);
        }

        // Build the producer object
        Producer producer = new LocalFileProducer();

//        producer.setIn(new LocalFileDataIn("D:\\大数据学习资料\\尚硅谷大数据技术之电信客服综合案例\\2.资料\\辅助文档\\contact.log"));
//        producer.setOut(new LocalFileDataOut("D:\\大数据学习资料\\尚硅谷大数据技术之电信客服综合案例\\2.资料\\辅助文档\\call.log"));

        producer.setIn(new LocalFileDataIn(args[0]));
        producer.setOut(new LocalFileDataOut(args[1]));

        // Produce data
        producer.producer();

        // Close the producer
        producer.close();
    }
}

P7 尚硅谷_数据生产 - Reading the contact data (33:32)

Create an input stream and load the data into a collection.

public class LocalFileDataIn implements DataIn {

    private BufferedReader reader = null;

    /**
     * Constructor
     * @param path
     */
    public LocalFileDataIn(String path) {
        setPath(path);
    }

    /**
     * Set the path and create the input stream
     * @param path
     */
    public void setPath(String path) {
        try {
            reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    public Object read() throws IOException {
        return null;
    }

    /**
     * Read the data and return it as a list
     *
     * @param <T>
     * @param clazz
     * @throws IOException
     * @return
     */
    public <T extends Data> List<T> read(Class<T> clazz) throws IOException {

        List<T> ts = new ArrayList<T>();

        try {
            // Read every line of the data file
            String line = null;
            while ((line = reader.readLine()) != null) {
                // Convert each line into an object of the requested type and collect them
                T t = (T) clazz.newInstance();
                t.setValue(line);
                ts.add(t);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return ts;
    }

    /**
     * Release resources
     * @throws IOException
     */
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }
}
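The Data interface and the Contact class are referenced above but not included in these notes. As a rough, hypothetical sketch of what read(Class<T>) relies on (the field layout of contact.log is an assumption, not confirmed by the source), Contact only needs a no-arg constructor, a setValue that parses one line, and a getTel accessor:

// Hypothetical stand-ins; the real project classes may differ
interface Data {
    void setValue(String line);
}

class Contact implements Data {

    private String tel;
    private String name;

    // Called from LocalFileDataIn.read for each line of contact.log
    // (assumed format: "tel<TAB>name")
    public void setValue(String line) {
        String[] parts = line.split("\t");
        this.tel = parts[0];
        this.name = parts[1];
    }

    public String getTel() {
        return tel;
    }

    public String getName() {
        return name;
    }
}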

P8 尚硅谷_数据生产 - Randomly generating caller and callee numbers (12:59)

Pick two entries from the collection as the caller and the callee, and generate a random call date and time.

/**
 * Producer backed by a local data file
 */
public class LocalFileProducer implements Producer {

    private DataIn in;   // data source
    private DataOut out; // data sink
    private volatile boolean flg = true; // volatile for memory visibility

    public void setIn(DataIn in) {
        this.in = in;
    }

    public void setOut(DataOut out) {
        this.out = out;
    }

    /**
     * Produce data
     */
    public void producer() {

        try {
            // Read the contact list; each line comes back as an object
            List<Contact> contacts = in.read(Contact.class);

            while (flg) {
                // Pick two different contacts as caller and callee
                int call1Index = new Random().nextInt(contacts.size());
                int call2Index;
                while (true) {
                    call2Index = new Random().nextInt(contacts.size());
                    if (call1Index != call2Index) {
                        break;
                    }
                }

                Contact call1 = contacts.get(call1Index);
                Contact call2 = contacts.get(call2Index);

                // Generate a random call time within [startDate, endDate)
                String startDate = "20180101000000"; // start of the range
                String endDate = "20190101000000";   // end of the range

                long startTime = DataUtil.parse(startDate, "yyyyMMddHHmmss").getTime();
                long endtime = DataUtil.parse(endDate, "yyyyMMddHHmmss").getTime();

                // Random call time
                long calltime = startTime + (long) ((endtime - startTime) * Math.random());
                // Call time as a string
                String callTimeString = DataUtil.format(new Date(calltime), "yyyyMMddHHmmss");

                // Generate a random call duration, zero-padded to 4 digits
                String duration = NumberUtil.format(new Random().nextInt(3000), 4);

                // Build the call record
                Callog log = new Callog(call1.getTel(), call2.getTel(), callTimeString, duration);
                System.out.println(log);
                // Write the call record to the data file
                out.write(log);
                Thread.sleep(500);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Close the producer
     * @throws IOException
     */
    public void close() throws IOException {

        if (in != null) {
            in.close();
        }
        if (out != null) {
            out.close();
        }
    }
}

P9 尚硅谷_数据生产 - Building the call record (25:40)

Wrap the data in an object and write it out to the call log.

/**
 * Local file data output
 */
public class LocalFileDataOut implements DataOut {

    private PrintWriter writer = null;

    public LocalFileDataOut(String path) {
        setPath(path);
    }

    /**
     * Set the path
     * @param path
     */
    public void setPath(String path) {
        try {
            writer = new PrintWriter(new OutputStreamWriter(new FileOutputStream(path), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    public void write(Object data) throws Exception {
        write(data.toString());
    }

    /**
     * Write the data string to the file
     * @param data
     * @throws Exception
     */
    public void write(String data) throws Exception {
        writer.println(data);
        writer.flush();
    }

    /**
     * Release resources
     * @throws IOException
     */
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
        }
    }
}

public class Callog {
    private String call1;
    private String call2;
    private String calltime;
    private String duration;

    public Callog(String call1, String call2, String calltime, String duration) {
        this.call1 = call1;
        this.call2 = call2;
        this.calltime = calltime;
        this.duration = duration;
    }

    @Override
    public String toString() {
        return call1 + "\t" + call2 + "\t" + calltime + "\t" + duration;
    }

    public String getCall1() {
        return call1;
    }

    public void setCall1(String call1) {
        this.call1 = call1;
    }

    public String getCall2() {
        return call2;
    }

    public void setCall2(String call2) {
        this.call2 = call2;
    }

    public String getCalltime() {
        return calltime;
    }

    public void setCalltime(String calltime) {
        this.calltime = calltime;
    }

    public String getDuration() {
        return duration;
    }

    public void setDuration(String duration) {
        this.duration = duration;
    }
}
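A quick sanity check of the record format (Demo is a throwaway class, not part of the project): each Callog renders as a tab-separated line, which is exactly what Flume ships to Kafka and what the HBase consumer later splits on \t.

public class Demo {
    public static void main(String[] args) {
        // Made-up numbers; NumberUtil pads durations to 4 digits
        Callog log = new Callog("13812345678", "13987654321", "20180503102030", "0180");
        // Prints: 13812345678	13987654321	20180503102030	0180
        System.out.println(log);
    }
}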
7.1.3 Data collection and consumption

Configure Flume to watch the call log and sink the events into Kafka.

Create a topic named ct in Kafka and start a consumer to receive the data.

Configure Flume in the target directory:

[fang@hadoop102 kafka]$ cd /opt/module/data/
[fang@hadoop102 data]$ vim flume-2-kafka.conf
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/call.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = ct
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the producer jar:

[fang@hadoop102 data]$ java -jar ct_producer.jar contact.log call.log 

Create the topic from the kafka directory:

bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic ct --partitions 3 --replication-factor 2

Start a Kafka console consumer:

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ct

Start Flume to collect the data:

bin/flume-ng agent -c conf/ -n a1 -f /opt/module/data/flume-2-kafka.conf 

Use the Kafka consumer API to fetch the data and write it into HBase.

/**
 * Bootstrap for the Kafka consumer:
 * uses a Kafka consumer to fetch the data collected by Flume
 * and stores it in HBase
 */
public class Bootstrap {
    public static void main(String[] args) throws IOException {

        // Create the consumer
        Consumer consumer = new CallogConsumer();

        // Consume data
        consumer.consume();

        // Release resources
        consumer.close();
    }
}

/**
 * Call log consumer
 */
public class CallogConsumer implements Consumer {

    /**
     * Consume data
     */
    public void consume() {

        try {
            // Create the configuration object
            Properties prop = new Properties();
            prop.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("consumer.properties"));

            // Consumer for the data collected by Flume
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);

            // Subscribe to the topic
            consumer.subscribe(Arrays.asList(Names.TOPIC.getValue()));

            // HBase data access object
            HBaseDao dao = new HBaseDao();

            dao.init(); // initialize HBase

            // Consume the data
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.value());
                    dao.insertData(consumerRecord.value()); // insert the record into HBase
                    //Callog log = new Callog(consumerRecord.value());
                    //dao.insertData(log);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Release resources
     * @throws IOException
     */
    public void close() throws IOException {
    }
}
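The consumer.properties file loaded above is not reproduced in these notes. A minimal sketch of the settings the KafkaConsumer needs, assembled in code instead of loaded from the classpath (the group id is an assumed name; the brokers are the ones set up in section 5.0):

import java.util.Properties;

public class ConsumerPropsSketch {
    public static Properties build() {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        prop.put("group.id", "ct_consumer"); // assumed group name
        // The call log records are plain strings
        prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return prop;
    }
}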

7.2 Module 2: HBase data consumption (P14-P22)

7.2.1 Creating the table

P14 尚硅谷_数据消费 - Wrapping HBase data access (32:21)

Create the corresponding table in HBase, with two column families and a designed rowkey.

/**
 * HBase data access object
 */
public class HBaseDao extends BaseDao {
    /**
     * Initialization
     */
    public void init() throws Exception {
        start();

        // Create the namespace ct
        createNamespaseNX(Names.NAMESPACE.getValue());

        // Create the table ct:calllog with 6 regions
        createTableXX(Names.TABLE.getValue(), "com.fang.ct.consumer.coprocessor.InsertCalleeCoprocessor", 6, Names.CF_CALLER.getValue(), Names.CF_CALLEE.getValue());

        end();
    }

    /**
     * Insert an object
     * @param log
     * @throws Exception
     */
    public void insertData(Callog log) throws Exception {
        log.setRowkey(genRegionNum(log.getCall1(), log.getCalltime()) + "_" + log.getCall1() + "_" + log.getCalltime() + "_" + log.getCall2() + "_" + log.getDuration());

        putData(log);
    }

    /**
     * Insert data
     */
    public void insertData(String value) throws Exception {

        // Save the call log into the HBase table

        // 1. Split the call log line
        String[] values = value.split("\t");
        String call1 = values[0];
        String call2 = values[1];
        String calltime = values[2];
        String duration = values[3];

        // 2. Build the data object
        // rowkey design

        // Caller record
        String rowkey = genRegionNum(call1, calltime) + "_" + call1 + "_" + calltime + "_" + call2 + "_" + duration + "_1";
        Put put = new Put(Bytes.toBytes(rowkey));
        byte[] family = Bytes.toBytes(Names.CF_CALLER.getValue());
        put.addColumn(family, Bytes.toBytes("call1"), Bytes.toBytes(call1));
        put.addColumn(family, Bytes.toBytes("call2"), Bytes.toBytes(call2));
        put.addColumn(family, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));
        put.addColumn(family, Bytes.toBytes("duration"), Bytes.toBytes(duration));
        put.addColumn(family, Bytes.toBytes("flg"), Bytes.toBytes("1"));

        String calleeRowkey = genRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";

//        // Callee record (now handled by the coprocessor instead)
//        Put calleePut = new Put(Bytes.toBytes(calleeRowkey));
//        byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));
//        calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));

        // 3. Save the data
        List<Put> puts = new ArrayList<Put>();
        puts.add(put);
        // puts.add(calleePut);

        putData(Names.TABLE.getValue(), puts);
    }
}

BaseDao holds the connection and Admin objects and provides the table-level helpers: creating tables, generating split keys and region numbers, inserting and deleting data, and so on. A finished rowkey has the shape regionNum_call1_calltime_call2_duration_flg, e.g. 5_13812345678_20180503102030_13987654321_0180_1.

/**
 * Base data access object
 */
public abstract class BaseDao {

    private ThreadLocal<Connection> connHolder = new ThreadLocal<Connection>();
    private ThreadLocal<Admin> adminHolder = new ThreadLocal<Admin>();

    protected void start() throws Exception {
        getConnection();
        getAdmin();
    }

    protected void end() throws Exception {
        Admin admin = getAdmin();
        if (admin != null) {
            admin.close();
            adminHolder.remove();
        }
        Connection conn = getConnection();
        if (conn != null) {
            conn.close();
            connHolder.remove();
        }
    }

    /**
     * Create a table; if it already exists, delete it and create a new one
     * @param name
     * @param family
     * @throws IOException
     */
    protected void createTableXX(String name, String... family) throws Exception {
        createTableXX(name, null, null, family);
    }

    protected void createTableXX(String name, String coprocessorClass, Integer regionCount, String... family) throws Exception {
        Admin admin = getAdmin();
        TableName tableName = TableName.valueOf(name);

        if (admin.tableExists(tableName)) {
            // The table exists: delete it
            deleteTable(name);
        }

        // Create the table
        createTable(name, coprocessorClass, regionCount, family);
    }

    private void createTable(String name, String coprocessorClass, Integer regionCount, String... family) throws Exception {
        Admin admin = getAdmin();
        TableName tablename = TableName.valueOf(name);

        HTableDescriptor tableDescriptor = new HTableDescriptor(tablename);

        if (family == null || family.length == 0) {
            family = new String[1];
            family[0] = Names.CF_INFO.getValue();
        }
        for (String families : family) {
            HColumnDescriptor columnDescriptor = new HColumnDescriptor(families);
            tableDescriptor.addFamily(columnDescriptor);
        }

        if (coprocessorClass != null && !"".equals(coprocessorClass)) {
            tableDescriptor.addCoprocessor(coprocessorClass);
        }

        // Add pre-split regions
        if (regionCount == null || regionCount <= 0) {
            admin.createTable(tableDescriptor);
        } else {
            // Split keys
            byte[][] splitKeys = genSpliKeys(regionCount);
            admin.createTable(tableDescriptor, splitKeys);
        }
    }

    /**
     * Get the list of (startRow, stopRow) pairs for a scan
     * @return
     */
    protected List<String[]> getStartStorRowkeys(String tel, String start, String end) {
        List<String[]> rowkeyss = new ArrayList<String[]>();

        String startTime = start.substring(0, 6);
        String endTime = end.substring(0, 6);

        Calendar startCal = Calendar.getInstance();
        startCal.setTime(DataUtil.parse(startTime, "yyyyMM"));

        Calendar endCal = Calendar.getInstance();
        endCal.setTime(DataUtil.parse(endTime, "yyyyMM"));

        while (startCal.getTimeInMillis() <= endCal.getTimeInMillis()) {

            // Current month
            String nowTime = DataUtil.format(startCal.getTime(), "yyyyMM");

            int regionNum = genRegionNum(tel, nowTime);

            String startRow = regionNum + "_" + tel + "_" + nowTime;
            String stopRow = startRow + "|";

            String[] rowkeys = { startRow, stopRow };
            rowkeyss.add(rowkeys);

            // Advance by one month
            startCal.add(Calendar.MONTH, 1);
        }
        return rowkeyss;
    }

    /**
     * Compute the region number
     * @param tel
     * @param date
     * @return
     */
    protected int genRegionNum(String tel, String date) {
        String usercode = tel.substring(tel.length() - 4);
        String yearMonth = date.substring(0, 6);
        int userCodehash = usercode.hashCode();
        int yearMonthHash = yearMonth.hashCode();

        // "crc check": XOR the two hashes
        int crc = Math.abs(userCodehash ^ yearMonthHash);
        // Take the modulo
        int regionNum = crc % ValueConstant.REGION_COUNT;
        return regionNum;
    }

    /**
     * Generate the split keys
     * @return
     */
    private byte[][] genSpliKeys(int regionCount) {

        int splitkeyCount = regionCount - 1;
        byte[][] bs = new byte[splitkeyCount][];
        // split keys: 0|, 1|, 2|, 3|, 4|
        List<byte[]> bsList = new ArrayList<byte[]>();
        for (int i = 0; i < splitkeyCount; i++) {
            String splitkey = i + "|";
            bsList.add(Bytes.toBytes(splitkey));
        }

        //Collections.sort(bsList, new Bytes.ByteArrayComparator());

        bsList.toArray(bs);
        return bs;
    }

    /**
     * Insert an object: reads the annotations on the object and saves it to HBase automatically
     * @param obj
     * @throws Exception
     */
    protected void putData(Object obj) throws Exception {
        // Reflection
        Class clazz = obj.getClass();
        TableRef tableRef = (TableRef) clazz.getAnnotation(TableRef.class);
        String tableName = tableRef.value();

        Field[] fs = clazz.getDeclaredFields();
        String stringRowkey = "";
        for (Field f : fs) {
            Rowkey rowkey = f.getAnnotation(Rowkey.class);
            if (rowkey != null) {
                f.setAccessible(true);
                stringRowkey = (String) f.get(obj);
                break;
            }
        }

        // Get the table object
        Connection connection = getConnection();
        Table table = connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(stringRowkey));
        for (Field f : fs) {
            Column column = f.getAnnotation(Column.class);
            if (column != null) {
                String family = column.family();
                String colName = column.column();
                if (colName == null || "".equals(colName)) {
                    colName = f.getName();
                }
                f.setAccessible(true);
                String value = (String) f.get(obj);

                put.addColumn(Bytes.toBytes(family), Bytes.toBytes(colName), Bytes.toBytes(value));
            }
        }
        // Insert the data
        table.put(put);
        // Close the table
        table.close();
    }

    /**
     * Insert multiple rows
     * @param name
     * @param puts
     * @throws IOException
     */
    protected void putData(String name, List<Put> puts) throws Exception {

        // Get the table object
        Connection conn = getConnection();
        Table table = conn.getTable(TableName.valueOf(name));

        // Insert the data
        table.put(puts);

        // Close the table
        table.close();
    }

    /**
     * Insert one row
     * @param name
     * @param put
     */
    protected void putData(String name, Put put) throws IOException {
        // Get the table object
        Connection connection = getConnection();
        Table table = connection.getTable(TableName.valueOf(name));
        // Insert the data
        table.put(put);
        // Close the table
        table.close();
    }

    /**
     * Delete a table
     * @param name
     * @throws Exception
     */
    protected void deleteTable(String name) throws Exception {
        TableName tableName = TableName.valueOf(name);
        Admin admin = getAdmin();
        admin.disableTable(tableName);
        admin.deleteTable(tableName);
    }

    /**
     * Create the namespace only if it does not exist yet
     * @param namespace
     */
    protected void createNamespaseNX(String namespace) throws IOException {

        Admin admin = getAdmin();

        try {
            admin.getNamespaceDescriptor(namespace);
        } catch (NamespaceNotFoundException e) {
            admin.createNamespace(NamespaceDescriptor.create(namespace).build());
        }
    }

    /**
     * Get the Admin object
     */
    protected synchronized Admin getAdmin() throws IOException {
        Admin admin = adminHolder.get();
        if (admin == null) {
            admin = getConnection().getAdmin();
            adminHolder.set(admin);
        }
        return admin;
    }

    /**
     * Get the connection object
     * @return
     */
    protected synchronized Connection getConnection() throws IOException {
        Connection conn = connHolder.get();
        if (conn == null) {
            Configuration conf = HBaseConfiguration.create();
            conn = ConnectionFactory.createConnection(conf);
            connHolder.set(conn);
        }
        return conn;
    }
}
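To make the partitioning concrete, here is genRegionNum traced in a standalone sketch (the phone number and date are made up; 6 stands in for ValueConstant.REGION_COUNT, matching the 6 regions the table is created with). Because the region number depends only on the last four digits of the number and on the year-month, all of one user's calls in a given month land in the same region and can be scanned together:

public class RegionNumDemo {
    public static void main(String[] args) {
        String tel = "13812345678";
        String date = "20180503102030";

        String usercode = tel.substring(tel.length() - 4); // "5678"
        String yearMonth = date.substring(0, 6);           // "201805"

        // XOR the two hash codes, as BaseDao.genRegionNum does...
        int crc = Math.abs(usercode.hashCode() ^ yearMonth.hashCode());
        // ...then take it modulo the region count
        int regionNum = crc % 6;

        // Always the same value in 0..5 for this tel + month
        System.out.println(regionNum);
    }
}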

7.2.4 Configure the coprocessor

The coprocessor distinguishes caller from callee records and cuts the amount of data the client has to insert.

/**
 * Coprocessor that saves the callee record
 *
 * Using a coprocessor:
 * 1. Create the class
 * 2. Associate the class with the table
 * 3. Package the project as a jar (plus its dependent jars), deploy it into hbase, and distribute it
 */
public class InsertCalleeCoprocessor extends BaseRegionObserver {

    // Hook methods available: prePut, postPut, ...
    // doPut follows the template method pattern:
    //   the parent class lays out the skeleton of the algorithm
    //   (1. take the user code from tel, 2. take year+month from the time,
    //    3. XOR them, 4. hash/modulo)
    //   and the subclass fills in the details
    //   (do1. last 4 digits of tel, do2. e.g. 201810, do3. ^, do4. %)

    /**
     * After the caller record is saved, HBase saves the callee record automatically
     * @param e
     * @param put
     * @param edit
     * @param durability
     * @throws IOException
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {

        // Get the table
        Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.getValue()));

        // The caller's rowkey, e.g. 1_133_2019_144_1010_1
        String rowkey = Bytes.toString(put.getRow());
        String[] values = rowkey.split("_");

        CoprocessorDao dao = new CoprocessorDao();
        String call1 = values[1];
        String call2 = values[3];
        String calltime = values[2];
        String duration = values[4];
        String flg = values[5];

        if ("1".equals(flg)) {
            // Only saving a caller record triggers saving the callee record
            String calleeRowkey = dao.getRegionNum(call2, calltime) + "_" + call2 + "_" + calltime + "_" + call1 + "_" + duration + "_0";

            // Save the data
            Put calleePut = new Put(Bytes.toBytes(calleeRowkey));
            byte[] calleeFamily = Bytes.toBytes(Names.CF_CALLEE.getValue());
            calleePut.addColumn(calleeFamily, Bytes.toBytes("call1"), Bytes.toBytes(call2));
            calleePut.addColumn(calleeFamily, Bytes.toBytes("call2"), Bytes.toBytes(call1));
            calleePut.addColumn(calleeFamily, Bytes.toBytes("calltime"), Bytes.toBytes(calltime));
            calleePut.addColumn(calleeFamily, Bytes.toBytes("duration"), Bytes.toBytes(duration));
            calleePut.addColumn(calleeFamily, Bytes.toBytes("flg"), Bytes.toBytes("0"));
            table.put(calleePut);
        }
        // Close the table
        table.close();
    }

    private class CoprocessorDao extends BaseDao {

        public int getRegionNum(String tel, String time) {
            return genRegionNum(tel, time);
        }
    }
}

7.3 Module 3: Exporting data and computation (P23-P29)

7.3.1 Database table design

7.3.2 Data computation

Sets up the map and reduce stages and reads the HBase data through the corresponding methods.

/**
 * Data analysis tool class
 */
public class AnalysisTextTool implements Tool {

    public int run(String[] args) throws Exception {

        Job job = Job.getInstance();
        job.setJarByClass(AnalysisTextTool.class);

        // Read the data from HBase
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(Names.CF_CALLER.getValue()));

        // mapper
        TableMapReduceUtil.initTableMapperJob(
                Names.TABLE.getValue(),
                scan,
                AnalysisTextMapper.class,
                Text.class,
                Text.class,
                job
        );

        // reducer
        job.setReducerClass(AnalysisTextReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(mysqlRedisTextOutputFormat.class);

        boolean flg = job.waitForCompletion(true);
        if (flg) {
            return JobStatus.State.SUCCEEDED.getValue();
        } else {
            return JobStatus.State.FAILED.getValue();
        }
    }

    public void setConf(Configuration configuration) {
    }

    public Configuration getConf() {
        return null;
    }
}

Unpacks each rowkey fetched from HBase and emits one (user_period, duration) pair per year, month, and day, for both caller and callee.

/**
 * Mapper
 */
public class AnalysisTextMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        String rowkey = Bytes.toString(key.get());

        String[] values = rowkey.split("_");

        String call1 = values[1];
        String call2 = values[3];
        String calltime = values[2];
        String duration = values[4];

        String year = calltime.substring(0, 4);
        String month = calltime.substring(0, 6);
        String date = calltime.substring(0, 8);

        // Caller - year
        context.write(new Text(call1 + "_" + year), new Text(duration));
        // Caller - month
        context.write(new Text(call1 + "_" + month), new Text(duration));
        // Caller - day
        context.write(new Text(call1 + "_" + date), new Text(duration));

        // Callee - year
        context.write(new Text(call2 + "_" + year), new Text(duration));
        // Callee - month
        context.write(new Text(call2 + "_" + month), new Text(duration));
        // Callee - day
        context.write(new Text(call2 + "_" + date), new Text(duration));
    }
}


Takes the mapper output and performs the aggregation: for each key it counts the calls and sums the durations. For example, the key 13812345678_201805 with durations 0180 and 0066 produces the value 2_246.

/**
 * Reducer that aggregates the analysis data
 */
public class AnalysisTextReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        int sumCall = 0;
        int sumDuration = 0;

        for (Text value : values) {
            int duration = Integer.parseInt(value.toString());
            sumDuration = sumDuration + duration;

            sumCall++;
        }
        context.write(key, new Text(sumCall + "_" + sumDuration));
    }
}

Uploads the results to MySQL, using Redis to cache the dimension tables.

/**
 * Formatted output to MySQL
 */
public class MySQLTextOutputFormat extends OutputFormat<Text, Text> {

    protected static class MySQLRecordWriter extends RecordWriter<Text, Text> {

        private Connection connection = null;
        private Map<String, Integer> userMap = new HashMap<String, Integer>();
        Map<String, Integer> dateMap = new HashMap<String, Integer>();

        public MySQLRecordWriter() {
            // Acquire resources
            connection = JDBCUtil.getConnection();
            PreparedStatement pstat = null;
            ResultSet rs = null;

            try {

                String queryUserSql = "select id, tel from ct_user";
                pstat = connection.prepareStatement(queryUserSql);
                rs = pstat.executeQuery();
                while (rs.next()) {
                    Integer id = rs.getInt(1);
                    String tel = rs.getString(2);
                    userMap.put(tel, id);
                }

                rs.close();

                String queryDateSql = "select id, year, month, day from ct_date"; // load the whole dimension table
                pstat = connection.prepareStatement(queryDateSql);
                rs = pstat.executeQuery();
                while (rs.next()) {
                    Integer id = rs.getInt(1);
                    String year = rs.getString(2);
                    String month = rs.getString(3);
                    if (month.length() == 1) {
                        month = "0" + month;
                    }
                    String day = rs.getString(4);
                    if (day.length() == 1) {
                        day = "0" + day;
                    }
                    // ... (the notes break off here; the remainder of the class is not included)
