Flume: interceptors
Posted by Shall潇
Open IDEA and create a Maven project.
Add the dependency to pom.xml:
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
</dependency>
Write the interceptor:
package flumestu;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Author shall潇
 * @Date 2021/5/25
 * @Description Classifies incoming events.
 * An event consists of headers and a body.
 * If the body starts with "hello", the event header is tagged type=hello;
 * if the body starts with "hi", it is tagged type=hi;
 * otherwise it is tagged type=other.
 */
public class InterceptorDemo implements Interceptor {
    // Reused buffer for the results of a batch
    private ArrayList<Event> addHeaderEvents = null;

    @Override
    public void initialize() {
        addHeaderEvents = new ArrayList<>();
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        String bodyStr = new String(event.getBody(), StandardCharsets.UTF_8);
        if (bodyStr.startsWith("hello")) {
            headers.put("type", "hello");
        } else if (bodyStr.startsWith("hi")) {
            headers.put("type", "hi");
        } else {
            headers.put("type", "other");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        addHeaderEvents.clear();
        for (Event event : list) {
            Event opEvent = intercept(event);
            if (opEvent != null) {
                addHeaderEvents.add(opEvent);
            }
        }
        return addHeaderEvents;
    }

    @Override
    public void close() {
        if (addHeaderEvents != null) {
            addHeaderEvents.clear();
            addHeaderEvents = null;
        }
    }

    // Flume creates the interceptor through this nested Builder,
    // which the conf file references as flumestu.InterceptorDemo$Builder
    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new InterceptorDemo();
        }

        @Override
        public void configure(Context context) {
            // This interceptor has no configurable parameters
        }
    }
}
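The interceptor only runs inside an agent, so you may want to check the classification rule on its own first. The sketch below uses only the JDK. StubEvent is a hypothetical stand-in for Flume's Event (a header map plus a byte[] body), and classify copies the prefix rule from InterceptorDemo. Neither is part of Flume's API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InterceptorLogicSketch {

    // Hypothetical stand-in for org.apache.flume.Event: headers plus a raw body.
    static class StubEvent {
        final Map<String, String> headers = new HashMap<>();
        final byte[] body;

        StubEvent(String text) {
            this.body = text.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Same prefix rule as InterceptorDemo.intercept(Event): set the "type" header.
    static StubEvent classify(StubEvent event) {
        String bodyStr = new String(event.body, StandardCharsets.UTF_8);
        String type;
        if (bodyStr.startsWith("hello")) {
            type = "hello";
        } else if (bodyStr.startsWith("hi")) {
            type = "hi";
        } else {
            type = "other";
        }
        event.headers.put("type", type);
        return event;
    }

    // Batch version, mirroring intercept(List<Event>).
    static List<StubEvent> classifyAll(List<StubEvent> batch) {
        List<StubEvent> out = new ArrayList<>(batch.size());
        for (StubEvent e : batch) {
            out.add(classify(e));
        }
        return out;
    }

    public static void main(String[] args) {
        List<StubEvent> batch = new ArrayList<>();
        batch.add(new StubEvent("hello flume"));
        batch.add(new StubEvent("hi kafka"));
        batch.add(new StubEvent("hey there"));
        for (StubEvent e : classifyAll(batch)) {
            System.out.println(new String(e.body, StandardCharsets.UTF_8) + " -> " + e.headers.get("type"));
        }
    }
}
```

Running main prints "hello flume -> hello", "hi kafka -> hi" and "hey there -> other". The rule is a plain prefix match, so a line such as "history" is also tagged hi.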
Then package the project by running the Maven clean and package goals.
Find the newly built jar in the target folder.
Copy the jar into the flume/lib directory.
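The conf file refers to the nested builder class as flumestu.InterceptorDemo$Builder. Before copying the jar, you can check that this class file is actually inside it. The sketch below is a minimal helper under these assumptions: JarCheck is a hypothetical class name, and the jar path is whatever your build produced under target/. It uses only java.util.jar.JarFile.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarCheck {

    // Maps a binary class name such as "flumestu.InterceptorDemo$Builder"
    // to its jar entry "flumestu/InterceptorDemo$Builder.class" and looks it up.
    static boolean containsClass(String jarPath, String binaryName) throws IOException {
        String entryName = binaryName.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Pass the jar built under target/ as the first argument.
        String jarPath = args[0];
        String builder = "flumestu.InterceptorDemo$Builder";
        System.out.println(builder + " present: " + containsClass(jarPath, builder));
    }
}
```

The $ in the name is the JVM's separator for nested classes. That is why the conf uses InterceptorDemo$Builder rather than InterceptorDemo.Builder.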
Write the conf file.
(Messages starting with hello go to HDFS;
messages starting with hi go to Kafka;
all other messages are shown on the console.)
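This routing is done by a multiplexing channel selector. It reads one header (selector.header) and looks up the header's value in the selector.mapping.* entries to choose the channel(s). The sketch below is a simplified, JDK-only model of that lookup. MultiplexingSketch is a hypothetical class, not Flume's selector. It treats selector.default as an optional fallback and returns no channel if neither a mapping nor a default applies.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class MultiplexingSketch {

    static final String PREFIX = "interceptordemo.sources.interceptorDemoSource.selector.";

    // Collects selector.mapping.<value> = <channel list> entries.
    static Map<String, List<String>> mappings(Properties props) {
        Map<String, List<String>> result = new HashMap<>();
        String mappingPrefix = PREFIX + "mapping.";
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(mappingPrefix)) {
                String value = key.substring(mappingPrefix.length());
                // A mapping may list several channels separated by spaces.
                result.put(value, Arrays.asList(props.getProperty(key).trim().split("\\s+")));
            }
        }
        return result;
    }

    // Chooses the channels for one event, given its headers.
    static List<String> route(Properties props, Map<String, String> headers) {
        String value = headers.get(props.getProperty(PREFIX + "header"));
        List<String> channels = mappings(props).get(value);
        if (channels != null) {
            return channels;
        }
        String fallback = props.getProperty(PREFIX + "default");
        return fallback == null ? Collections.<String>emptyList() : Arrays.asList(fallback.trim().split("\\s+"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(PREFIX + "header", "type");
        props.setProperty(PREFIX + "mapping.hello", "hellochannel");
        props.setProperty(PREFIX + "mapping.hi", "hichannel");
        props.setProperty(PREFIX + "mapping.other", "otherchannel");
        Map<String, String> headers = new HashMap<>();
        headers.put("type", "hi");
        System.out.println(route(props, headers)); // [hichannel]
    }
}
```

The interceptor always sets type to hello, hi or other, and all three are mapped. For that reason this conf does not need a selector.default entry.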
# Agent "interceptordemo": one netcat source, three channels, three sinks
interceptordemo.sources=interceptorDemoSource
interceptordemo.channels=hellochannel hichannel otherchannel
interceptordemo.sinks=hellosink hisink othersink
# Source: netcat, listening on localhost:44444
interceptordemo.sources.interceptorDemoSource.type=netcat
interceptordemo.sources.interceptorDemoSource.bind=localhost
interceptordemo.sources.interceptorDemoSource.port=44444
# Attach the custom interceptor (nested Builder class, hence the $)
interceptordemo.sources.interceptorDemoSource.interceptors=interceptor1
interceptordemo.sources.interceptorDemoSource.interceptors.interceptor1.type=flumestu.InterceptorDemo$Builder
# Multiplexing selector: route each event by the "type" header set by the interceptor
interceptordemo.sources.interceptorDemoSource.selector.type=multiplexing
interceptordemo.sources.interceptorDemoSource.selector.mapping.hello=hellochannel
interceptordemo.sources.interceptorDemoSource.selector.mapping.hi=hichannel
interceptordemo.sources.interceptorDemoSource.selector.mapping.other=otherchannel
interceptordemo.sources.interceptorDemoSource.selector.header=type
interceptordemo.channels.hellochannel.type=memory
interceptordemo.channels.hellochannel.capacity=1000
interceptordemo.channels.hellochannel.transactionCapacity=100
interceptordemo.channels.hichannel.type=memory
interceptordemo.channels.hichannel.capacity=1000
interceptordemo.channels.hichannel.transactionCapacity=100
interceptordemo.channels.otherchannel.type=memory
interceptordemo.channels.otherchannel.capacity=1000
interceptordemo.channels.otherchannel.transactionCapacity=100
# hellosink: HDFS, one directory per day (agent local time);
# roll a new file every 3 seconds or at 6,400,000 bytes (count-based rolling disabled)
interceptordemo.sinks.hellosink.type=hdfs
interceptordemo.sinks.hellosink.hdfs.fileType=DataStream
interceptordemo.sinks.hellosink.hdfs.filePrefix=hello
interceptordemo.sinks.hellosink.hdfs.fileSuffix=.csv
interceptordemo.sinks.hellosink.hdfs.path=hdfs://192.168.159.100:9000/flumefile/hello/%Y-%m-%d
interceptordemo.sinks.hellosink.hdfs.useLocalTimeStamp=true
# A sink batch must not exceed its channel's transactionCapacity (100)
interceptordemo.sinks.hellosink.hdfs.batchSize=100
interceptordemo.sinks.hellosink.hdfs.rollCount=0
interceptordemo.sinks.hellosink.hdfs.rollSize=6400000
interceptordemo.sinks.hellosink.hdfs.rollInterval=3
# hisink: Kafka sink (Flume 1.6 property names), topic "hi"
interceptordemo.sinks.hisink.type=org.apache.flume.sink.kafka.KafkaSink
interceptordemo.sinks.hisink.batchSize=100
interceptordemo.sinks.hisink.brokerList=192.168.159.100:9092
interceptordemo.sinks.hisink.topic=hi
# othersink: print events to the console log
interceptordemo.sinks.othersink.type=logger
interceptordemo.sources.interceptorDemoSource.channels=hellochannel hichannel otherchannel
interceptordemo.sinks.hisink.channel=hichannel
interceptordemo.sinks.hellosink.channel=hellochannel
interceptordemo.sinks.othersink.channel=otherchannel
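In a memory channel, one transaction can take at most transactionCapacity events. A sink whose batch size is larger than that fails with a ChannelException once more events than that are queued. The sketch below loads agent properties with java.util.Properties and flags such sinks. BatchCapacityCheck is a hypothetical helper, not a Flume tool. It assumes the HDFS sink uses hdfs.batchSize and the Flume 1.6 Kafka sink uses batchSize, as in this conf.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class BatchCapacityCheck {

    // Returns one warning per sink whose batch size exceeds its channel's transactionCapacity.
    static List<String> check(Properties props, String agent) {
        List<String> warnings = new ArrayList<>();
        String sinks = props.getProperty(agent + ".sinks", "").trim();
        if (sinks.isEmpty()) {
            return warnings;
        }
        for (String sink : sinks.split("\\s+")) {
            String prefix = agent + ".sinks." + sink + ".";
            String channel = props.getProperty(prefix + "channel");
            // HDFS sink: hdfs.batchSize; Flume 1.6 Kafka sink: batchSize.
            String batch = props.getProperty(prefix + "hdfs.batchSize", props.getProperty(prefix + "batchSize"));
            String capacity = channel == null ? null
                    : props.getProperty(agent + ".channels." + channel + ".transactionCapacity");
            if (batch != null && capacity != null
                    && Integer.parseInt(batch.trim()) > Integer.parseInt(capacity.trim())) {
                warnings.add(sink + ": batch " + batch.trim() + " > transactionCapacity "
                        + capacity.trim() + " of " + channel);
            }
        }
        return warnings;
    }

    public static void main(String[] args) throws IOException {
        String conf = String.join("\n",
                "interceptordemo.sinks=hellosink",
                "interceptordemo.sinks.hellosink.channel=hellochannel",
                "interceptordemo.sinks.hellosink.hdfs.batchSize=640",
                "interceptordemo.channels.hellochannel.transactionCapacity=100");
        Properties props = new Properties();
        props.load(new StringReader(conf));
        // Prints: hellosink: batch 640 > transactionCapacity 100 of hellochannel
        check(props, "interceptordemo").forEach(System.out::println);
    }
}
```

To check a real agent, load the whole .conf file with props.load(new FileReader(path)) instead of the inline string.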
Testing
1. Create the hi topic and start a console consumer on it:
kafka-topics.sh --zookeeper hadoop100:2181 --create --topic hi --partitions 1 --replication-factor 1
kafka-console-consumer.sh --topic hi --bootstrap-server 192.168.159.100:9092 --from-beginning
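2. Create the checkpoint and data folders (only needed if you use file channels; the memory channels in this conf do not use them).
3. Start the agent: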
./bin/flume-ng agent --name interceptordemo --conf ./conf/ --conf-file ./conf/testjob/netcat-flume-interceptor.conf -Dflume.root.logger=INFO,console
4. Connect to the netcat source and type test lines (each line becomes one event):
telnet localhost 44444
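If telnet is not available, a few lines of Java can do the same job. The sketch below opens a socket to the netcat source, sends each line terminated by \n, and reads one reply per line. By default the netcat source acknowledges each event with "OK" unless ack-every-event is turned off. NetcatSender is a hypothetical class, and host and port are taken from this conf.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NetcatSender {

    // Sends each line as one event and collects the source's reply to it.
    static List<String> send(String host, int port, List<String> lines) throws IOException {
        List<String> replies = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.write(line + "\n"); // the netcat source splits events on newlines
                out.flush();
                replies.add(in.readLine());
            }
        }
        return replies;
    }

    public static void main(String[] args) throws IOException {
        // One line for each routing case: HDFS, Kafka, console.
        List<String> replies = send("localhost", 44444,
                Arrays.asList("hello hdfs", "hi kafka", "something else"));
        System.out.println(replies);
    }
}
```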
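Results:
1. The corresponding files appear under the HDFS path.
2. The Kafka consumer receives the lines starting with hi.
3. The console log prints all other lines.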
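The HDFS files land in a per-day directory because hdfs.path ends in %Y-%m-%d. With hdfs.useLocalTimeStamp=true, those escapes are filled from the agent's local clock. A file is closed and a new one started when rollInterval=3 (seconds) or rollSize=6400000 (bytes) is reached; rollCount=0 disables count-based rolling. The sketch below is a simplified model of both rules. HdfsSinkSketch is hypothetical, only the three escapes used here are handled, and the real sink checks the time trigger on a timer rather than per call.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class HdfsSinkSketch {

    // Values copied from the hellosink config.
    static final String PATH_TEMPLATE = "hdfs://192.168.159.100:9000/flumefile/hello/%Y-%m-%d";
    static final long ROLL_SIZE_BYTES = 6_400_000L; // hdfs.rollSize
    static final long ROLL_INTERVAL_SECONDS = 3;    // hdfs.rollInterval
    static final long ROLL_COUNT = 0;               // hdfs.rollCount (0 = disabled)

    // Expands only %Y, %m and %d; Flume supports many more escape sequences.
    static String resolvePath(String template, LocalDate date) {
        return template
                .replace("%Y", date.format(DateTimeFormatter.ofPattern("yyyy")))
                .replace("%m", date.format(DateTimeFormatter.ofPattern("MM")))
                .replace("%d", date.format(DateTimeFormatter.ofPattern("dd")));
    }

    // Rolls when any enabled trigger fires; a setting of 0 disables its trigger.
    static boolean shouldRoll(long bytesWritten, long eventsWritten, long secondsOpen) {
        boolean bySize = ROLL_SIZE_BYTES > 0 && bytesWritten >= ROLL_SIZE_BYTES;
        boolean byCount = ROLL_COUNT > 0 && eventsWritten >= ROLL_COUNT;
        boolean byTime = ROLL_INTERVAL_SECONDS > 0 && secondsOpen >= ROLL_INTERVAL_SECONDS;
        return bySize || byCount || byTime;
    }

    public static void main(String[] args) {
        // hdfs://192.168.159.100:9000/flumefile/hello/2021-05-25
        System.out.println(resolvePath(PATH_TEMPLATE, LocalDate.of(2021, 5, 25)));
        System.out.println(shouldRoll(1_000, 10, 1)); // false
        System.out.println(shouldRoll(1_000, 10, 3)); // true
    }
}
```

Because of the 3-second interval, each short burst of typed hello lines tends to produce its own small file.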