大数据篇:flume+kafka+spark stream+hbase做日志收集
Posted 杨铖
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据篇:flume+kafka+spark stream+hbase做日志收集相关的知识,希望对你有一定的参考价值。
前言
flume+kafka+spark stream 是目前比较常用的一套大数据消息日志收集管理框架,至于最后是入到Hive或者者Hbase需看不同业务场景,下面以HBase为场景简述下整个配置与搭建流程以及这些框架如此搭配的优点。
1. flume 配置
1.1 flume 简介
从官网文档 https://flume.apache.org 可以知道Flume的定位是很清晰的,它提供了一个分布式的,高可用的桥梁链接
,可以收集、聚合和移动大量的日志数据,从许多不同的数据源到集中式数据存储,大概的结构如下图,流程大致为 从源端(source)接收数据,经过管道(channel)的缓存等等,发送到目标(sink)端。:
其中source的定义flume提供了很多方式,常用的有以下几种:
Http source
,这种方式可以通过监听接口方式来收集log;Exec source
,这种方式可以通过执行一些shell命令来收集log,例如通过tail -f 文件
来监听文件追加的日志;Spooling source
,这种方式可以监听某个目录的log,当有新的log产生时即会被发送出去;- 还有很多其他的方式,例如可以以
kafka
作为source,这样flume
就充当了kafka
的消费者,当然还有很多如Avro source
,Thrift source
,TCP
类的等等,具体参考官网文档更加相应场景配置即可。
channel同样flume提供了很多方式,memory channel
这种方式已经不太建议了,原因也很明显,不够安全,当出现任何机器问题时数据就会丢失,file channel
和kafka channel
是比较推荐的做法,特别是当需要比较高的并发时,kafka channel
是一个不错的选择。
sink同样flume提供了很多方式,常用的有以下几种:
HDFS/Hive/Hbase/ElasticSearch sink
,直接写入hdfs/Hive/Hbase/ElasticSearch,这种方式适合那些比较无需做ETL的场景。kafka sink
,直接充当kafka
的生产者,可以看到kafka
可以在整个flume
生命周期里可以自由穿插。Http sink
,直接通过post方法将数据发送到目标api。- 其他的一些详细见官网文档即可。
1.2 flume 配置
下面以Spooling Directory Source
-> file channel
-> kafka sink
为例:
一份样例配置参数:
# Name the components on this agent
agent.sources = dir-src
agent.sinks = kafka-sink
agent.channels = file-channel
# Describe/configure the source
agent.sources.dir-src.type = spooldir
agent.sources.dir-src.spoolDir = #监听目录
agent.sources.dir-src.fileHeader = true
agent.sources.dir-src.deserializer.maxLineLength=1000000
# Describe the sink
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.topic = test
agent.sinks.kafka-sink.kafka.bootstrap.servers = #kafka boostrapServer
# Use a channel which buffers events in file
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = # checkpoint目录
agent.channels.file-channel.dataDirs = # 缓存的数据目录
# Bind the source and sink to the channel
agent.sources.dir-src.channels = file-channel
agent.sinks.kafka-sink.channel = file-channel
配置详解:
- 首先每个flume配置表可以存在多个agent,多个source,多个channel,多个sink,所以可以根据相应业务场景进行组合。
- 对于每个agent,必须配置相应的
source/channel/sink
,通过agent.source = ???
,agent.channel = ???
,agent.sink = ???
来指定。 - 对于具体的
source/channel/sink
,通过agent.source/channel/sink.???.属性 = ...
来具体配置source/channel/sink
的属性。 - 配置完
source/channel/sink
相应的属性后,需把相应的组件串联一起,如:agent.sources.dir-src.channels = file-channel
其中dir-src这个source指定了其channel为我们定义好的file-channel.
一些Tips:
- flume在收集log的时候经常会出现
Line length exceeds max (2048), truncating line!
,这个一般情况对于一些log的存储没影响,但是遇到需要解析log的情况就有问题了,有时一个json或者其他格式的log被截断了,解析也会出问题,所以在source的属性配置里可以通过参数deserializer.maxLineLength
调高默认的2048。 - flume在监听相应的目录时,如果有重名的文件,或者直接在监听目录下修改相应正在读取的文件时,都会报错,而且flume-ng目前没有这种容错机制,报错只能重启了,还有一个比较大的问题,flume-ng没有提供相应的kill脚本,只能通过shell直接
ps -aux | grep flume
找到相应的PID,然后手动kill。 - flume在监听相应目录时,如果目录下的文件是通过HTTP或者scp传输过来的,小文件的话没问题,但是当文件大小超过网络传输速率,就会造成flume读取文件时报错直接显示文件大小正在变化,这点也是比较麻烦的,所以建议是现有个临时目录先存放文件,等文件传输完成后再通过shell的
mv
命令直接发送到监听目录。 - 有时候我们的log文件是以压缩的方式传输过来,但是如果我们想解析后才发送出去的话,可以将当前的
Spooling Directory Source
的改为Exec Source
,可以指定改source的command
参数里写shell解析命令。
flume的启动:
flume-ng agnet --conf "配置文件文件目录" --conf-file "配置文件" --name "配置文件里agent的名字"
2. kafka 配置
2.1 kafka简介
kafka的官网 https://kafka.apache.org 同样对kafka的定位做了一个清晰阐述,分布式的消息流平台
,与传统的MQ架构类似,kafka解耦了生产者,中间层与消费者三个组件,乍一听似乎与其他的MQ框架没有太大的区别,于是对比了很久,各个框架间并没有表现出显著性的区别以致某一方是不可替代的,但是其中仍有一些值得细细推敲的地方,具体可见下表(以rabbit MQ 为例):
属性 | rabbit MQ | Kafka |
---|---|---|
多语言支持 | 支持,语言无关 | 支持,语言无关 |
消息延迟 | 微妙级 | 毫秒级 |
负载均衡 | miror queue | 多broker,多replication |
协议问题 | 遵从AMQP协议, broker由Exchange,Binding,queue组成,客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费, rabbitMQ以broker为中心,有消息的确认机制。 | 遵从一般的MQ结构,producer,broker,consumer,consumer从broker上批量pull数据,通过提交offset来做相应消息拉取管控。 |
集群扩展 | 支持 | 原生支持 |
事务支持 | 原生支持 | 支持 |
除上述所列各点外,还有几点需单独拿出讨论的:
- kafka利用zookeeper做均衡管理,最新的kafka版本在消费者消费完信息后会将offset保存在kafka本身服务上,而不是zookeeper上,这在很大程度保证了消息队列被消费不会出现缺失与重复,但是要保证0重复0丢失,对于consumer提交offset的设计仍有比较大的考验。
- kafka在创建topic时一般都是分区存储,如此带来的问题是每个分区间的消息顺序是很难保证全局性,只能在单个分区下保证,因此kafka在日志这个领域会更加的吻合和焕发光芒。
2.2 kafka配置
同样,下面是一份单broker的kafka配置方案:
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
delete.topic.enable = true
有几点配置需要注意:
- broker.id 必须是全局唯一的,多个broker尽量部署在不同的集群上,通过指定相同的zookeeper.connect 进行统一管理。
- listeners 是监听相应的IP:Port,如果kafka已经部署在集群上,会通过java.net.InetAddress.getCanonicalHostName()自动获取到相应的地址。
- num.partitions 是为每个topic保留的默认分区,如果创建topic时不指定即采用默认1。
- 其他的一些配置参数看注释既可以,
delete.topic.enable = true
可以让topic的删除什么更加方便。
kafka的启动:
kafka-server-start server.properties
创建无备份,分区为1的topic:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic test
删除topic:
kafka-topics --delete --zookeeper localhost:2181 --topic test
zookeeper-client
rmr /brokers/topics/test
3. spark stream 消费者
3.1 spark stream 简介
Spart Stream 是 Spark 框架下一个流处理的子项目,其基础数据DStream
封装的是spark的RDD
,通过轮询不断地从源端拉取数据,spark stream支持多种源端数据的拉取,同时基于spark的核心计算模块,使得其在实时性和大数据方面有着很强的优势,其流程结构大概如下图所示:
3.2 spark stream 写 Kafka 消费者
spark stream 写 kafka 消费者,官方提供了相应的示例,这里再稍微简述下:
首先sbt
引入spark stream/Kafka相关依赖
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0"
其次定义好kafka参数:
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "192.168.1.23:9093,gz19:9092,gz21:9092,gz24:9092,gz18:9092,gz89:9092,bigdata.zuzuche.cn:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "kafka_consumer_tantu",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
订阅相应的topic:
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
接下来就可以对stream做进一步的处理,跟spark rdd的处理类似。
同样在写spark stream的时候有一些细节需要注意:
- spark stream的轮询时间最小可以达到500ms, 但是如此带来的集群资源消耗也会更大,轮询的时间间隔应根据具体的场景设定。
- spark stream本质上仍为spark的任务,只是添加了轮询机制使其一直挂在后台,当spark-submit提交spark stream的时候若设定的excutor大于kafka topic创建时设定的分区,多出来的部分会处于空闲,所以两者的配置要互相参考。
4. HBase 存储
4.1 HBase 简介
HBase是NoSql中的一个代表,是一个面向列的数据库,支持亿级别的行*百万级别的列,若要定位到某个字段的值,通常需要限定如下:表名 -> rowid -> column family:column name -> timestamp
,其中rowid为全局唯一的行键,行键的设计会影响到列的同个列下的排序,column family为列簇,其含义接近于HIve中的分区,通过column family的限定,其下相应的column会被集中存放,不同column family的column会分开存放,这样当需要索引少量的列时,无需遍历全部字段,当然,column family也不是越多越好,而且官方文档似乎也不支持过多的列簇,关于HBase的表结构,参考如下图:
4.2 spark stream 写入 HBase (以HBase 1.2.0 为例)
引入HBase相关依赖:
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0"
将数据存储为HBase对应的格式:
// 随机产生某个uuid为行键
val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
// 将列簇,列明,列值添加进对应结构
put.addColumn(Bytes.toBytes("column_family"), Bytes.toBytes("column_name"), Bytes.toBytes("column_value"))
插入HBase:
// 表名
val tablename = "table_name"
// 创建初始配置
val hbaseconf = HBaseConfiguration.create()
// 创建链接
val conn = ConnectionFactory.createConnection(hbaseconf)
// 指定表
val table: HTable = new HTable(hbaseconf, Bytes.toBytes(tablename))
// 提交事务,插入数据
table.put(put)
5. Hive 外部表关联 HBase, impala 映射查询
Hive做HBase的外部关联,需提前定义好列字段,而通常HBase的列都是无限扩展的,所以通过Hive外部映射HBase,只能处理一些日常的查询需求。
5.1 Hive 外部银映射 HBase:
CREATE EXTERNAL TABLE hive_external_HBase(
key string,
time string,
`_track_id` string,
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,search:time,search:_track_id")
TBLPROPERTIES("hbase.table.name" = "HBase_table_name");
语法与创建Hive基本一致,需要注意的是hive字段不支持特殊字符如$_*&
等开头,需加转义符。
最后,Hive有时候查询的速度并不能达到我们的想象,再做以不impala映射,用impala的查询引擎,会明显快很多:
INVALIDATE METADATA;
总结
flume+kafka+spark stream+hbase是目前比较常用的组合,相信对这种组合存疑的有不少,下面稍微总结下:
- 为什么不用kafka直接接收源数据,而用flume作为Kafka的源?
从配置方面讲,flume提供了多种源接收方式,且只需做简单的配置即可,灵活的多种源配置也方便后续的收集扩展,kafka作为源会比flume稍微麻烦点,需在前面写一层生产者,实际上cloudera官方也建议,当存在多给消费者时,用kafka会更好,当存在多个多种生产者时,用flume会更加方便,同时,如果并发很高,可以采用kafka做flume的channel。
- 为什么用spark stream作为kafka的消费者而不是其他?
就目前spark stream的性能来看,spark stream还不能完全称之为实时流处理,更合适的叫法应该是准实时批处理,但是由于其最低延迟可以达到秒级,基本满足了大部分系统需要,对于对实时性要求不高的可以胜任,同时Spark stream内部封装的仍是Spark RDD结构,对于熟悉spark家族的开发者会更友好,且相应的处理解决方案会更多更成熟。另外Storm也是目前spark stream外比较流行的流处理,其实时性比spark stream更高,但属于spark体系外,要求相关开发者具备的能力会更高,所以可以根据不同场景和技术体系,做相应选择。
- 为什么是入到hbase而不是其他Nosql?
无他,HBase是目前Hadoop家族里BigTable最完善的,列式存储结构最成熟的方案。
以上是关于大数据篇:flume+kafka+spark stream+hbase做日志收集的主要内容,如果未能解决你的问题,请参考以下文章
大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式
(数据挖掘)大数据Flume+kafka+zookeeper+Strom/Spark/Fink......
下载基于大数据技术推荐系统实战教程(Spark ML Spark Streaming Kafka Hadoop Mahout Flume Sqoop Redis)
大数据Spark“蘑菇云”行动之flume整合spark streaming
大数据架构开发 挖掘分析 Hadoop HBase Hive Storm Spark Sqoop Flume ZooKeeper Kafka Redis MongoDB 机器学习 云计算 视频教程 J