flume kafka spark读取日志延迟

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume kafka spark读取日志延迟相关的知识,希望对你有一定的参考价值。

参考技术A 您时想问flumekafkaspark读取日志延迟的原因吗?网络不好或者内存不够。通常读取延迟或者失败时因为网络环境不好。可以选择等一会缓冲一下,也可以直接切换个好一点的网络。还有一种情况时因为内存不够了,不足以支撑它打开日志,这时可以选择清理一下内存再尝试。

大数据篇:flume+kafka+spark stream+hbase做日志收集

前言

flume+kafka+spark stream 是目前比较常用的一套大数据消息日志收集管理框架,至于最后是入到Hive或者者Hbase需看不同业务场景,下面以HBase为场景简述下整个配置与搭建流程以及这些框架如此搭配的优点。

1. flume 配置

1.1 flume 简介

从官网文档 https://flume.apache.org 可以知道Flume的定位是很清晰的,它提供了一个分布式的,高可用的桥梁链接,可以收集、聚合和移动大量的日志数据,从许多不同的数据源到集中式数据存储,大概的结构如下图,流程大致为 从源端(source)接收数据,经过管道(channel)的缓存等等,发送到目标(sink)端。:

其中source的定义flume提供了很多方式,常用的有以下几种:

  • Http source ,这种方式可以通过监听接口方式来收集log;
  • Exec source ,这种方式可以通过执行一些shell命令来收集log,例如通过 tail -f 文件 来监听文件追加的日志;
  • Spooling source,这种方式可以监听某个目录的log,当有新的log产生时即会被发送出去;
  • 还有很多其他的方式,例如可以以kafka作为source,这样flume就充当了kafka的消费者,当然还有很多如 Avro sourceThrift sourceTCP类的等等,具体参考官网文档更加相应场景配置即可。

channel同样flume提供了很多方式,memory channel这种方式已经不太建议了,原因也很明显,不够安全,当出现任何机器问题时数据就会丢失,file channelkafka channel是比较推荐的做法,特别是当需要比较高的并发时,kafka channel是一个不错的选择。

sink同样flume提供了很多方式,常用的有以下几种:

  • HDFS/Hive/Hbase/ElasticSearch sink,直接写入hdfs/Hive/Hbase/ElasticSearch,这种方式适合那些比较无需做ETL的场景。
  • kafka sink,直接充当kafka的生产者,可以看到kafka可以在整个flume生命周期里可以自由穿插。
  • Http sink,直接通过post方法将数据发送到目标api。
  • 其他的一些详细见官网文档即可。

1.2 flume 配置

下面以Spooling Directory Source -> file channel -> kafka sink为例:

一份样例配置参数:

# Name the components on this agent
agent.sources = dir-src
agent.sinks = kafka-sink
agent.channels = file-channel

# Describe/configure the source
agent.sources.dir-src.type = spooldir
agent.sources.dir-src.spoolDir = #监听目录
agent.sources.dir-src.fileHeader = true
agent.sources.dir-src.deserializer.maxLineLength=1000000

# Describe the sink
agent.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafka-sink.kafka.topic = test
agent.sinks.kafka-sink.kafka.bootstrap.servers = #kafka boostrapServer

# Use a channel which buffers events in file
agent.channels.file-channel.type = file
agent.channels.file-channel.checkpointDir = # checkpoint目录
agent.channels.file-channel.dataDirs = # 缓存的数据目录

# Bind the source and sink to the channel
agent.sources.dir-src.channels = file-channel
agent.sinks.kafka-sink.channel = file-channel

配置详解:

  • 首先每个flume配置表可以存在多个agent,多个source,多个channel,多个sink,所以可以根据相应业务场景进行组合。
  • 对于每个agent,必须配置相应的source/channel/sink,通过agent.source = ???,agent.channel = ???,agent.sink = ???来指定。
  • 对于具体的source/channel/sink,通过 agent.source/channel/sink.???.属性 = ... 来具体配置 source/channel/sink 的属性。
  • 配置完source/channel/sink相应的属性后,需把相应的组件串联一起,如: agent.sources.dir-src.channels = file-channel其中dir-src这个source指定了其channel为我们定义好的file-channel.

一些Tips:

  • flume在收集log的时候经常会出现Line length exceeds max (2048), truncating line!,这个一般情况对于一些log的存储没影响,但是遇到需要解析log的情况就有问题了,有时一个json或者其他格式的log被截断了,解析也会出问题,所以在source的属性配置里可以通过参数deserializer.maxLineLength调高默认的2048。
  • flume在监听相应的目录时,如果有重名的文件,或者直接在监听目录下修改相应正在读取的文件时,都会报错,而且flume-ng目前没有这种容错机制,报错只能重启了,还有一个比较大的问题,flume-ng没有提供相应的kill脚本,只能通过shell直接ps -aux | grep flume找到相应的PID,然后手动kill。
  • flume在监听相应目录时,如果目录下的文件是通过HTTP或者scp传输过来的,小文件的话没问题,但是当文件大小超过网络传输速率,就会造成flume读取文件时报错直接显示文件大小正在变化,这点也是比较麻烦的,所以建议是现有个临时目录先存放文件,等文件传输完成后再通过shell的mv命令直接发送到监听目录。
  • 有时候我们的log文件是以压缩的方式传输过来,但是如果我们想解析后才发送出去的话,可以将当前的Spooling Directory Source的改为Exec Source,可以指定改source的command参数里写shell解析命令。

flume的启动:

flume-ng agnet --conf "配置文件文件目录" --conf-file "配置文件" --name "配置文件里agent的名字"

2. kafka 配置

2.1 kafka简介

kafka的官网 https://kafka.apache.org 同样对kafka的定位做了一个清晰阐述,分布式的消息流平台,与传统的MQ架构类似,kafka解耦了生产者,中间层与消费者三个组件,乍一听似乎与其他的MQ框架没有太大的区别,于是对比了很久,各个框架间并没有表现出显著性的区别以致某一方是不可替代的,但是其中仍有一些值得细细推敲的地方,具体可见下表(以rabbit MQ 为例):

属性rabbit MQKafka
多语言支持支持,语言无关支持,语言无关
消息延迟微妙级毫秒级
负载均衡miror queue多broker,多replication
协议问题遵从AMQP协议, broker由Exchange,Binding,queue组成,客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费, rabbitMQ以broker为中心,有消息的确认机制。遵从一般的MQ结构,producer,broker,consumer,consumer从broker上批量pull数据,通过提交offset来做相应消息拉取管控。
集群扩展支持原生支持
事务支持原生支持支持

除上述所列各点外,还有几点需单独拿出讨论的:

  • kafka利用zookeeper做均衡管理,最新的kafka版本在消费者消费完信息后会将offset保存在kafka本身服务上,而不是zookeeper上,这在很大程度保证了消息队列被消费不会出现缺失与重复,但是要保证0重复0丢失,对于consumer提交offset的设计仍有比较大的考验。
  • kafka在创建topic时一般都是分区存储,如此带来的问题是每个分区间的消息顺序是很难保证全局性,只能在单个分区下保证,因此kafka在日志这个领域会更加的吻合和焕发光芒。

2.2 kafka配置

同样,下面是一份单broker的kafka配置方案:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

delete.topic.enable = true

有几点配置需要注意:

  • broker.id 必须是全局唯一的,多个broker尽量部署在不同的集群上,通过指定相同的zookeeper.connect 进行统一管理。
  • listeners 是监听相应的IP:Port,如果kafka已经部署在集群上,会通过java.net.InetAddress.getCanonicalHostName()自动获取到相应的地址。
  • num.partitions 是为每个topic保留的默认分区,如果创建topic时不指定即采用默认1。
  • 其他的一些配置参数看注释既可以,delete.topic.enable = true可以让topic的删除什么更加方便。

kafka的启动:

kafka-server-start server.properties

创建无备份,分区为1的topic:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic test

删除topic:

kafka-topics  --delete --zookeeper localhost:2181 --topic test
zookeeper-client 
rmr /brokers/topics/test

3. spark stream 消费者

3.1 spark stream 简介

Spart Stream 是 Spark 框架下一个流处理的子项目,其基础数据DStream封装的是spark的RDD,通过轮询不断地从源端拉取数据,spark stream支持多种源端数据的拉取,同时基于spark的核心计算模块,使得其在实时性和大数据方面有着很强的优势,其流程结构大概如下图所示:

3.2 spark stream 写 Kafka 消费者

spark stream 写 kafka 消费者,官方提供了相应的示例,这里再稍微简述下:

首先sbt引入spark stream/Kafka相关依赖

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-hive_2.11" % "2.1.0" % Provided
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"  % Provided
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0" 

其次定义好kafka参数:

val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "192.168.1.23:9093,gz19:9092,gz21:9092,gz24:9092,gz18:9092,gz89:9092,bigdata.zuzuche.cn:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "kafka_consumer_tantu",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
    )

订阅相应的topic:

val stream = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        )

接下来就可以对stream做进一步的处理,跟spark rdd的处理类似。

同样在写spark stream的时候有一些细节需要注意:

  • spark stream的轮询时间最小可以达到500ms, 但是如此带来的集群资源消耗也会更大,轮询的时间间隔应根据具体的场景设定。
  • spark stream本质上仍为spark的任务,只是添加了轮询机制使其一直挂在后台,当spark-submit提交spark stream的时候若设定的excutor大于kafka topic创建时设定的分区,多出来的部分会处于空闲,所以两者的配置要互相参考。

4. HBase 存储

4.1 HBase 简介

HBase是NoSql中的一个代表,是一个面向列的数据库,支持亿级别的行*百万级别的列,若要定位到某个字段的值,通常需要限定如下:表名 -> rowid -> column family:column name -> timestamp,其中rowid为全局唯一的行键,行键的设计会影响到列的同个列下的排序,column family为列簇,其含义接近于HIve中的分区,通过column family的限定,其下相应的column会被集中存放,不同column family的column会分开存放,这样当需要索引少量的列时,无需遍历全部字段,当然,column family也不是越多越好,而且官方文档似乎也不支持过多的列簇,关于HBase的表结构,参考如下图:

4.2 spark stream 写入 HBase (以HBase 1.2.0 为例)

引入HBase相关依赖:

libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0"

将数据存储为HBase对应的格式:

// 随机产生某个uuid为行键
val put = new Put(Bytes.toBytes(UUID.randomUUID().toString))
// 将列簇,列明,列值添加进对应结构
put.addColumn(Bytes.toBytes("column_family"), Bytes.toBytes("column_name"), Bytes.toBytes("column_value"))

插入HBase:

// 表名
val tablename = "table_name"
// 创建初始配置
val hbaseconf = HBaseConfiguration.create()
// 创建链接
val conn = ConnectionFactory.createConnection(hbaseconf)
// 指定表
val table: HTable = new HTable(hbaseconf, Bytes.toBytes(tablename))
// 提交事务,插入数据
table.put(put)

5. Hive 外部表关联 HBase, impala 映射查询

Hive做HBase的外部关联,需提前定义好列字段,而通常HBase的列都是无限扩展的,所以通过Hive外部映射HBase,只能处理一些日常的查询需求。

5.1 Hive 外部银映射 HBase:

CREATE EXTERNAL TABLE hive_external_HBase(
key string,
time string,
`_track_id` string,
)    
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'    
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,search:time,search:_track_id")    
TBLPROPERTIES("hbase.table.name" = "HBase_table_name");

语法与创建Hive基本一致,需要注意的是hive字段不支持特殊字符如$_*&等开头,需加转义符。

最后,Hive有时候查询的速度并不能达到我们的想象,再做以不impala映射,用impala的查询引擎,会明显快很多:

INVALIDATE METADATA;

总结

flume+kafka+spark stream+hbase是目前比较常用的组合,相信对这种组合存疑的有不少,下面稍微总结下:

  • 为什么不用kafka直接接收源数据,而用flume作为Kafka的源?

从配置方面讲,flume提供了多种源接收方式,且只需做简单的配置即可,灵活的多种源配置也方便后续的收集扩展,kafka作为源会比flume稍微麻烦点,需在前面写一层生产者,实际上cloudera官方也建议,当存在多给消费者时,用kafka会更好,当存在多个多种生产者时,用flume会更加方便,同时,如果并发很高,可以采用kafka做flume的channel。

  • 为什么用spark stream作为kafka的消费者而不是其他?

就目前spark stream的性能来看,spark stream还不能完全称之为实时流处理,更合适的叫法应该是准实时批处理,但是由于其最低延迟可以达到秒级,基本满足了大部分系统需要,对于对实时性要求不高的可以胜任,同时Spark stream内部封装的仍是Spark RDD结构,对于熟悉spark家族的开发者会更友好,且相应的处理解决方案会更多更成熟。另外Storm也是目前spark stream外比较流行的流处理,其实时性比spark stream更高,但属于spark体系外,要求相关开发者具备的能力会更高,所以可以根据不同场景和技术体系,做相应选择。

  • 为什么是入到hbase而不是其他Nosql?

无他,HBase是目前Hadoop家族里BigTable最完善的,列式存储结构最成熟的方案。

以上是关于flume kafka spark读取日志延迟的主要内容,如果未能解决你的问题,请参考以下文章

大数据篇:flume+kafka+spark stream+hbase做日志收集

使用 apache spark 流式处理实时日志

Spark Streaming基于Spark Streaming&Flume&Kafka打造通用流处理平台

Spark Streaming基于Spark Streaming&Flume&Kafka打造通用流处理平台

Spark Streaming基于Spark Streaming&Flume&Kafka打造通用流处理平台

flume 读取kafka 数据