flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统相关的知识,希望对你有一定的参考价值。
一、Hadoop配置安装
注意:apache提供的hadoop-2.6.0的安装包是在32位操作系统编译的,因为hadoop依赖一些C++的本地库,
所以如果在64位的操作上安装hadoop-2.4.1就需要重新在64操作系统上重新编译
1.修改Linux主机名
2.修改IP
3.修改主机名和IP的映射关系
######注意######如果你们公司是租用的服务器或是使用的云主机(如华为用主机、阿里云主机等)
/etc/hosts里面要配置的是内网IP地址和主机名的映射关系
4.关闭防火墙
5.ssh免登陆
6.安装JDK,配置环境变量等
集群规划:
主机名IP安装的软件 运行的进程
hadoop01192.168.1.201jdk、hadoopNameNode、DFSZKFailoverController
(zkfc)
hadoop02192.168.1.202jdk、hadoopNameNode、DFSZKFailoverController
(zkfc)
hadoop03192.168.1.203jdk、hadoopResourceManager
hadoop04192.168.1.204jdk、hadoopResourceManager
hadoop05192.168.1.205jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、
QuorumPeerMain
hadoop06192.168.1.206jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、
QuorumPeerMain
hadoop07192.168.1.207jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、
QuorumPeerMain
说明:
1.在hadoop2.0中通常由两个NameNode组成,一个处于active状态,另一个处于standby状态。Active NameNode对外提供服务,而Standby
NameNode则不对外提供服务,仅同步active namenode的状态,以便能够在它失败时快速进行切换。
hadoop2.0官方提供了两种HDFS HA的解决方案,一种是NFS,另一种是QJM。
这里我们使用简单的QJM。在该方案中,主备NameNode之间通过一组JournalNode同步元数据信息,一条数据只要成功写入多数JournalNode即认为写入成
功。通常配置奇数个JournalNode
这里还配置了一个zookeeper集群,用于ZKFC(DFSZKFailoverController)故障转移,当Active NameNode挂掉了,会自动切换Standby
NameNode为standby状态
2.hadoop-2.2.0中依然存在一个问题,就是ResourceManager只有一个,存在单点故障,hadoop-2.4.1解决了这个问题,有两个
ResourceManager,一个是Active,一个是Standby,状态由zookeeper进行协调
安装步骤:
1.安装配置zooekeeper集群(在hadoop05上)
1.1解压
tar -zxvf zookeeper-3.4.5.tar.gz -C /hadoop/
1.2修改配置
cd /hadoop/zookeeper-3.4.5/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
修改:dataDir=/hadoop/zookeeper-3.4.5/tmp
在最后添加:
server.1=hadoop05:2888:3888
server.2=hadoop06:2888:3888
server.3=hadoop07:2888:3888
保存退出
然后创建一个tmp文件夹
mkdir /hadoop/zookeeper-3.4.5/tmp
再创建一个空文件
touch /hadoop/zookeeper-3.4.5/tmp/myid
最后向该文件写入ID
echo 1 > /hadoop/zookeeper-3.4.5/tmp/myid
1.3将配置好的zookeeper拷贝到其他节点(首先分别在hadoop06、hadoop07根目录下创建一个hadoop目录:mkdir /hadoop)
scp -r /hadoop/zookeeper-3.4.5/ hadoop06:/hadoop/
scp -r /hadoop/zookeeper-3.4.5/ hadoop07:/hadoop/
注意:修改hadoop06、hadoop07对应/hadoop/zookeeper-3.4.5/tmp/myid内容
hadoop06:
echo 2 > /hadoop/zookeeper-3.4.5/tmp/myid
hadoop07:
echo 3 > /hadoop/zookeeper-3.4.5/tmp/myid
2.安装配置hadoop集群(在hadoop01上操作)
2.1解压
tar -zxvf hadoop-2.4.1.tar.gz -C /hadoop/
2.2配置HDFS(hadoop2.0所有的配置文件都在$HADOOP_HOME/etc/hadoop目录下)
#将hadoop添加到环境变量中
vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.7.0_55
export HADOOP_HOME=/hadoop/hadoop-2.4.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin
#hadoop2.0的配置文件全部在$HADOOP_HOME/etc/hadoop下
cd /hadoop/hadoop-2.6.0/etc/hadoop
2.2.1修改hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.7.0_55
2.2.2修改core-site.xml
<configuration> <!-- 指定hdfs的nameservice为wxm --> <property> <name>fs.defaultFS</name> <value>hdfs://wxm</value> </property> <!-- 指定hadoop临时目录 --> <property> <name>hadoop.tmp.dir</name> <value>/root/wxm/software/hadoop/hadoop-2.6.5/tmp</value> </property> <!-- 指定zookeeper地址 --> <property> <name>ha.zookeeper.quorum</name> <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value> </property> </configuration>
2.2.3修改hdfs-site.xml
<configuration> <!--指定hdfs的nameservice为wxm,需要和core-site.xml中的保持一致 --> <property> <name>dfs.nameservices</name> <value>wxm</value> </property> <!-- ns1下面有两个NameNode,分别是nn1,nn2 --> <property> <name>dfs.ha.namenodes.wxm</name> <value>hadoop05,hadoop06</value> </property> <!-- nn1的RPC通信地址 --> <property> <name>dfs.namenode.rpc-address.wxm.hadoop05</name> <value>hadoop05:9000</value> </property>
<!-- nn1的http通信地址 --> <property> <name>dfs.namenode.http-address.wxm.hadoop051</name> <value>hadoop05:50070</value> </property> <!-- nn2的RPC通信地址 --> <property> <name>dfs.namenode.rpc-address.wxm.hadoop06</name> <value>hadoop06:9000</value> </property> <!-- nn2的http通信地址 --> <property> <name>dfs.namenode.http-address.wxm.hadoop06</name> <value>hadoop06:50070</value> </property> <!-- 指定NameNode的元数据在JournalNode上的存放位置 --> <property> <name>dfs.namenode.shared.edits.dir</name> <value>qjournal://hadoop05:8485;hadoop06:8485/wxm</value> </property> <!-- 指定JournalNode在本地磁盘存放数据的位置 --> <property> <name>dfs.journalnode.edits.dir</name> <value>/root/wxm/software/hadoop/hadoop-2.6.5/journal</value> </property> <!-- 开启NameNode失败自动切换 --> <property> <name>dfs.ha.automatic-failover.enabled</name> <value>true</value> </property> <!-- 配置失败自动切换实现方式 --> <property> <name>dfs.client.failover.proxy.provider.wxm</name> <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value> </property> <!-- 配置隔离机制方法,多个机制用换行分割,即每个机制暂用一行--> <property> <name>dfs.ha.fencing.methods</name> <value> sshfence shell(/bin/true) </value> </property> <!-- 使用sshfence隔离机制时需要ssh免登陆 --> <property> <name>dfs.ha.fencing.ssh.private-key-files</name> <value>/root/.ssh/id_rsa</value> </property> <!-- 配置sshfence隔离机制超时时间 --> <property> <name>dfs.ha.fencing.ssh.connect-timeout</name> <value>300000</value> </property> </configuration>
2.2.4修改mapred-site.xml
<configuration> <!-- 指定mr框架为yarn方式 --> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
2.2.5修改yarn-site.xml
<configuration> <!-- 开启RM高可靠 --> <property> <name>yarn.resourcemanager.ha.enabled</name> <value>true</value> </property> <!-- 指定RM的cluster id --> <property> <name>yarn.resourcemanager.cluster-id</name> <value>yrc</value> </property> <!-- 指定RM的名字 --> <property> <name>yarn.resourcemanager.ha.rm-ids</name> <value>resourcemanagerAtWorker1,resourcemanagerAtWorker1</value> </property> <!-- 分别指定RM的地址 --> <property> <name>yarn.resourcemanager.hostname.resourcemanagerAtWorker1</name> <value>worker1</value> </property> <property> <name>yarn.resourcemanager.hostname.resourcemanagerAtWorker2</name> <value>worker2</value> </property> <!-- 指定zk集群地址 --> <property> <name>yarn.resourcemanager.zk-address</name> <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
2.2.6修改slaves(slaves是指定子节点的位置,因为要在hadoop01上启动HDFS、在hadoop03启动yarn,所以hadoop01上的
slaves文件指定的是datanode的位置,hadoop03上的slaves文件指定的是nodemanager的位置)
hadoop05
hadoop06
hadoop07
2.2.7配置免密码登陆
#首先要配置hadoop01到hadoop02、hadoop03、hadoop04、hadoop05、hadoop06、hadoop07的免密码登
陆
#在hadoop01上生产一对钥匙
ssh-keygen -t rsa
#将公钥拷贝到其他节点,包括自己
ssh-coyp-id hadoop01
ssh-coyp-id hadoop02
ssh-coyp-id hadoop03
ssh-coyp-id hadoop04
ssh-coyp-id hadoop05
ssh-coyp-id hadoop06
ssh-coyp-id hadoop07
#配置hadoop03到hadoop04、hadoop05、hadoop06、hadoop07的免密码登陆
#在hadoop03上生产一对钥匙
ssh-keygen -t rsa
#将公钥拷贝到其他节点
ssh-coyp-id hadoop04
ssh-coyp-id hadoop05
ssh-coyp-id hadoop06
ssh-coyp-id hadoop07
#注意:两个namenode之间要配置ssh免密码登陆,别忘了配置hadoop02到hadoop01的免登陆
在hadoop02上生产一对钥匙
ssh-keygen -t rsa
ssh-coyp-id -i hadoop01
2.4将配置好的hadoop拷贝到其他节点
scp -r /hadoop/ hadoop02:/
scp -r /hadoop/ hadoop03:/
scp -r /hadoop/hadoop-2.4.1/ [email protected]:/hadoop/
scp -r /hadoop/hadoop-2.4.1/ [email protected]:/hadoop/
scp -r /hadoop/hadoop-2.4.1/ [email protected]:/hadoop/
scp -r /hadoop/hadoop-2.4.1/ [email protected]:/hadoop/
###注意:严格按照下面的步骤
2.5启动zookeeper集群(分别在hadoop05、hadoop06、hadoop07上启动zk)
cd /hadoop/zookeeper-3.4.5/bin/
./zkServer.sh start
#查看状态:一个leader,两个follower
./zkServer.sh status
2.6启动journalnode(分别在在hadoop05、hadoop06、tcast07上执行)
cd /hadoop/hadoop-2.4.1
sbin/hadoop-daemon.sh start journalnode
#运行jps命令检验,hadoop05、hadoop06、hadoop07上多了JournalNode进程
2.7格式化HDFS
#在hadoop01上执行命令:
hdfs namenode -format
#格式化后会在根据core-site.xml中的hadoop.tmp.dir配置生成个文件,这里我配置的是/hadoop/hadoop-2.4.1/tmp,然后
将/hadoop/hadoop-2.4.1/tmp拷贝到hadoop02的/hadoop/hadoop-2.4.1/下。
scp -r tmp/ hadoop02:/hadoop/hadoop-2.4.1/
2.8格式化ZK(在hadoop01上执行即可)
hdfs zkfc -formatZK
2.9启动HDFS(在hadoop01上执行)
sbin/start-dfs.sh
2.11启动zkfc
hadoop-daemon.sh start zkfc
2.10启动YARN(#####注意#####:是在hadoop03上执行start-yarn.sh,把namenode和resourcemanager分开是因为性能问题,因为
他们都要占用大量资源,所以把他们分开了,
他们分开了就要分别在不同的机器上启动)
sbin/start-yarn.sh
到此,hadoop-2.4.1配置完毕,可以统计浏览器访问:
http://192.168.1.201:50070
NameNode ‘hadoop01:9000‘ (active)
http://192.168.1.202:50070
NameNode ‘hadoop02:9000‘ (standby)
验证HDFS HA
首先向hdfs上传一个文件
hadoop fs -put /etc/profile /profile
hadoop fs -ls /
然后再kill掉active的NameNode
kill -9 <pid of NN>
通过浏览器访问:http://192.168.1.202:50070
NameNode ‘hadoop02:9000‘ (active)
这个时候hadoop02上的NameNode变成了active
在执行命令:
hadoop fs -ls /
-rw-r--r-- 3 root supergroup 1926 2014-02-06 15:36 /profile
刚才上传的文件依然存在!!!
手动启动那个挂掉的NameNode
sbin/hadoop-daemon.sh start namenode
通过浏览器访问:http://192.168.1.201:50070
NameNode ‘hadoop01:9000‘ (standby)
验证YARN:
运行一下hadoop提供的demo中的WordCount程序:
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount /profile /out
OK,大功告成!!!
二、spark安装
standalone模式搭建比较简单 参考官网即可
三、kafka集群搭建
1. kafka2.11下载并解压
2. 修改配置文件
· config/server.properties
broker.id=4(集群里的id不能重复,我是取每台机器IP最后一位)
listeners=PLAINTEXT://192.168.248.134:9092(格式不变,绑定本机IP)
log.dirs=/home/hadoop/kafka/logs4kafka(日志路径)
zookeeper.connect=h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181
最后一项时延可以增大一些,曾经因为超过时延而报错,修改成10倍就OK了
·producer.properties
bootstrap.servers=h2:9092,h3:9092,h4:9092,h8:9092,h9:9092,h10:9092
·consumer.properties
zookeeper.connect=h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181
group不配的话就是默认,看需求
3.拷贝到其他节点,注意修改listeners绑定的IP和broker.id
4 基本命令
#start
bin/kafka-server-start.sh config/server.properties
#create a topic
bin/kafka-topics.sh --create --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --replication-factor 5 --partition 5 --topic T20161021
#list all topics
bin/kafka-topics.sh --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --list
#describe the detail of this topic
bin/kafka-topics.sh --describe --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --topic test101
#write some Messages to the topic
bin/kafka-console-producer.sh --broker-list h2:9092 --topic T20161021
#recerive the message producer writed
bin/kafka-console-consumer.sh --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --topic T20161021 --from-beginning
四、flume-ng 1.6安装配置
1.下载安装并配置好flume的运行环境
2.编写配置文件
# ------------------- 定义数据流----------------------
agent.sources = kafkaSource
agent.channels = memoryChannel
agent.sinks = hdfsSink
agent.sources.kafkaSource.channels = memoryChannel
agent.sinks.hdfsSink.channel = memoryChannel
#-------- kafkaSource相关配置-----------------
agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafkaSource.zookeeperConnect = h2:2181,h3:2181,h4:2181
agent.sources.kafkaSource.topic = T20161031
#agent.sources.kafkaSource.groupId = flume
agent.sources.kafkaSource.kafka.consumer.timeout.ms = 1000
#------- memoryChannel相关配置-------------------------
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity=10000
agent.channels.memoryChannel.transactionCapacity=1000
#---------hdfsSink 相关配置------------------
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://h4:9000/user/test/kafka2HdfsByFlume
agent.sinks.hdfsSink.hdfs.writeFormat = Text
agent.sinks.hdfsSink.hdfs.fileType = DataStream
3.启动Flume
bin/flume-ng agent --conf conf --conf-file conf/kafka2HdfsByFlume.conf --name agent -Dflume.root.logger=INFO,console
五、sparkStreaming1.6.2
import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.*;import scala.Tuple2;import java.util.Arrays;public class StreamingOnHDFS {public static void main(String[] args){final SparkConf conf = newSparkConf().setMaster("spark://h4:7077").setAppName("SparkStreamingOnHDFS") ;Durations.seconds(5);//Durations.seconds(5) 设置每隔 5 秒final String checkpointDirectory = "hdfs://h4:9000/library/SparkStreaming/Checkpoint_Data";JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {@Overridepublic JavaStreamingContext create() {return createContext(checkpointDirectory, conf) ;}} ;JavaStreamingContext jsc = JavaStreamingContext. getOrCreate(checkpointDirectory, factory) ;JavaDStream lines = jsc.textFileStream("hdfs://h4:9000/user/test/kafka2HdfsByFlume") ; JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String line) throws Exception {return Arrays. asList(line.split(" ")) ;}}) ;JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String,Integer>() {public Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1) ;}}) ;JavaPairDStream<String, Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer,Integer>() {public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2;}}) ;wordscount.print();jsc.start() ;jsc.awaitTermination() ;jsc.close() ;} private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf){System. out.println("==========Creating new context==============") ;JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations. seconds(5)) ;ssc.checkpoint(checkpointDirectory) ;return ssc;}}
六、测试
1.将程序打成runnable jar并上传
2.启动zookeeper、hadoop、spark、kafka、flume
3.进入spark安装目录下执行
bin/spark-submit --class com.unisk.spark.sparkStreaming.StreamingOnHDFS --master spark://h4:7077 /home/hadoop/spark/Apps4Spark/StreamingOnHDFS.jar
七、扩展
数据导入HDFS之后即可进行基于MR/spark的批处理或准实时的流式处理,可作海量数据的清洗工作或预处理,结果可存入Hbase、redis或RDBMS以做进一步的展示或挖掘
本文出自 “10891776” 博客,请务必保留此出处http://10901776.blog.51cto.com/10891776/1874924
以上是关于flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统的主要内容,如果未能解决你的问题,请参考以下文章
[Flume][Kafka]Flume 与 Kakfa结合例子(Kakfa 作为flume 的sink 输出到 Kafka topic)