Flume+Kafka+HDFS综合运用

Posted 皓洲

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume+Kafka+HDFS综合运用相关的知识,希望对你有一定的参考价值。

Flume+Kafka+HDFS综合运用

实验内容

如下图所示:

在某一实际应用中,有一个的数据源(可用Source类型为Exec Source或NetCat Source的Agent a1来用模拟),

为方便后期数据分析,需要记录事件的产生IP时间(格式:年月日时分秒)以及事件类型(事件类型根据事件Body中包含WARNING:ERROR:、**INFO:**来确定为WARNING、ERROR、INFO,如不包含,则无需记录事件类型)事件经处理后汇总到Agent a2。

Agent a2根据事件类型,将事件分别送入不同通道进行下一步处理,并将事件类型为WARNING、ERROR写入Kafka集群,并最终被消费者和Agent a3进行消费。

C:\\Users\\zhz\\AppData\\Roaming\\Typora\\typora-user-images\\image-20210429155532185.png

步骤分析

先分析a1,a1需要输出产生IP、时间、事件类型,这里就需要用到拦截器,产生IP使用Host Interceptor,产生时间使用Timestamp Interceptor,产生事件类型需要我们自定义拦截器

然后分析a2,a2要将正常信息输出,把WARNING、ERROR作为生产者消息传到Kafka集群中,这里需要用到多路 Channel选择器

最后分析a3,a3的Source要作为消费者接收Kafka的消息,再把消息输出到HDFS

还要另外写一个消费者,把错误信息输出到控制台中。

a1的配置信息

自定义拦截器

package flume.zhz.com;
import org.apache.commons.codec.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * @author zhz
 * @date 2021/4/29 16:32
 * 备注:
 */
public class MyFlumeInterceptor implements Interceptor {
    //自定义属性hostIP
    private String hostIP=null;
    private MyFlumeInterceptor(String hostIP){
        this.hostIP=hostIP;
    }
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {

        String[] statuses = {"WARNING","ERROR","INFO"};
        /**
         * 处理body
         */
        StringBuilder builder = new StringBuilder();
        //获得body体字节数组
        byte[] byteBody = event.getBody();
        //将body体转化为字符串
        String body = new String(byteBody, Charsets.UTF_8);

        //获得headers
        Map<String,String> headers = event.getHeaders();
        hostIP = headers.get("host");

        //时间
        String timestamp = headers.get("timestamp");
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String datetime = sdf.format(new Date(Long.valueOf(timestamp)));

        //事件类型
        String status = "NULL";
        for(String str : statuses) {
            if(body.indexOf(str) == 0) {
                status = str;
            }
        }

        //拼接IP地址与body体,形成新body
        builder.append("ip:"+hostIP);
        builder.append(";datetime:"+datetime);
        builder.append(";status:"+status);
        builder.append(";body:"+body);
        byte[] newBody = builder.toString().trim().getBytes();
        //重新设置body体
        event.setBody(newBody);
        System.out.println("body信息:"+builder.toString().trim());

        /**
         * 处理headers
         */
        String error = "ERROR:";
        String warning = "WARNING:";
        String info = "INFO:";
        StringBuilder str = new StringBuilder();
        boolean flag = true;
        for(int i=0;i<body.length()&&i<8;i++){
            str.append(body.charAt(i));
            if (str.equals(error)||str.equals(warning)||str.equals(info)){
                headers.put(str.toString(),body);
                flag = false;
                break;
            }
        }
        if (flag)headers.put("INFO:",body);
        event.setHeaders(headers);
        return event;
    }

    /**
     * 定义内部类MyBuilder,用于构建自定义拦截器MyFlumeInterceptor的实例,
     * 并获取Flume配置文件中自定义的拦截器属性值,将值传给自定义类MyFlumeInterceptor
     */
    public static class MyBuilder implements Interceptor.Builder{
        private String hostIP=null;
        public void configure(Context context){
            //获取Flume配置文件中设置的自定义属性值
            //字符串“hostIP”需与配置文件中设置的属性hostIP一致
            String hostIP = context.getString("hostIP");
            this.hostIP = hostIP;
        }
        public Interceptor build(){
            //实例化自定义拦截器并传入自定义属性
            return new MyFlumeInterceptor(hostIP);
        }
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        for(Event event : list) {
            intercept(event);
        }
        return list;
    }

    @Override
    public void close() {

    }
}

a1配置信息

# 给Agent中的三个组件各起一个别名,a1代表为Agent起的别名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source属性配置信息
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 拦截器名称
a1.sources.r1.interceptors = i1 i2 i3
# 时间拦截器
a1.sources.r1.interceptors.i1.type = timestamp
# 主机名拦截器
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.useIP = true
# 自定义
a1.sources.r1.interceptors.i3.type = flume.zhz.com.MyFlumeInterceptor$MyBuilder


# sink属性配置信息
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = centos02
a1.sinks.k1.port = 23570

# channel属性配置信息
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 绑定source和sink到channel上
a1.sources.r1.channels= c1
a1.sinks.k1.channel = c1

a2配置信息

sink输出到kafka参考官方文档:https://flume.liyifeng.org/#kafka-sink

# 给Agent中的三个组件各起一个别名,a1代表为Agent起的别名
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# source属性配置信息
a1.sources.r1.type = avro
a1.sources.r1.bind = centos02
a1.sources.r1.port = 23570

# 选择器配置信息
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = status
a1.sources.r1.selector.mapping.NULL = c1
a1.sources.r1.selector.mapping.INFO = c1
a1.sources.r1.selector.mapping.ERROR = c2
a1.sources.r1.selector.mapping.WARNING = c2


# sink1属性配置信息(控制台输出)
a1.sinks.k1.type = logger

# sink2属性配置信息(输出到kafka)
a1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k2.kafka.topic = topictest
a1.sinks.k2.kafka.bootstrap.servers = centos01:9092,centos02:9092,centos03:9092
a1.sinks.k2.serializer.class = kafka.serializer.StringEncoder


# channel属性配置信息
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# channe2属性配置信息
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# 绑定source和sink到channel上
a1.sources.r1.channels= c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

a3配置信息

# 给Agent中的三个组件各起一个别名,a1代表为Agent起的别名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source属性配置信息
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = centos01:9092,centos02:9092,centos03:9092
a1.sources.r1.kafka.topics = topictest
a1.sources.r1.serializer.class = kafka.serializer.StringEncoder


# sink属性配置信息
# 组件类型为hdfs
a1.sinks.k1.type = hdfs
#将文件放入按年分类的文件夹中
a1.sinks.k1.hdfs.path = hdfs://centos01:8020/flume/%Y-%m
# 使用时间戳作为文件前缀
a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H
# 文件后缀
a1.sinks.k1.hdfs.fileSuffix = .log
# 使用服务器时间
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 文件块副本数
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 86400
a1.sinks.k1.hdfs.rollSize = 1000000
a1.sinks.k1.hdfs.rollCount = 10000

# channe1属性配置信息
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000


# 绑定source和sink到channel上
a1.sources.r1.channels= c1
a1.sinks.k1.channel = c1

运行

开启zookeeper集群,hdfs HA集群,kafka集群和flume,节点二的生产者,节点三的消费者

在这里插入图片描述

测试语句:

在这里插入图片描述

正常信息显示到了centos02的控制台上:

在这里插入图片描述

消费者捕捉错误信息,并上传到hdfs上:

在这里插入图片描述

在这里插入图片描述

以上是关于Flume+Kafka+HDFS综合运用的主要内容,如果未能解决你的问题,请参考以下文章

图解Flume对接Kafka(附中文注释)

图解Flume对接Kafka(附中文注释)

实战系列Flume + kafka + HDFS构建日志采集系统

flume+kafka+hdfs详解

Flume + kafka + HDFS构建日志采集系统

kafka怎么收集到flume的日志