Flume Interceptor & Testing the Flume-Kafka Channel

Posted by 闭关苦炼内功

1) Create a Maven project named flume-interceptor

2) Create the package com.atguigu.flume.interceptor

3) Add the following configuration to pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

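Note:
The provided scope means the jar is used at compile time but is not bundled into the packaged artifact. The Flume jars already exist on the cluster, so the dependency is only needed for local compilation.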

4) Create the JSONUtils class in the com.atguigu.flume.interceptor package

package com.atguigu.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

public class JSONUtils {
    // Returns true if fastjson can parse the log line, false if parsing fails.
    public static boolean isJSONValidate(String log) {
        try {
            JSON.parse(log);
            return true;
        } catch (JSONException e) {
            return false;
        }
    }
}

5) Create the ETLInterceptor class in the com.atguigu.flume.interceptor package

package com.atguigu.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
        // No setup required.
    }

    // Keeps the event if its body parses as JSON; returns null to drop it.
    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);

        if (JSONUtils.isJSONValidate(log)) {
            return event;
        } else {
            return null;
        }
    }

    // Removes every event rejected by intercept(Event) from the batch, in place.
    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();

        while (iterator.hasNext()) {
            Event next = iterator.next();
            if (intercept(next) == null) {
                iterator.remove();
            }
        }

        return list;
    }

    // Flume creates the interceptor through this Builder.
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {
            // No configuration parameters.
        }
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
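
The batch-filtering logic of the interceptor can be tried out without a Flume installation. The following is only a minimal sketch under stated assumptions: the class BatchFilterSketch and the methods filterInPlace and looksLikeJsonObject are hypothetical names invented for this illustration, events are modelled as plain byte[] bodies rather than Flume Event objects, and the brace check is a crude stand-in for JSONUtils.isJSONValidate, not real JSON validation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public class BatchFilterSketch {

    // Crude stand-in for JSONUtils.isJSONValidate (an assumption of this sketch):
    // it only checks that the trimmed text is wrapped in braces.
    public static boolean looksLikeJsonObject(String log) {
        String trimmed = log.trim();
        return trimmed.startsWith("{") && trimmed.endsWith("}");
    }

    // Mirrors the shape of intercept(List<Event>): rejected bodies are removed
    // in place through the iterator, and the same list instance is returned.
    public static List<byte[]> filterInPlace(List<byte[]> bodies, Predicate<String> isValid) {
        Iterator<byte[]> iterator = bodies.iterator();
        while (iterator.hasNext()) {
            String log = new String(iterator.next(), StandardCharsets.UTF_8);
            if (!isValid.test(log)) {
                iterator.remove();
            }
        }
        return bodies;
    }

    public static void main(String[] args) {
        List<byte[]> bodies = new ArrayList<>();
        bodies.add("{\"page\":\"home\"}".getBytes(StandardCharsets.UTF_8));
        bodies.add("truncated {\"page\":".getBytes(StandardCharsets.UTF_8));
        bodies.add("{\"page\":\"cart\"}".getBytes(StandardCharsets.UTF_8));

        filterInPlace(bodies, BatchFilterSketch::looksLikeJsonObject);
        System.out.println(bodies.size() + " bodies kept");
    }
}
```

Removal goes through Iterator.remove() because calling remove on the list itself inside a for-each loop over the same list would typically fail with a ConcurrentModificationException.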

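6) Build the package (for example, with mvn clean package); the assembly plugin configured above produces the jar-with-dependencies artifact.

7) Copy the built jar into the /opt/module/flume/lib directory on hadoop102 first.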

[atguigu@hadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

8) Distribute Flume to hadoop103 and hadoop104

[atguigu@hadoop102 module]$ xsync flume/

9) Start Flume on hadoop102 and on hadoop103

[atguigu@hadoop102 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &
[atguigu@hadoop103 flume]$ bin/flume-ng agent --name a1 --conf-file conf/file-flume-kafka.conf &

Testing the Flume-Kafka Channel

(1) Generate logs

[atguigu@hadoop102 ~]$ lg.sh

(2) Consume the Kafka data and check whether records appear on the console

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh \
--bootstrap-server hadoop102:9092 --from-beginning --topic topic_log

Note:
If no data arrives, first check that Kafka, Flume, and Zookeeper have all started correctly.
Then check that the Flume interceptor code works as expected.
