Flume: Interceptors

Posted by Shall潇

Open IDEA and create a new Maven project.

Add the Flume dependency:

<dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.6.0</version>
</dependency>

Write the interceptor

package flumestu;


import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @Author shall潇
 * @Date 2021/5/25
 * @Description Classifies each received event.
 * An event consists of a header map and a body.
 * If the body starts with "hello", the event header is tagged "hello";
 * if the body starts with "hi", it is tagged "hi"; otherwise "other".
 */
public class InterceptorDemo implements Interceptor {

    // Buffer reused across batch intercept() calls
    private ArrayList<Event> addHeaderEvents = null;

    @Override
    public void initialize() {
        addHeaderEvents = new ArrayList<>();
    }

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        byte[] body = event.getBody();
        String bodyStr = new String(body);
        if(bodyStr.startsWith("hello")){
            headers.put("type","hello");
        } else if(bodyStr.startsWith("hi")){
            headers.put("type","hi");
        } else {
            headers.put("type","other");
        }
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        addHeaderEvents.clear();
        for (Event event : list) {
            Event opEvent = intercept(event);
            addHeaderEvents.add(opEvent);
        }
        return addHeaderEvents;
    }

    @Override
    public void close() {
        addHeaderEvents.clear();
        addHeaderEvents = null;
    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new InterceptorDemo();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
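The tagging rule can be exercised without a running agent or the Flume jars on the classpath. The sketch below reproduces the body-prefix check from InterceptorDemo.intercept() with plain Java types (TagDemo and classify are names invented for this sketch, not Flume API):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch: mirrors InterceptorDemo's tagging rule with plain
// Java types so it runs without flume-ng-core on the classpath.
public class TagDemo {
    // Same prefix checks as InterceptorDemo.intercept()
    static String classify(byte[] body) {
        String s = new String(body, StandardCharsets.UTF_8);
        if (s.startsWith("hello")) return "hello";
        if (s.startsWith("hi"))    return "hi";
        return "other";
    }

    public static void main(String[] args) {
        String[] samples = {"hello flume", "hi kafka", "something else"};
        for (String sample : samples) {
            // Stand-in for event.getHeaders()
            Map<String, String> headers = new HashMap<>();
            headers.put("type", classify(sample.getBytes(StandardCharsets.UTF_8)));
            System.out.println(sample + " -> " + headers.get("type"));
        }
    }
}
```

Running it prints the tag each sample body would receive, which is exactly the value the multiplexing selector later switches on.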

Then package the project with Maven:
clean package
Find the newly built jar in the target folder and copy it into the flume/lib directory.

Write the conf file

[Messages starting with hello go to HDFS;
messages starting with hi go to Kafka;
everything else is printed to the console.]

interceptordemo.sources=interceptorDemoSource
interceptordemo.channels=hellochannel hichannel otherchannel
interceptordemo.sinks=hellosink hisink othersink

interceptordemo.sources.interceptorDemoSource.type=netcat
interceptordemo.sources.interceptorDemoSource.bind=localhost
interceptordemo.sources.interceptorDemoSource.port=44444
interceptordemo.sources.interceptorDemoSource.interceptors=interceptor1
interceptordemo.sources.interceptorDemoSource.interceptors.interceptor1.type=flumestu.InterceptorDemo$Builder
interceptordemo.sources.interceptorDemoSource.selector.type=multiplexing
interceptordemo.sources.interceptorDemoSource.selector.mapping.hello=hellochannel
interceptordemo.sources.interceptorDemoSource.selector.mapping.hi=hichannel
interceptordemo.sources.interceptorDemoSource.selector.mapping.other=otherchannel
interceptordemo.sources.interceptorDemoSource.selector.header=type

interceptordemo.channels.hellochannel.type=memory
interceptordemo.channels.hellochannel.capacity=1000
interceptordemo.channels.hellochannel.transactionCapacity=100
interceptordemo.channels.hichannel.type=memory
interceptordemo.channels.hichannel.capacity=1000
interceptordemo.channels.hichannel.transactionCapacity=100
interceptordemo.channels.otherchannel.type=memory
interceptordemo.channels.otherchannel.capacity=1000
interceptordemo.channels.otherchannel.transactionCapacity=100

interceptordemo.sinks.hellosink.type=hdfs
interceptordemo.sinks.hellosink.hdfs.fileType=DataStream
interceptordemo.sinks.hellosink.hdfs.filePrefix=hello
interceptordemo.sinks.hellosink.hdfs.fileSuffix=.csv
interceptordemo.sinks.hellosink.hdfs.path=hdfs://192.168.159.100:9000/flumefile/hello/%Y-%m-%d
interceptordemo.sinks.hellosink.hdfs.useLocalTimeStamp=true
interceptordemo.sinks.hellosink.hdfs.batchSize=640
interceptordemo.sinks.hellosink.hdfs.rollCount=0
interceptordemo.sinks.hellosink.hdfs.rollSize=6400000
interceptordemo.sinks.hellosink.hdfs.rollInterval=3

interceptordemo.sinks.hisink.type=org.apache.flume.sink.kafka.KafkaSink
interceptordemo.sinks.hisink.batchSize=640
interceptordemo.sinks.hisink.brokerList=192.168.159.100:9092
interceptordemo.sinks.hisink.topic=hi

interceptordemo.sinks.othersink.type=logger

interceptordemo.sources.interceptorDemoSource.channels=hellochannel hichannel otherchannel
interceptordemo.sinks.hisink.channel=hichannel
interceptordemo.sinks.hellosink.channel=hellochannel
interceptordemo.sinks.othersink.channel=otherchannel
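The multiplexing selector in the conf above routes each event by the value of its type header (selector.header=type) through the three selector.mapping.* entries. A rough sketch of that lookup, with the channel names taken from the conf (SelectorDemo and route are illustrative names, not Flume's real ChannelSelector API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the multiplexing selector configured above:
// look up the event's "type" header in the mapping table and pick the
// matching channel name. Channel names mirror the conf file.
public class SelectorDemo {
    static final Map<String, String> MAPPING = new HashMap<>();
    static {
        // selector.mapping.hello=hellochannel, etc.
        MAPPING.put("hello", "hellochannel");
        MAPPING.put("hi", "hichannel");
        MAPPING.put("other", "otherchannel");
    }

    static String route(Map<String, String> headers) {
        // The interceptor guarantees type is "hello", "hi", or "other",
        // so every event finds a mapped channel.
        return MAPPING.get(headers.get("type"));
    }

    public static void main(String[] args) {
        Map<String, String> h = new HashMap<>();
        h.put("type", "hi");
        System.out.println(route(h)); // prints "hichannel"
    }
}
```

Because the interceptor always sets one of the three tags, no event falls outside the mapping; if it could, the real selector would need a selector.default channel.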

Testing

1. Create the matching topic and start a console consumer for it

kafka-topics.sh --zookeeper hadoop100:2181 --create --topic hi --partitions 1 --replication-factor 1
kafka-console-consumer.sh --topic hi --bootstrap-server 192.168.159.100:9092 --from-beginning

2. Create the checkpoint and data folders (these are only needed by file channels; the memory channels configured above do not use them)

3. Run the agent

./bin/flume-ng agent --name interceptordemo --conf ./conf/ --conf-file ./conf/testjob/netcat-flume-interceptor.conf -Dflume.root.logger=INFO,console

4. Send data over the network

telnet localhost 44444

1. The corresponding files appear under the HDFS path
2. The Kafka consumer receives the matching data
3. The console prints the remaining messages
