Flume Interceptor & Testing the Flume-Kafka Channel
Posted by 闭关苦炼内功
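Preface: This article was compiled by the editors of 小常识网 (cha138.com). It covers Flume interceptors and how to test the Flume-Kafka channel, and we hope it serves as a useful reference.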
1) Create a Maven project named flume-interceptor.
2) Create the package com.atguigu.flume.interceptor.
3) Add the following configuration to pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
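Note:
The provided scope means the dependency is used at compile time but is not packaged into the jar. The cluster already has the Flume jars, so flume-ng-core is needed only for compiling locally.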
4) Create the JSONUtils class in the com.atguigu.flume.interceptor package.
package com.atguigu.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

public class JSONUtils {

    // Returns true if fastjson can parse the string, false if parsing throws JSONException
    public static boolean isJSONValidate(String log) {
        try {
            JSON.parse(log);
            return true;
        } catch (JSONException e) {
            return false;
        }
    }
}
5) Create the ETLInterceptor class in the com.atguigu.flume.interceptor package. The class name must match the file name, ETLInterceptor.java.
package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    // Single event: keep it if the body is valid JSON, otherwise return null to drop it
    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        if (JSONUtils.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    // Batch: remove every event that the single-event method rejects
    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event next = iterator.next();
            if (intercept(next) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    // Flume creates the interceptor through this Builder
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

    @Override
    public void close() {
    }
}
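The batch method above can be tried without Flume using the sketch below, which reproduces its filtering logic with the JDK alone. It makes two substitutions, both hypothetical. A small SimpleEvent class stands in for org.apache.flume.Event. A crude brace check stands in for JSONUtils.isJSONValidate. The sketch therefore shows the in-place removal pattern only; it does not do real JSON validation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public class BatchFilterSketch {

    // Hypothetical stand-in for org.apache.flume.Event: only the body is modeled
    static final class SimpleEvent {
        final byte[] body;

        SimpleEvent(String text) {
            this.body = text.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mirrors ETLInterceptor.intercept(List<Event>): decode each body as UTF-8
    // and remove the event in place when the validator rejects it
    static List<SimpleEvent> filter(List<SimpleEvent> events, Predicate<String> isValid) {
        Iterator<SimpleEvent> it = events.iterator();
        while (it.hasNext()) {
            String log = new String(it.next().body, StandardCharsets.UTF_8);
            if (!isValid.test(log)) {
                it.remove(); // removal through the iterator is safe during iteration
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<SimpleEvent> batch = new ArrayList<>();
        batch.add(new SimpleEvent("{\"mid\":\"m1\"}"));
        batch.add(new SimpleEvent("not json"));
        batch.add(new SimpleEvent("{\"mid\":\"m2\"}"));
        // Hypothetical validator: a crude brace check, NOT a JSON parser
        Predicate<String> crude = s -> s.startsWith("{") && s.endsWith("}");
        System.out.println(filter(batch, crude).size()); // prints 2
    }
}
```

Removing through Iterator.remove() is what makes in-place deletion safe. Calling list.remove(...) inside a for-each loop over the same list can throw ConcurrentModificationException. On Java 8, Collection.removeIf with the negated validator would be an equivalent one-liner.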
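6) Package the project (for example with mvn package). The assembly plugin then also produces a jar-with-dependencies jar.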
7) Copy the packaged jar into /opt/module/flume/lib on hadoop102.
[atguigu@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
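Before relying on the copied jar, it may help to confirm what was packaged into it. This minimal sketch uses only java.util.jar.JarFile to check three things the pom above implies. First, the ETLInterceptor$Builder class is present. Second, fastjson is bundled by jar-with-dependencies. Third, flume-ng-core is absent because it has provided scope. The class name JarCheck is hypothetical, and the default jar name comes from the listing above.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarCheck {

    // True if the jar contains an entry with exactly this name
    static boolean hasEntry(JarFile jar, String name) {
        return jar.getJarEntry(name) != null;
    }

    // True if any entry name starts with the given package path prefix
    static boolean hasPrefix(JarFile jar, String prefix) {
        return jar.stream().anyMatch(e -> e.getName().startsWith(prefix));
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the file name shown above; pass another path as args[0]
        String path = args.length > 0 ? args[0]
                : "flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar";
        try (JarFile jar = new JarFile(path)) {
            System.out.println("ETLInterceptor$Builder present: "
                    + hasEntry(jar, "com/atguigu/flume/interceptor/ETLInterceptor$Builder.class"));
            System.out.println("fastjson bundled: "
                    + hasPrefix(jar, "com/alibaba/fastjson/"));
            // provided-scope dependencies are not included by jar-with-dependencies
            System.out.println("Flume core bundled (expected false): "
                    + hasPrefix(jar, "org/apache/flume/"));
        }
    }
}
```

Compile it with javac JarCheck.java, then run java JarCheck followed by the path to the jar in /opt/module/flume/lib.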
8) Distribute Flume to hadoop103 and hadoop104.
[atguigu@hadoop102 module]$ xsync flume/
9) Start Flume on hadoop102 and on hadoop103.
[atguigu@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
[atguigu@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
Testing the Flume-Kafka Channel
(1) Generate logs
[atguigu@hadoop102 ~]$ lg.sh
(2) Consume the Kafka data and check whether records appear in the console
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log
Notes:
If no data arrives, first check that Kafka, Flume and ZooKeeper have all started correctly.
Then check that the Flume interceptor code works as expected.
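As a first troubleshooting step, a plain TCP connect can show whether anything is listening on the Kafka broker and ZooKeeper ports. This is a minimal sketch. The class name PortCheck is hypothetical. Port 9092 comes from the consumer command above. Port 2181 is ZooKeeper's default client port and is assumed, not confirmed, for this cluster. A successful connect only proves that some process is listening. It does not prove that topic_log exists, and it does not check the Flume agents at all.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // True if a TCP connection to host:port succeeds within timeoutMs;
    // unknown hosts, refused connections and timeouts all return false
    static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // 9092: broker port from the consumer command; 2181: assumed ZooKeeper default
        String[][] targets = { {"hadoop102", "9092"}, {"hadoop102", "2181"} };
        for (String[] t : targets) {
            int port = Integer.parseInt(t[1]);
            System.out.println(t[0] + ":" + port + " reachable = " + reachable(t[0], port, 2000));
        }
    }
}
```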