Flume+Kafka整合

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume+Kafka整合相关的知识,希望对你有一定的参考价值。

Flume+Kafka整合

 

 

一、准备工作

准备5台内网服务器创建ZookeeperKafka集群

服务器地址:

192.168.2.240

192.168.2.241

192.168.2.242

192.168.2.243

192.168.2.244

服务器系统:Centos 6.5  64

 

 

下载安装包

Zookeeperhttp://apache.fayea.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

Flumehttp://apache.fayea.com/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

Kafkahttp://apache.fayea.com/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz

 

ZookeeperFlumekafka需要用到Java环境,所以先安装JDk

yum install java-1.7.0-openjdk-devel



二、安装配置zookeeper

选择3台服务器作为zookeeper集群,他们的IP分别为:

192.168.2.240

192.168.2.241

192.168.2.242

 

注:先在第一台服务器192.168.2.240上分别执行(1)-(3)步。

1)解压:将zookeeper-3.4.6.tar.gz放入/opt目录下

tar zxf zookeeper-3.4.6.tar.gz

2)创建配置文件:将conf/zoo_sample.cfg拷贝一份命名为zoo.cfg,也放在conf目录下。然后按照如下值修改其中的配置:

    tickTime=2000      

    dataDir=/opt/zookeeper/Data

    initLimit=5

    syncLimit=2

    clientPort=2181

    server.1=192.168.2.240:2888:3888

    server.2=192.168.2.241:2888:3888

    server.3=192.168.2.242:2888:3888

 

各个参数的意义:

tickTime:心跳检测的时间间隔(毫秒),缺省:2000

clientPort:其他应用(比如solr)访问ZooKeeper的端口,缺省:2181

initLimit:初次同步的阶段(followers连接到leader的阶段),允许的时长(tick数量),缺省:10

syncLimit:允许followers同步到ZooKeeper的时长(tick数量),缺省:5

dataDir:数据(比如所管理的配置文件)的存放路径

server.XX是集群中一个服务器的id,与myid文件中的id是一致的。右边可以配置两个端口,第一个端口用于FllowerLeader之间的数据同步和其它通信,第二个端口用于Leader选举过程中投票通信。

 

3)创建/opt/zookeeper/Data快照目录,并创建my id文件,里面写入1。

   mkdir /opt/zookeeper/Data
   vi /opt/zookeeper/Data/myid
   1

4)将192.168.2.240上已经配置好的/opt/zookeeper/目录分别拷贝至192.168.2.241192.168.2.242。然后将对应的myid的内容修改为23

 

5)启动zookeeper集群

分别在3台服务器上执行启动命令

/opt/zookeeper/bin/zkServer.sh start

 

 

三、安装配置kafka集群

一共5台服务器,服务器IP地址:

192.168.2.240  node1

192.168.2.241  node2

192.168.2.242  node3

192.168.2.243  node4

192.168.2.244  node5

 

1、解压安装文件到/opt/目录

cd /opt
tar -zxvf kafka_2.10-0.10.0.0.tar.gz
mv kafka_2.10-0.10.0.0  kafka

 

2、修改server. properties文件

#node1 配置

broker.id=0

port=9092

advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9092

advertised.host.name=58.246.xx.xx

#碰到的坑,由于我是从线上把nginx日志拉回公司本地服务器,所以这两选项必须配置成路由器外网IP地址,否则线上flume报无法连接kafka节点,报无法传送日志消息

advertised.port=9092

num.network.threads=3

num.io.threads=8

num.partitions=5

zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181


#node2 配置

broker.id=1

port=9093

advertised.listeners=PLAINTEXT://58.246.xx.xx:9093

advertised.host.name=58.246.xx.xx

advertised.port=9093

num.network.threads=3

num.io.threads=8

num.partitions=5

zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181


#node3 配置

broker.id=2

port=9094

advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9094

advertised.host.name=58.246.xx.xx

advertised.port=9094

num.network.threads=3

num.io.threads=8

num.partitions=5

zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181



#node4 配置

broker.id=2

port=9095

advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9095

advertised.host.name=58.246.xx.xx

advertised.port=9095

num.network.threads=3

num.io.threads=8

num.partitions=5

zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181


#node5 配置

broker.id=2

port=9096

advertised.listeners=PLAINTEXT:// 58.246.xx.xx:9096

advertised.host.name=58.246.xx.xx

advertised.port=9096

num.network.threads=3

num.io.threads=8

num.partitions=5

zookeeper.connect=192.168.2.240:2181,192.168.2.241:2181,192.168.2.242:2181

 

 

启动卡夫卡集群

分别在所有节点执行以下命令来启动服务

/opt/kafka/bin/kafka-server-start.sh/opt/kafka/config/server.properties &


 

 

四、安装配置Flume

安装两台flume,一台安装在线上,把线上的日志传回本地kafka,另一台安装在本地,把kafka集群的日志信息转存到HDFS

4.1、线上服务器安装Flume

收集nginx日志传给公司内部kafka

 

1、  解压安装包

cd /opt

tar –zxvf apache-flume-1.7.0-bin.tar.gz

 

2、  创建配置文件

Vi flume-conf.properties 添加以下内容

 

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F/unilifeData/logs/nginx/access.log

a1.sources.r1.channels = c1


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 100000

a1.channels.c1.transactionCapacity = 100000


#sinks

a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = unilife_nginx_production

a1.sinks.k1.kafka.bootstrap.servers = 58.246.xx.xx:9092,58.246.xx.xx:9093,58.246.xx.xx:9094

a1.sinks.k1.brokerList = 58.246.xx.xx:9092,58.246.xx.xx:9093,58.246.xx.xx:9094

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.flumeBatchSize = 2000

a1.sinks.k1.channel = c1

 

 

启动flume服务

/opt/flume/bin/flume-ng agent --conf /opt/flume/conf/--conf-file /opt/flume/conf/flume-conf.properties --name a1-Dflume.root.logger=INFO,LOGFILE &


4.2、本地安装flume

转存日志到HDFS

1、解压安装包

cd /opt

tar –zxvf apache-flume-1.7.0-bin.tar.gz

 

3、  创建配置文件

nginx.sources = source1

nginx.channels = channel1

nginx.sinks = sink1

nginx.sources.source1.type =org.apache.flume.source.kafka.KafkaSource

nginx.sources.source1.zookeeperConnect =master:2181,slave1:2181,slave2:2181

nginx.sources.source1.topic =unilife_nginx_production

nginx.sources.source1.groupId =flume_unilife_nginx_production

nginx.sources.source1.channels = channel1

nginx.sources.source1.interceptors = i1

nginx.sources.source1.interceptors.i1.type =timestamp

nginx.sources.source1.kafka.consumer.timeout.ms = 100


nginx.channels.channel1.type = memory

nginx.channels.channel1.capacity = 10000000

nginx.channels.channel1.transactionCapacity = 1000


nginx.sinks.sink1.type = hdfs

nginx.sinks.sink1.hdfs.path =hdfs://192.168.2.240:8020/user/hive/warehouse/nginx_log

nginx.sinks.sink1.hdfs.writeFormat=Text

nginx.sinks.sink1.hdfs.inUsePrefix=_

nginx.sinks.sink1.hdfs.rollInterval = 3600

nginx.sinks.sink1.hdfs.rollSize = 0

nginx.sinks.sink1.hdfs.rollCount = 0

nginx.sinks.sink1.hdfs.fileType = DataStream

nginx.sinks.sink1.hdfs.minBlockReplicas=1

nginx.sinks.sink1.channel = channel1


启动服务

/opt/flume/bin/flume-ng agent --conf /opt/flume/conf/--conf-file /opt/flume/conf/flume-nginx-log.properties --name nginx-Dflume.root.logger=INFO,LOGFILE &



以上是关于Flume+Kafka整合的主要内容,如果未能解决你的问题,请参考以下文章

flume 整合 kafka

flume 整合kafka

flume整合kafka

Flume 学习笔记之 Flume NG+Kafka整合

Flume与Kafka整合案例详解

Flume+Kafka整合