实时事件统计项目:优化flume:用file channel代替mem channel

Posted Code Job

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实时事件统计项目:优化flume:用file channel代替mem channel相关的知识,希望对你有一定的参考价值。

背景:利用kafka+flume+morphline+solr做实时统计

solr从12月23号开始一直没有数据。查看日志发现,因为有一个同事加了一条格式错误的埋点数据,导致大量error。

据推断,是因为使用mem channel占满,消息来不及处理,导致新来的数据都丢失了。

image

修改flume使用file channel:

kafka2solr.sources = source_from_kafka
kafka2solr.channels = file_channel
kafka2solr.sinks = solrSink

# For each one of the sources, the type is defined  
kafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
kafka2solr.sources.source_from_kafka.channels = file_channel
kafka2solr.sources.source_from_kafka.batchSize = 100
kafka2solr.sources.source_from_kafka.useFlumeEventFormat=false
kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092
kafka2solr.sources.source_from_kafka.kafka.topics = eventCount
kafka2solr.sources.source_from_kafka.kafka.consumer.group.id = flume_solr_caller
kafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset=latest

# file channel  
kafka2solr.channels.file_channel.type = file
kafka2solr.channels.file_channel.checkpointDir = /var/log/flume-ng/checkpoint
kafka2solr.channels.file_channel.dataDirs = /var/log/flume-ng/data


kafka2solr.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
kafka2solr.sinks.solrSink.channel = file_channel
#kafka2solr.sinks.solrSink.batchSize = 1000
#kafka2solr.sinks.solrSink.batchDurationMillis = 1000
kafka2solr.sinks.solrSink.morphlineFile = morphlines.conf
kafka2solr.sinks.solrSink.morphlineId=morphline1
kafka2solr.sinks.solrSink.isIgnoringRecoverableExceptions=true

使得数据持久化到磁盘不会丢失。

以上是关于实时事件统计项目:优化flume:用file channel代替mem channel的主要内容,如果未能解决你的问题,请参考以下文章

Flume实时监控目录sink到hdfs

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PVUV+展示

Kafka Source 的 Flume 不使用 file_roll 写入事件

大数据实战Flume+Kafka+Spark+Spring Boot 统计网页访问量项目

Banana:Solr的Kibana

Spark Streaming实时处理应用