Flume + Kafka cluster: a real-world case of turning a well-known large supermarket chain's 25-year-old offline POS checkout system into a real-time system

Posted TGITCIC


Prologue

This article covers the full build-out of a real Kafka cluster, how the application side integrates with that cluster, and how to repair the various core failures a production Kafka can hit under heavy transaction volume when you cannot afford downtime.

Most of the Kafka write-ups and blog posts online are, frankly, wrong or useless. Don't be shocked: the clusters they walk you through are fake and have no production value. The thousand-plus articles out there descend from roughly five "master" templates, and since the masters themselves are flawed, so are all the copies.

This was around 2017, before Flink was available to us; the popular choice for unified stream-and-batch processing at the time was Flume. That does not change the architecture: the core idea is still to use stream-batch techniques to solve a real business scenario. This retailer's systems had been running for more than 25 years, across more than 300 hypermarkets nationwide plus over a thousand additional stores under headquarters. You cannot simply propose tearing down and rebuilding a 25-year-old system for the sake of real-time transactions and real-time promotions; there are far too many non-technical obstacles.

So we needed a "painless" approach, and that painless approach was exactly this stream-batch transformation. Of course, after reading this case you could just as well rewrite the whole solution in Flink.

The baseline for this article is the flagship store of a major domestic supermarket chain in Shanghai Xuhui, which handles transaction-file throughput on the order of 100,000 TPS, together with the corresponding production environment. Keep in mind that among the hyper-format stores (each over 8,000 m²) in Shanghai and across the country, roughly 90% of the offline POS checkout systems come from the same world-famous POS hardware and software vendor. After we completed this painless transformation and cut the time for an order to flow from creation back into headquarters' master data from T+1 days to under one second, we shared the approach within the industry; three other large department-store groups (each with over a hundred stores and thousands of POS terminals) came to study it, and several of them still use this solution today. Let's begin!

Overall architecture

The Kafka cluster inside the supermarket chain

Characteristics:

  • Multiple nodes
  • Multiple replicas per node (an odd, prime count such as 3)
  • Extremely efficient
  • Extremely safe: any node can die and, however hard you abuse the cluster, as long as you do not kill every node, every message is preserved

Goals

  • Build a 3-node Kafka cluster;
  • Create a topic (a named message queue) that keeps at least 3 replicas of every message produced to and consumed from it;
  • Build a Spring Boot + spring-kafka application connected to the cluster;
  • Continuously "feed" more than ten thousand messages into the cluster;
  • While messages are being produced and consumed, kill any one Kafka node with kill -9 (to faithfully simulate a production crash) and verify that sending and receiving are unaffected and that no message is lost or left unconsumed;
  • Restart the killed node and watch the cluster automatically take it back.

Building the Kafka cluster environment

Hardware requirements

If you have money to burn, use five VMs, each with its own real IP address and the following spec:

  • Disk: 30-50 GB, SSD
  • CPU: 4 cores or more
  • RAM: at least 4 GB
  • OS: CentOS 7.4 or later

If you are on a budget, deploy 3 Kafka services on a single machine. This is not a simulation: you really run 3 production-grade Kafka processes; they simply share one IP and differ in port, data directory and configuration. The machine should be:

  • Disk: SSD required, 250-300 GB, laptop or desktop
  • CPU: i7
  • RAM: 16-24 GB
  • OS: CentOS 7.4+ or macOS Catalina+

Setting up the Kafka cluster

Given the pressing needs of most working programmers, I will use the "budget" environment for the rest of this article.

Installing Kafka

  • Kafka version: 2.11
  • Flume version: 1.9.0 (used to feed large volumes of data; you could write your own multithreaded producer, but generating tens of thousands of data files at once and letting Flume's sink act as your Kafka producer is far more "brutal" and effective)

Download kafka-2.11 and unpack it; you get the usual Kafka directory layout.

Step 1: create 3 directories under the Kafka installation

Name them kafka-log0, kafka-log1 and kafka-log2. They are where Kafka's data files will live.
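A minimal sketch, assuming the installation sits at /Users/apple/opt/kafka2.11 (the path used in the configs below):

cd /Users/apple/opt/kafka2.11
mkdir kafka-log0 kafka-log1 kafka-log2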

A small tip: the physical and logical layout of a Kafka cluster

Kafka lays out its data files according to the following hierarchy:

--------broker (a Kafka server node)

             --------topic (a message name, roughly a queue)

                          --------*.index *.log (the data files)

Because we are running 3 brokers on one machine, I created 3 log folders. Kafka broker ids start at 0, which is why the data folders are numbered 0, 1 and 2.

Our implementation is therefore:

  1. 3 brokers, one topic;
  2. the topic is stored as 3 replicas;
  3. each replica is spread across the 3 nodes.

That last point is the core of the core. Without it, even if you achieve the first two, what you have is a "fake cluster": the moment a single broker dies, an application connected to such a "fake cluster" cannot fail over; it hangs, freezes, and stays stuck until your boss comes after you with a kitchen knife. This is precisely what none of those thousand-plus articles in five template families ever covers, because their authors have never run a production environment at tens of thousands of TPS; real cases at that scale are rare, and at best those blogs are written by hobbyists.

Enough talk; let's continue the build.

Step 2: create 3 properties files in kafka-2.11's config directory:

  1. server0.properties, for the 1st complete broker
  2. server1.properties, for the 2nd complete broker
  3. server2.properties, for the 3rd complete broker

With the "rich" setup each broker only needs its own server.properties, because the brokers sit on physically separate machines. Since we are on the "budget" setup, we distinguish the properties files by name.

The contents of the 3 files are listed below; they differ only in the values of the following fields:

  • broker.id= # each broker's id must be a number, must be unique, and numbering starts at 0
  • listeners=PLAINTEXT://127.0.0.1:9092 # the broker's own listen address
  • port=9092 # the broker's Kafka service port
  • advertised.listeners=PLAINTEXT://127.0.0.1:9092 # the address advertised to external applications such as the Spring Boot app
  • host.name=127.0.0.1 # the host the broker runs on

Note: if the brokers live on different VMs or physical servers, the port numbers can all be the same, because the distinct physical IPs already isolate them.

server0.properties

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://127.0.0.1:9092
port=9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
host.name=127.0.0.1
 
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
 
socket.request.max.bytes=104857600
 
 
log.dirs=/Users/apple/opt/kafka2.11/kafka-log0
num.partitions=3
 
num.recovery.threads.per.data.dir=1
 
 
 
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
 
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
 
group.initial.rebalance.delay.ms=0
 
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

server1.properties

broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://127.0.0.1:9093
port=9093
advertised.listeners=PLAINTEXT://127.0.0.1:9093
host.name=127.0.0.1
 
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
 
socket.request.max.bytes=104857600
 
 
log.dirs=/Users/apple/opt/kafka2.11/kafka-log1
num.partitions=3
 
num.recovery.threads.per.data.dir=1
 
 
 
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
 
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
 
group.initial.rebalance.delay.ms=0
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

server2.properties

broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://127.0.0.1:9094
port=9094
advertised.listeners=PLAINTEXT://127.0.0.1:9094
host.name=127.0.0.1
 
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
 
socket.request.max.bytes=104857600
 
 
log.dirs=/Users/apple/opt/kafka2.11/kafka-log2
num.partitions=3
 
num.recovery.threads.per.data.dir=1
 
 
 
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
 
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=6000
 
group.initial.rebalance.delay.ms=0
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

Configuration walkthrough

First: zookeeper.connect=localhost:2181,localhost:2182,localhost:2183. Before building Kafka you must already have a ZooKeeper ensemble of at least 3 nodes; Kafka registers its topics, consumers and replication machinery in ZooKeeper, so you must list the whole ensemble here. (Why three nodes? Because if you configured only a single ZooKeeper node and it died, your Kafka would die with it.)
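For reference only, a single-machine 3-node ZooKeeper ensemble can be configured roughly as below; the dataDir paths are assumptions, and the three instances match the 2181/2182/2183 addresses above. zoo1.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/apple/opt/zookeeper/data1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890

zoo2.cfg and zoo3.cfg differ only in dataDir (data2, data3) and clientPort (2182, 2183), and each dataDir contains a myid file holding that instance's number (echo 1 > .../data1/myid, and so on).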

Second: declare the cluster's replication scheme. This is important. If you do not declare it before going live, then once production is running you either schedule downtime to reconfigure and restart, or you get to run commands against the live cluster with an absurdly long, painful JSON file while watching and computing the offsets yourself.
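For reference, that "painful JSON" route is the partition-reassignment tool; the topic name and replica layout below are illustrative:

cat > reassign.json <<'EOF'
{"version":1,"partitions":[
  {"topic":"pos-transaction","partition":0,"replicas":[0,1,2]},
  {"topic":"pos-transaction","partition":1,"replicas":[1,2,0]},
  {"topic":"pos-transaction","partition":2,"replicas":[2,0,1]}
]}
EOF
./kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file reassign.json --execute
./kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file reassign.json --verify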

log.dirs=/Users/apple/opt/kafka2.11/kafka-log0
num.partitions=3

Remember: when building on a single machine, each broker must point at its own data folder, and the partition count is set to 3; matching the number of partitions to the number of brokers is the best practice here.

Third: the settings missing from every online tutorial, the ones that actually let the application and the cluster fail over ("hot-switch") together

offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

Yes, just these 3 parameters. They give the consumer side 3 replicas as well: concretely, they raise the replication factor of Kafka's internal offsets and transaction-state topics to 3. No tutorial online mentions this. If you only spread the topic's partitions across 3 hosts but leave these at 1 (the stock single-node default), the consumer-side state lives on a single broker; when that broker dies while your application's consumer happens to depend on it, the whole application hangs and you get an "avalanche". To let the consumer side hot-switch too, its "feet" must stand on all 3 brokers.

So the conclusion here is: number of brokers = number of topic partitions = number of consumer-side replicas. You do not need to care which piece lands where; Kafka's leader-election mechanism handles placement, and with >= 3 nodes it picks an assignment based on each broker's traffic, CPU and memory. All you have to do is tell it how many pieces to split into.
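On the application side, what matters most for hot-switching is that the client is told about all three brokers and runs one consumer per partition. A minimal spring-kafka sketch (standard Spring Boot property keys; the group id is an assumption):

spring.kafka.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
spring.kafka.consumer.group-id=pos-transaction-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.acks=all
spring.kafka.listener.concurrency=3

With bootstrap-servers listing every broker, the client can rediscover partition leaders when any single node is killed, and concurrency=3 gives one consumer thread per partition.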

Fourth: allow topics to be deleted outright

delete.topic.enable=true

If this is not set to true, the command below does not actually delete the topic; it only marks it for deletion.

./bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic pos-transaction

In that case the only way to really remove the topic is to delete it by hand inside ZooKeeper; the commands are:

cd /opt/zookeeper/bin
./zkCli.sh
rmr /brokers/topics/pos-transaction

Unreasonable setup 1:

3 brokers, each topic split into 3 partitions, 1 consumer-side replica. The result is a fake cluster: the producer side keeps working when any one node dies, but the consumer side cannot hot-switch, which is why I call it fake.

Unreasonable setup 2:

3 brokers, each topic split into 5 partitions, 3 consumer-side replicas. With only 3 brokers carrying 5 partitions per topic, access is spread unevenly and some brokers take far more pressure than others.

Unreasonable setup 3:

2 brokers, each topic split into 3 partitions, 3 consumer-side replicas. Also a fake cluster: Kafka relies on electing a leader out of 3, and when a broker dies there must be an observer to decide which surviving broker takes over as leader. With one of only two brokers already dead, the survivor has nobody to agree with; should it elect itself?

Starting the cluster

From Kafka's bin directory, start the 3 brokers one at a time with the following commands.

Start broker 0

./kafka-server-start.sh -daemon ../config/server0.properties

Start broker 1

./kafka-server-start.sh -daemon ../config/server1.properties

Start broker 2

./kafka-server-start.sh -daemon ../config/server2.properties

Using and observing the cluster

First create a topic named pos-transaction and let it spread across the 3 brokers.

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --replication-factor 3 --partitions 3 --topic pos-transaction

Check whether the topic is indeed spread across all 3 brokers:

./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --topic pos-transaction
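For reference, the describe output typically looks like the following (your Leader and Isr columns will differ):

Topic:pos-transaction	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: pos-transaction	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: pos-transaction	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
	Topic: pos-transaction	Partition: 2	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1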

This output reads as follows for a single topic:

  • PartitionCount: the number of partitions
  • ReplicationFactor / Replicas: how many copies exist and where they sit; here each partition's replicas are spread evenly across brokers 0, 1 and 2
  • Leader: which broker currently leads each partition; for example partition 0's leader is broker 0, and if broker 0 dies, leadership moves to one of the still-available brokers
  • Isr: the in-sync replica set. At startup Kafka schedules two ISR-related background tasks, "isr-expiration" and "isr-change-propagation". The isr-expiration task periodically checks whether each partition's ISR needs to shrink; the period is tied to the replica.lag.time.max.ms parameter and runs at half that value (5000 ms by default). When it detects a failed replica in the ISR, the set is shrunk, and any ISR change is written back to the corresponding ZooKeeper node /brokers/topics/<topic>/partitions/<partition>/state. The node's data looks like this:

    {"controller_epoch":26,"leader":0,"version":1,"leader_epoch":2,"isr":[0,1]}

    Put plainly: when brokers die, the cluster recomputes this membership automatically on its own schedule. Let's try it: kill broker 0 and watch how the value changes.

Manually kill broker 0

ps -ef |grep server0.properties
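Take the PID of the server0.properties process from the ps output and kill it hard, exactly as the goals above call for (the PID below is a placeholder):

kill -9 <pid-of-the-server0-broker>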

 

Compare with the earlier describe output: broker 0 no longer shows up, because we killed it.

A few points that need close attention:

  1. Every address or host name in the server.properties files must consistently be either an IP or a host name, and even the ZooKeeper addresses passed to ./kafka-topics.sh --create must match them exactly;
  2. Topic names must not contain symbols or Chinese characters; letters, digits and the hyphen "-" are fine, as in "pos-transaction". A name like "pos_transaction" produces a warning, so use hyphens rather than underscores.

We just stopped server0; now let's bring it back up.

./kafka-server-start.sh ../config/server0.properties
./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --topic pos-transaction

You can see that the leadership is no longer balanced. That by itself is not a problem, since __consumer_offsets also has 3 replicas, but one broker's leader is now doing the work of 2 partitions, i.e. 6 replicas (2 x 3), which gets "tiring" under high concurrency. So we rebalance the leaders on the fly; the operation is invisible to clients and very fast.

./kafka-preferred-replica-election.sh --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

After the command dumps a long wall of output, run describe again:

./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --topic pos-transaction

See, it is balanced again. This command is safe to run hot while large volumes of data are flowing into Kafka and being consumed; Kafka has internal machinery that completes the adjustment within the few milliseconds when no data is entering the partitions being moved.

With that, the Kafka cluster is up. Now for the exciting part: configuring the application side to connect to the cluster.

Using Flume as the Kafka producer

First, we do not write our own producer. A hand-written producer might not reach the concurrency we need to simulate, so we use Flume itself as the producer and let it put real pressure on the Kafka cluster.

At the same time, this article lays the technical groundwork for later using Flume to pull plain-text files (the batch transaction files of traditional retail and finance) and turn them into real-time transactions without invasive surgery on the legacy systems.

A small modification to Flume before using it

Why modify Flume at all? Because the systems that emit transaction files in traditional retail and finance are written in C or run on embedded controllers, and they all share one trait when producing files: they queue up and write slowly.

If Flume grabs a file the moment it appears, while the device is still writing it, it reads an incomplete file and a transaction record comes out corrupted. But Flume is by nature a "real-time stream": as soon as it scans the watched directory and sees a file, it pulls the file and ships it to the sink.

So we have to ingest in real time, as soon as a file lands, while still guaranteeing the file is complete. Fortunately Flume is open source and free, so we can extend its source code directly.

Flume's real-time file ingestion is the "spooldir" source, which extends AbstractSource and implements Flume's Configurable and EventDrivenSource interfaces. So we can write our own spooldir that extends and implements the same things; we never touch Flume's original spooldir, and we ship ours as a plugin, a mechanism Flume supports natively. That is what we did: we modified Flume by deploying a plugin of our own.

The source code follows.

Remember to build with OpenJDK 1.8 and a Java 1.8 compiler level.

Note: this code has been running for more than 4 years on nearly 10,000 POS terminals across the chain's 300 large stores nationwide and on close to 100,000 POS terminals at global headquarters, sustaining 100,000+ TPS across 15 Kafka clusters, with about 4 GB of uplink and downlink data per store per day. The version published here is a further "2.0" hardening of that code, with higher throughput, performance tuning, finer-grained log messages and a further optimized scanning loop.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>org.sky.platform.retails</groupId>
	<artifactId>sky-pos-agent</artifactId>
	<version>0.0.1</version>
	<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>2.0.0-cdh4.1.0</version>
		</dependency>

		<!-- For unit testing -->
		<dependency>
			<groupId>org.apache.mrunit</groupId>
			<artifactId>mrunit</artifactId>
			<version>0.8.0-incubating</version>
		</dependency>
		<!-- <dependency> <groupId>org.apache.flume</groupId> <artifactId>flume-ng-core</artifactId> 
			<version>1.4.0-cdh4.5.0</version> </dependency> -->
		<dependency>
			<groupId>org.apache.flume</groupId>
			<artifactId>flume-ng-core</artifactId>
			<version>1.9.0</version>
		</dependency>
	</dependencies>

	<build>
		<!-- Source directories are set to use existing Ant setup rather than Maven 
			defaults. Ideally we should change move source directories to follow the 
			standard maven layout, then we don't have to override those default values. -->
		<sourceDirectory>src/main/java</sourceDirectory>
		<testSourceDirectory>src/test/java</testSourceDirectory>
		<plugins>
			<plugin>
				<!-- Site plugin is set up to work in the Maven3 way. (stolen from http://www.wakaleo.com/blog/292-site-generation-in-maven-3 
					) -->
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-site-plugin</artifactId>
				<version>3.0-beta-2</version>
				<configuration>
					<reportPlugins>
						<plugin>
							<groupId>org.apache.maven.plugins</groupId>
							<artifactId>maven-project-info-reports-plugin</artifactId>
							<!--version>2.2</version -->
							<configuration>
								<dependencyDetailsEnabled>false</dependencyDetailsEnabled>
								<dependencyLocationsEnabled>false</dependencyLocationsEnabled>
							</configuration>
							<!--reports> <report>foo</report> <report>toto</report> </reports -->
						</plugin>
						<plugin>
							<groupId>org.codehaus.mojo</groupId>
							<artifactId>findbugs-maven-plugin</artifactId>
							<version>2.3.2</version>
							<configuration>
								<effort>High</effort>
								<threshold>Low</threshold>

							</configuration>
						</plugin>
						<plugin>
							<groupId>org.apache.maven.plugins</groupId>
							<artifactId>maven-javadoc-plugin</artifactId>
							<version>2.7</version>
						</plugin>
						<plugin>
							<groupId>org.apache.maven.plugins</groupId>
							<artifactId>maven-jxr-plugin</artifactId>
							<version>2.1</version>
							<configuration>
								<aggregate>true</aggregate>
							</configuration>
						</plugin>
						<plugin>
							<groupId>org.apache.maven.plugins</groupId>
							<artifactId>maven-surefire-report-plugin</artifactId>
							<version>2.6</version>
						</plugin>
						<plugin>
							<groupId>org.codehaus.mojo</groupId>
							<artifactId>cobertura-maven-plugin</artifactId>
							<version>2.4</version>
							<configuration>
								<formats>
									<format>xml</format>
									<format>html</format>
								</formats>
							</configuration>
						</plugin>
						<plugin>
							<groupId>org.apache.maven.plugins</groupId>
							<artifactId>maven-checkstyle-plugin</artifactId>
							<version>2.8</version>
						</plugin>
						<plugin>
							<artifactId>maven-release-plugin</artifactId>
							<version>2.0-beta-7</version>
							<configuration>
							</configuration>
						</plugin>
					</reportPlugins>
				</configuration>
			</plugin>
			<plugin>
				<!-- and generate a second JAR with dependencies -->
				<artifactId>maven-assembly-plugin</artifactId>
				<version>2.3</version>
				<configuration>
					<classifier>dist</classifier>
					<appendAssemblyId>true</appendAssemblyId>
					<descriptorRefs>
						<descriptor>jar-with-dependencies</descriptor>
					</descriptorRefs>

				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>single</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-jar-plugin</artifactId>
				<version>2.4</version>
				<configuration>
				</configuration>
			</plugin>
		</plugins>
		<resources>
			<resource>
				<directory>src/main/resources </directory>
			</resource>
		</resources>
	</build>

	<repositories>
		<repository>
			<id>maven-hadoop</id>
			<name>Hadoop Releases</name>
			<url>https://repository.cloudera.com/content/repositories/releases/</url>
		</repository>
		<repository>
			<id>cloudera-repos</id>
			<name>Cloudera Repos</name>
			<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
		</repository>
	</repositories>
</project>
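If your default JDK is newer than 1.8, pinning the compiler level keeps the build at Java 1.8 as required above; a typical maven-compiler-plugin entry (not shown in the pom above) looks like this:

<plugin>
	<groupId>org.apache.maven.plugins</groupId>
	<artifactId>maven-compiler-plugin</artifactId>
	<version>3.8.1</version>
	<configuration>
		<source>1.8</source>
		<target>1.8</target>
	</configuration>
</plugin>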

RobustSpoolDirectorySource.java

This class is a rework of Flume's spooling directory source, built by extending AbstractSource.

package org.sky.platform.retails.plugin.spooldir;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.serialization.LineDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.*;
import org.apache.flume.source.AbstractSource;

/**
 * Created by mk on 17-06-25.
 */
public class RobustSpoolDirectorySource extends AbstractSource implements Configurable, EventDrivenSource {
	private static final Logger logger = LoggerFactory.getLogger(RobustReliableSpoolingFileEventReader.class);
	private static final int POLL_DELAY_MS = 100;

	/* Config options */
	private String completedSuffix;
	private String spoolDirectory;
	private boolean fileHeader;
	private String fileHeaderKey;
	private int batchSize;
	private String ignorePattern;
	private String trackerDirPath;
	private String deserializerType;
	private Context deserializerContext;
	private String deletePolicy;
	private String inputCharset;
	private int fileModifiedInterval;
	private boolean intervalChange = false;
	// private String transactionFileCompletedSuffix = ".null";
	private SourceCounter sourceCounter;
	RobustReliableSpoolingFileEventReader reader;

	@Override
	public void start() {
		logger.info("RobustSpoolDirectorySource start()");

		ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
		File directory = new File(spoolDirectory);
		try {
			reader = new RobustReliableSpoolingFileEventReader.Builder().spoolDirectory(directory)
					.completedSuffix(completedSuffix).ignorePattern(ignorePattern).trackerDirPath(trackerDirPath)
					.annotateFileName(fileHeader).fileNameHeader(fileHeaderKey).deserializerType(deserializerType)
					.deserializerContext(deserializerContext).deletePolicy(deletePolicy).inputCharset(inputCharset)
					.build();
		} catch (IOException ioe) {
			throw new FlumeException("Error instantiating spooling event parser", ioe);
		}

		Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
		executor.scheduleWithFixedDelay(runner, 0, POLL_DELAY_MS, TimeUnit.MILLISECONDS);

		super.start();
		logger.debug("SpoolDirectorySource source started");
		sourceCounter.start();
	}

	@Override
	public void stop() {
		super.stop();
		sourceCounter.stop();
		logger.info("SpoolDir source {} stopped. Metrics: {}", getName(), sourceCounter);
	}

	@Override
	public void configure(Context context) {
		spoolDirectory = context.getString(SPOOL_DIRECTORY);
		Preconditions.checkState(spoolDirectory != null, "Configuration must specify a spooling directory");

		completedSuffix = context.getString(SPOOLED_FILE_SUFFIX, DEFAULT_SPOOLED_FILE_SUFFIX);
		deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY);
		fileHeader = context.getBoolean(FILENAME_HEADER, DEFAULT_FILE_HEADER);
		fileHeaderKey = context.getString(FILENAME_HEADER_KEY, DEFAULT_FILENAME_HEADER_KEY);
		batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
		inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
		ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
		trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);
		deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
		deserializerContext = new Context(context.getSubProperties(DESERIALIZER + "."));
		fileModifiedInterval = context.getInteger("fileModifiedIntervalMS", 600000);
		intervalChange = context.getBoolean("intervalChange", false);
		// transactionFileCompletedSuffix =
		// context.getString("transactionFileCompletedSuffix");
		// "Hack" to support backwards compatibility with previous generation of
		// spooling directory source, which did not support deserializers
		Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH);
		if (bufferMaxLineLength != null && deserializerType != null
				&& deserializerType.equalsIgnoreCase(DEFAULT_DESERIALIZER)) {
			deserializerContext.put(LineDeserializer.MAXLINE_KEY, bufferMaxLineLength.toString());
		}
		if (sourceCounter == null) {
			sourceCounter = new SourceCounter(getName());
		}
	}

	private class SpoolDirectoryRunnable implements Runnable {
		private RobustReliableSpoolingFileEventReader reader;
		private SourceCounter sourceCounter;

		public SpoolDirectoryRunnable(RobustReliableSpoolingFileEventReader reader, SourceCounter sourceCounter) {
			this.reader = reader;
			this.sourceCounter = sourceCounter;
		}

		@Override
		public void run() {
			try {
				while (true) {
					// List<Event> events = reader.readEvents(batchSize, fileModifiedInterval,
					// intervalChange);
					List<Event> events = reader.readEvents(batchSize);
					if (events.isEmpty()) {
						break;
					}
					sourceCounter.addToEventReceivedCount(events.size());
					sourceCounter.incrementAppendBatchReceivedCount();

					getChannelProcessor().processEventBatch(events);
					reader.commit();
					sourceCounter.addToEventAcceptedCount(events.size());
					sourceCounter.incrementAppendBatchAcceptedCount();
				}
			} catch (Throwable t) {
				logger.error("Uncaught exception in Runnable", t);
				if (t instanceof Error) {
					throw (Error) t;
				}
			}
		}
	}
}

RobustReliableSpoolingFileEventReader.java

The key modification is here:

In RobustReliableSpoolingFileEventReader.java, inside the private Optional<FileInfo> getNextFile(...) method,

we added a chunk of logic that does the following.

We ask the hardware vendors of the POS terminals (or bank front-end processors, ATMs, hospital X-ray machines, whatever the device is) to emit one extra file after they finish writing each data file: a companion marker whose name ends in ".end". Flume's spool mechanism ("a file appears, the file gets taken") then performs one extra check when it sees a new file: is there a file with the same name carrying the ".end" suffix alongside it?

Only when the ".end" file is present is the data file ingested; once its contents have been extracted, the file is renamed with the .COMPLETED suffix so it is never picked up again, because by the code's logic .COMPLETED files are not consumed.

Full source of RobustReliableSpoolingFileEventReader

package org.sky.platform.retails.plugin.spooldir;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.client.avro.ReliableEventReader;
import org.apache.flume.serialization.*;
import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.ConsumeOrder;
import org.apache.flume.tools.PlatformDetect;
import org.sky.platform.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.*;
import java.util.regex.Pattern;
import java.util.ArrayList;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RobustReliableSpoolingFileEventReader implements ReliableEventReader {
	private static final Logger logger = LoggerFactory.getLogger(RobustReliableSpoolingFileEventReader.class);

	static final String metaFileName = ".flumespool-main.meta";

	private final File spoolDirectory;
	private final String completedSuffix;
	private final String deserializerType;
	private final Context deserializerContext;
	private final Pattern ignorePattern;
	private final File metaFile;
	private final boolean annotateFileName;
	private final boolean annotateBaseName;
	private final String fileNameHeader;
	private final String baseNameHeader;
	private final String deletePolicy;
	private final Charset inputCharset;
	private final DecodeErrorPolicy decodeErrorPolicy;
	private final ConsumeOrder consumeOrder;

	private Optional<FileInfo> currentFile = Optional.absent();
	/** Always contains the last file from which lines have been read. **/
	private Optional<FileInfo> lastFileRead = Optional.absent();
	private boolean committed = true;

	/** Instance var to Cache directory listing **/
	private Iterator<File> candidateFileIter = null;
	private int listFilesCount = 0;

	/**
	 * Create a ReliableSpoolingFileEventReader to watch the given directory.
	 */
	private RobustReliableSpoolingFileEventReader(File spoolDirectory, String completedSuffix, String ignorePattern,
			String trackerDirPath, boolean annotateFileName, String fileNameHeader, boolean annotateBaseName,
			String baseNameHeader, String deserializerType, Context deserializerContext, String deletePolicy,
			String inputCharset, DecodeErrorPolicy decodeErrorPolicy, ConsumeOrder consumeOrder) throws IOException 

		// Sanity checks
		Preconditions.checkNotNull(spoolDirectory);
		Preconditions.checkNotNull(completedSuffix);
		Preconditions.checkNotNull(ignorePattern);
		Preconditions.checkNotNull(trackerDirPath);
		Preconditions.checkNotNull(deserializerType);
		Preconditions.checkNotNull(deserializerContext);
		Preconditions.checkNotNull(deletePolicy);
		Preconditions.checkNotNull(inputCharset);

		// validate delete policy
		if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())
				&& !deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) 
			throw new IllegalArgumentException(
					"Delete policies other than " + "NEVER and IMMEDIATE are not yet supported");
		

		if (logger.isDebugEnabled()) 
			logger.debug("Initializing  with directory=, metaDir=, " + "deserializer=",
					new Object[]  RobustReliableSpoolingFileEventReader.class.getSimpleName(), spoolDirectory,
							trackerDirPath, deserializerType );
		

		// Verify directory exists and is readable/writable
		Preconditions.checkState(spoolDirectory.exists(),
				"Directory does not exist: " + spoolDirectory.getAbsolutePath());
		Preconditions.checkState(spoolDirectory.isDirectory(),
				"Path is not a directory: " + spoolDirectory.getAbsolutePath());

		// Do a canary test to make sure we have access to spooling directory
		try 
			File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary", spoolDirectory);
			Files.write("testing flume file permissions\\n", canary, Charsets.UTF_8);
			List<String> lines = Files.readLines(canary, Charsets.UTF_8);
			Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s", canary);
			if (!canary.delete()) 
				throw new IOException("Unable to delete canary file " + canary);
			
			logger.debug("Successfully created and deleted canary file: ", canary);
		 catch (IOException e) 
			throw new FlumeException(
					"Unable to read and modify files" + " in the spooling directory: " + spoolDirectory, e);
		

		this.spoolDirectory = spoolDirectory;
		this.completedSuffix = completedSuffix;
		this.deserializerType = deserializerType;
		this.deserializerContext = deserializerContext;
		this.annotateFileName = annotateFileName;
		this.fileNameHeader = fileNameHeader;
		this.annotateBaseName = annotateBaseName;
		this.baseNameHeader = baseNameHeader;
		this.ignorePattern = Pattern.compile(ignorePattern);
		this.deletePolicy = deletePolicy;
		this.inputCharset = Charset.forName(inputCharset);
		this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy);
		this.consumeOrder = Preconditions.checkNotNull(consumeOrder);

		File trackerDirectory = new File(trackerDirPath);

		// if relative path, treat as relative to spool directory
		if (!trackerDirectory.isAbsolute()) 
			trackerDirectory = new File(spoolDirectory, trackerDirPath);
		

		// ensure that meta directory exists
		if (!trackerDirectory.exists()) 
			if (!trackerDirectory.mkdir()) 
				throw new IOException("Unable to mkdir nonexistent meta directory " + trackerDirectory);
			
		

		// ensure that the meta directory is a directory
		if (!trackerDirectory.isDirectory()) 
			throw new IOException("Specified meta directory is not a directory" + trackerDirectory);
		

		this.metaFile = new File(trackerDirectory, metaFileName);
		if (metaFile.exists() && metaFile.length() == 0) 
			deleteMetaFile();
		
	

	@VisibleForTesting
	int getListFilesCount() 
		return listFilesCount;
	

	/**
	 * Return the filename which generated the data from the last successful
	 * @link #readEvents(int) call. Returns null if called before any file
	 * contents are read.
	 */
	public String getLastFileRead() 
		if (!lastFileRead.isPresent()) 
			return null;
		
		return lastFileRead.get().getFile().getAbsolutePath();
	

	// public interface
	public Event readEvent() throws IOException 
		List<Event> events = readEvents(1);
		if (!events.isEmpty()) 
			return events.get(0);
		 else 
			return null;
		
	

	public List<Event> readEvents(int numEvents) throws IOException 
		if (!committed) 
			if (!currentFile.isPresent()) 
				throw new IllegalStateException("File should not roll when " + "commit is outstanding.");
			
			logger.info("Last read was never committed - resetting mark position.");
			currentFile.get().getDeserializer().reset();
		 else 
			// Check if new files have arrived since last call
			if (!currentFile.isPresent()) 
				currentFile = getNextFile();
			
			// Return empty list if no new files
			if (!currentFile.isPresent()) 
				return Collections.emptyList();
			
		

		EventDeserializer des = currentFile.get().getDeserializer();
		List<Event> events = des.readEvents(numEvents);

		/*
		 * It's possible that the last read took us just up to a file boundary. If so,
		 * try to roll to the next file, if there is one. Loop until events is not empty
		 * or there is no next file in case of 0 byte files
		 */
		while (events.isEmpty()) 
			logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one.");
			retireCurrentFile();
			currentFile = getNextFile();
			if (!currentFile.isPresent()) 
				return Collections.emptyList();
			
			events = currentFile.get().getDeserializer().readEvents(numEvents);
		

		if (annotateFileName) 
			String filename = currentFile.get().getFile().getAbsolutePath();
			for (Event event : events) 
				event.getHeaders().put(fileNameHeader, filename);
			
		

		if (annotateBaseName) 
			String basename = currentFile.get().getFile().getName();
			for (Event event : events) 
				event.getHeaders().put(baseNameHeader, basename);
			
		

		committed = false;
		lastFileRead = currentFile;
		return events;
	

	@Override
	public void close() throws IOException 
		if (currentFile.isPresent()) 
			currentFile.get().getDeserializer().close();
			currentFile = Optional.absent();
		
	

	/** Commit the last lines which were read. */
	@Override
	public void commit() throws IOException 
		if (!committed && currentFile.isPresent()) 
			currentFile.get().getDeserializer().mark();
			committed = true;
		
	

	/**
	 * Closes currentFile and attempt to rename it.
	 *
	 * If these operations fail in a way that may cause duplicate log entries, an
	 * error is logged but no exceptions are thrown. If these operations fail in a
	 * way that indicates potential misuse of the spooling directory, a
	 * FlumeException will be thrown.
	 * 
	 * @throws FlumeException if files do not conform to spooling assumptions
	 */
	private void retireCurrentFile() throws IOException 
		Preconditions.checkState(currentFile.isPresent());

		File fileToRoll = new File(currentFile.get().getFile().getAbsolutePath());

		currentFile.get().getDeserializer().close();

		// Verify that spooling assumptions hold
		if (fileToRoll.lastModified() != currentFile.get().getLastModified()) 
			String message = "File has been modified since being read: " + fileToRoll;
			throw new IllegalStateException(message);
		
		if (fileToRoll.length() != currentFile.get().getLength()) 
			String message = "File has changed size since being read: " + fileToRoll;
			throw new IllegalStateException(message);
		

		if (deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())) 
			rollCurrentFile(fileToRoll);
		 else if (deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) 
			deleteCurrentFile(fileToRoll);
		 else 
			// TODO: implement delay in the future
			throw new IllegalArgumentException("Unsupported delete policy: " + deletePolicy);
		
	

	/**
	 * Rename the given spooled file
	 * 
	 * @param fileToRoll
	 * @throws IOException
	 */
	private void rollCurrentFile(File fileToRoll) throws IOException 

		File dest = new File(fileToRoll.getPath() + completedSuffix);
		logger.info("Preparing to move file  to ", fileToRoll, dest);

		// Before renaming, check whether destination file name exists
		if (dest.exists() && PlatformDetect.isWindows()) 
			/*
			 * If we are here, it means the completed file already exists. In almost every
			 * case this means the user is violating an assumption of Flume (that log files
			 * are placed in the spooling directory with unique names). However, there is a
			 * corner case on Windows systems where the file was already rolled but the
			 * rename was not atomic. If that seems likely, we let it pass with only a
			 * warning.
			 */
			if (Files.equal(currentFile.get().getFile(), dest)) 
				logger.warn("Completed file " + dest + " already exists, but files match, so continuing.");
				boolean deleted = fileToRoll.delete();
				if (!deleted) 
					logger.error("Unable to delete file " + fileToRoll.getAbsolutePath()
							+ ". It will likely be ingested another time.");
				
			 else 
				String message = "File name has been re-used with different"
						+ " files. Spooling assumptions violated for " + dest;
				throw new IllegalStateException(message);
			

			// Dest file exists and not on windows
		 else if (dest.exists()) 
			String message = "File name has been re-used with different" + " files. Spooling assumptions violated for "
					+ dest;
			throw new IllegalStateException(message);

			// Destination file does not already exist. We are good to go!
		 else 
			boolean renamed = fileToRoll.renameTo(dest);
			if (renamed) 
				logger.debug("Successfully rolled file  to ", fileToRoll, dest);

				// now we no longer need the meta file
				deleteMetaFile();
			 else 
				/*
				 * If we are here then the file cannot be renamed for a reason other than that
				 * the destination file exists (actually, that remains possible w/ small
				 * probability due to TOC-TOU conditions).
				 */
				String message = "Unable to move " + fileToRoll + " to " + dest
						+ ". This will likely cause duplicate events. Please verify that "
						+ "flume has sufficient permissions to perform these operations.";
				throw new FlumeException(message);
			
		
	

	/**
	 * Delete the given spooled file
	 * 
	 * @param fileToDelete
	 * @throws IOException
	 */
	private void deleteCurrentFile(File fileToDelete) throws IOException 
		logger.info("Preparing to delete file ", fileToDelete);
		if (!fileToDelete.exists()) 
			logger.warn("Unable to delete nonexistent file: ", fileToDelete);
			return;
		
		if (!fileToDelete.delete()) 
			throw new IOException("Unable to delete spool file: " + fileToDelete);
		
		// now we no longer need the meta file
		deleteMetaFile();
	

	/**
	 * Returns the next file to be consumed from the chosen directory. If the
	 * directory is empty or the chosen file is not readable, this will return an
	 * absent option. If the @link #consumeOrder variable is
	 * @link ConsumeOrder#OLDEST then returns the oldest file. If the
	 * @link #consumeOrder variable is @link ConsumeOrder#YOUNGEST then returns
	 * the youngest file. If two or more files are equally old/young, then the file
	 * name with lower lexicographical value is returned. If the
	 * @link #consumeOrder variable is @link ConsumeOrder#RANDOM then cache the
	 * directory listing to amortize retreival cost, and return any arbitary file
	 * from the directory.
	 */
	private Optional<FileInfo> getNextFile() {
		List<File> candidateFiles = Collections.emptyList();

		if (consumeOrder != ConsumeOrder.RANDOM || candidateFileIter == null || !candidateFileIter.hasNext()) {
			/* Filter to exclude finished or hidden files */
			FileFilter filter = new FileFilter() {
				public boolean accept(File candidate) {
					String fileName = candidate.getName();
					if ((candidate.isDirectory()) || (fileName.endsWith(completedSuffix)) || (fileName.startsWith("."))
							|| ignorePattern.matcher(fileName).matches() || fileName.endsWith(".end")) {
						return false;
					}
					// mk: added lines - if the data file is not yet accompanied by its ".end" marker, skip it
					logger.info("======>start to check candidate.getName " + fileName + " with a .end mark!");
					File checkF = new File(FileUtil.getFileNameWithNewSuf(candidate.getPath(), "end"));
					logger.info("======>check is " + checkF.getName() + " existed?");
					if (!checkF.exists()) {
						logger.info("======>" + checkF.getName() + " not existed " + candidate.getName()
								+ " maybe is still transferring, it will not be committed");
						return false;
					}
					return true;
				}
			};
			candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter));
			listFilesCount++;
			candidateFileIter = candidateFiles.iterator();
		}

		if (!candidateFileIter.hasNext()) { // No matching file in spooling directory.
			return Optional.absent();
		}

		File selectedFile = candidateFileIter.next();
		if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random.
			return openFile(selectedFile);
		} else if (consumeOrder == ConsumeOrder.YOUNGEST) {
			for (File candidateFile : candidateFiles) {
				long compare = selectedFile.lastModified() - candidateFile.lastModified();
				if (compare == 0) { // ts is same pick smallest lexicographically.
					selectedFile = smallerLexicographical(selectedFile, candidateFile);
				} else if (compare < 0) { // candidate is younger (cand-ts > selec-ts)
					selectedFile = candidateFile;
				}
			}
		} else { // default order is OLDEST
			for (File candidateFile : candidateFiles) {
				long compare = selectedFile.lastModified() - candidateFile.lastModified();
				if (compare == 0) { // ts is same pick smallest lexicographically.
					selectedFile = smallerLexicographical(selectedFile, candidateFile);
				} else if (compare > 0) { // candidate is older (cand-ts < selec-ts).
					selectedFile = candidateFile;
				}
			}
		}

		return openFile(selectedFile);
	}

	private File smallerLexicographical(File f1, File f2) 
		if (f1.getName().compareTo(f2.getName()) < 0) 
			return f1;
		
		return f2;
	

	/**
	 * Opens a file for consuming
	 * 
	 * @param file
	 * @return @link #FileInfo for the file to consume or absent option if the
	 *         file does not exists or readable.
	 */
	private Optional<FileInfo> openFile(File file) 
		try 
			// roll the meta file, if needed
			String nextPath = file.getPath();
			PositionTracker tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
			if (!tracker.getTarget().equals(nextPath)) 
				tracker.close();
				deleteMetaFile();
				tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
			

			// sanity check
			Preconditions.checkState(tracker.getTarget().equals(nextPath),
					"Tracker target %s does not equal expected filename %s", tracker.getTarget(), nextPath);

			ResettableInputStream in = new ResettableFileInputStream(file, tracker,
					ResettableFileInputStream.DEFAULT_BUF_SIZE, inputCharset, decodeErrorPolicy);
			EventDeserializer deserializer = EventDeserializerFactory.getInstance(deserializerType, deserializerContext,
					in);

			return Optional.of(new FileInfo(file, deserializer));
		 catch (FileNotFoundException e) 
			// File could have been deleted in the interim
			logger.warn("Could not find file: " + file, e);
			return Optional.absent();
		 catch (IOException e) 
			logger.error("Exception opening file: " + file, e);
			return Optional.absent();
		
	

	private void deleteMetaFile() throws IOException 
		if (metaFile.exists() && !metaFile.delete()) 
			throw new IOException("Unable to delete old meta file " + metaFile);
		
	

	/** An immutable class with information about a file being processed. */
	private static class FileInfo 
		private final File file;
		private final long length;
		private final long lastModified;
		private final EventDeserializer deserializer;

		public FileInfo(File file, EventDeserializer deserializer) 
			this.file = file;
			this.length = file.length();
			this.lastModified = file.lastModified();
			this.deserializer = deserializer;
		

		public long getLength() 
			return length;
		

		public long getLastModified() 
			return lastModified;
		

		public EventDeserializer getDeserializer() 
			return deserializer;
		

		public File getFile() 
			return file;
		
	

	@InterfaceAudience.Private
	@InterfaceStability.Unstable
	static enum DeletePolicy 
		NEVER, IMMEDIATE, DELAY
	

	/**
	 * Special builder class for ReliableSpoolingFileEventReader
	 */
	public static class Builder 
		private File spoolDirectory;
		private String completedSuffix = SpoolDirectorySourceConfigurationConstants.SPOOLED_FILE_SUFFIX;
		private String ignorePattern = SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT;
		private String trackerDirPath = SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
		private Boolean annotateFileName = SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
		private String fileNameHeader = SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
		private Boolean annotateBaseName = SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER;
		private String baseNameHeader = SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER_KEY;
		private String deserializerType = SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER;
		private Context deserializerContext = new Context();
		private String deletePolicy = SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
		private String inputCharset = SpoolDirectorySourceConfigurationConstants.DEFAULT_INPUT_CHARSET;
		private DecodeErrorPolicy decodeErrorPolicy = DecodeErrorPolicy.valueOf(
				SpoolDirectorySourceConfigurationConstants.DEFAULT_DECODE_ERROR_POLICY.toUpperCase(Locale.ENGLISH));
		private ConsumeOrder consumeOrder = SpoolDirectorySourceConfigurationConstants.DEFAULT_CONSUME_ORDER;

		public Builder spoolDirectory(File directory) 
			this.spoolDirectory = directory;
			return this;
		

		public Builder completedSuffix(String completedSuffix) 
			this.completedSuffix = completedSuffix;
			return this;
		

		public Builder ignorePattern(String ignorePattern) 
			this.ignorePattern = ignorePattern;
			return this;
		

		public Builder trackerDirPath(String trackerDirPath) 
			this.trackerDirPath = trackerDirPath;
			return this;
		

		public Builder annotateFileName(Boolean annotateFileName) 
			this.annotateFileName = annotateFileName;
			return this;
		

		public Builder fileNameHeader(String fileNameHeader) 
			this.fileNameHeader = fileNameHeader;
			return this;
		

		public Builder annotateBaseName(Boolean annotateBaseName) 
			this.annotateBaseName = annotateBaseName;
			return this;
		

		public Builder baseNameHeader(String baseNameHeader) 
			this.baseNameHeader = baseNameHeader;
			return this;
		

		public Builder deserializerType(String deserializerType) 
			this.deserializerType = deserializerType;
			return this;
		

		public Builder deserializerContext(Context deserializerContext) 
			this.deserializerContext = deserializerContext;
			return this;
		

		public Builder deletePolicy(String deletePolicy) 
			this.deletePolicy = deletePolicy;
			return this;
		

		public Builder inputCharset(String inputCharset) 
			this.inputCharset = inputCharset;
			return this;
		

		public Builder decodeErrorPolicy(DecodeErrorPolicy decodeErrorPolicy) 
			this.decodeErrorPolicy = decodeErrorPolicy;
			return this;
		

		public Builder consumeOrder(ConsumeOrder consumeOrder) 
			this.consumeOrder = consumeOrder;
			return this;
		

		public RobustReliableSpoolingFileEventReader build() throws IOException {
			return new RobustReliableSpoolingFileEventReader(spoolDirectory, completedSuffix, ignorePattern,
					trackerDirPath, annotateFileName, fileNameHeader, annotateBaseName, baseNameHeader,
					deserializerType, deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy, consumeOrder);
		}
	}
}

FileUtil.java is attached below.

package org.sky.platform.util;
 
public class FileUtil {

    public static String getFileName(String fileCompletedFileName) {
        String fileName = "";
        int end = fileCompletedFileName.lastIndexOf(".");
        fileName = fileCompletedFileName.substring(0, end);
        return fileName;
    }

    public static String getFileNameWithNewSuf(String oldFileName, String newSurffix) {
        StringBuffer newFileName = new StringBuffer();
        String fileName = getFileName(oldFileName);
        newFileName.append(fileName).append(".").append(newSurffix);
        return newFileName.toString();
    }

    public static void main(String[] args) {
        String testFileName = "aaa.doc.log";
        System.out.println(getFileName(testFileName));
        System.out.println(getFileNameWithNewSuf(testFileName, "end"));
    }
}

Build and package the project

mvn clean install -Dmaven.test.skip=true

Then, under the flume-1.9.0 root directory, create the following directory hierarchy:

flume1.9.0/plugins.d/RobustSpoolDirectorySource/lib (the middle directory is the plugin name, i.e. the name of the main class that extends AbstractSource and implements Configurable and EventDrivenSource)

Then drop the Maven-built jar into that lib directory. This is the standard way of installing a Flume plugin.
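A minimal sketch of the installation, run from the flume-1.9.0 root; the jar file name is an assumption based on the pom above (artifactId sky-pos-agent, version 0.0.1, jar-with-dependencies assembly) and may differ slightly on your build:

mkdir -p plugins.d/RobustSpoolDirectorySource/lib
cp /path/to/sky-pos-agent/target/sky-pos-agent-0.0.1-jar-with-dependencies.jar plugins.d/RobustSpoolDirectorySource/lib/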

Next we write the configuration file used to start Flume.

Flume startup configuration file

flume1.9.0/conf/store01-conf.properties

store01.sources = sources1
store01.sinks = sinks1
store01.channels = channels1
 
# store01-Source
#store01.sources.sources1.type = spoolDir
store0
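As an illustrative sketch only (the spool directory path is an assumption; the source type is the fully qualified name of the custom class built above, and the sink is Flume 1.9's stock Kafka sink), a complete spooldir-to-Kafka agent definition looks roughly like this:

store01.sources = sources1
store01.sinks = sinks1
store01.channels = channels1

# custom spooling-directory source (the plugin installed above)
store01.sources.sources1.type = org.sky.platform.retails.plugin.spooldir.RobustSpoolDirectorySource
store01.sources.sources1.spoolDir = /opt/pos/outbound
store01.sources.sources1.channels = channels1

# in-memory channel between source and sink
store01.channels.channels1.type = memory
store01.channels.channels1.capacity = 10000
store01.channels.channels1.transactionCapacity = 1000

# Kafka sink pointing at all three brokers
store01.sinks.sinks1.type = org.apache.flume.sink.kafka.KafkaSink
store01.sinks.sinks1.kafka.bootstrap.servers = 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
store01.sinks.sinks1.kafka.topic = pos-transaction
store01.sinks.sinks1.channel = channels1

The agent would then be started with something like:

./bin/flume-ng agent -n store01 -c conf -f conf/store01-conf.properties -Dflume.root.logger=INFO,console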
