图解Flume对接Kafka(附中文注释)

Posted 小基基o_O

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了图解Flume对接Kafka(附中文注释)相关的知识,希望对你有一定的参考价值。

1、前言

Flume基础:https://yellow520.blog.csdn.net/article/details/112758144
Kafka基础:https://yellow520.blog.csdn.net/article/details/112701565

2、架构图

2.1、前半part:File->Flume->Kafka

2.2、后半part:Kafka->Flume->HDFS

2.3、总

3、代码

mkdir -p /opt/test/data

3.1、File->Flume->Kafka

vi /opt/test/ffk
# 定义agent、source、channel名称,并绑定关系
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1

# source
a1.sources.r1.type = TAILDIR
# 指定文件组的名称
a1.sources.r1.filegroups = f1
# 指定组监控的目录(支持正则表达式)
a1.sources.r1.filegroups.f1 = /opt/test/data/.+txt
# 指定断点续传文件
a1.sources.r1.positionFile = /opt/test/data/position.json
# 指定一个批次采集多少数据
a1.sources.r1.batchSize = 100

# channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
# kafka服务地址
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092
# 主题
a1.channels.c1.kafka.topic = A1
# 数据写入时,是否以event格式写入,false表示只写body
a1.channels.c1.parseAsFlumeEvent = false

3.2、Kafka->Flume->HDFS

vi /opt/test/kfh
# 定义agent、channel、source、sink名称,并关联
a2.sources = r2
a2.channels = c2
a2.sinks = k2
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

# source
a2.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
# kafka服务地址
a2.sources.r2.kafka.bootstrap.servers = hadoop102:9092
# 主题
a2.sources.r2.kafka.topics = A1
# 消费者组
a2.sources.r2.kafka.consumer.group.id = g1
# 消费者组第一个消费topic的数据的时候从哪里开始消费
a2.sources.r2.kafka.consumer.auto.offset.reset = earliest
# source从Kafka拉消息的批次大小
a2.sources.r2.batchSize = 50
# 拦截器
a2.sources.r2.interceptors = i1
a2.sources.r2.interceptors.i1.type = a.b.c.TimeInterceptor$Builder

# channel
a2.channels = c2
# channel类型:内存(断电可能会丢失数据)
a2.channels.c2.type = memory

# sink
a2.sinks.k2.type = hdfs
# 指定数据存储目录
a2.sinks.k2.hdfs.path = hdfs://hadoop100:8020/kf/%Y-%m-%d
# 指定文件的前缀
a2.sinks.k2.hdfs.filePrefix = log-
# 指定滚动生成文件的时间间隔
a2.sinks.k2.hdfs.rollInterval = 30
# 指定滚动生成文件的大小(133169152=127*1024*1024<128M)
a2.sinks.k2.hdfs.rollSize = 133169152
# 写入多少个event之后滚动生成新文件,通常选0,表示禁用
a2.sinks.k2.hdfs.rollCount = 0
# 文件写入格式:SequenceFile-序列化文件、DataStream-文本文件、CompressedStream-压缩文件
a2.sinks.k2.hdfs.fileType = DataStream

注意:

因为source的batch size默认值1000,channel的transaction capacity默认值100
所以要把source’s batch size调小

3.2.1、自定义拦截器

业务逻辑:
由于Flume默认使用Linux系统时间输出到HDFS路径,23:59产生的数据到达HDFS可能到了下一天,所以使用拦截器来获取日志中的实际时间

<!-- 依赖 -->
<dependencies>
    <!-- Flume;其中scope设置为provided,打包时不用打包Flume,因为可以用服务器的那个Flume -->
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <!-- json解析器 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>
<!-- 打包插件,可以把json相关的依赖打包上 -->
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
package a.b.c;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;
import java.util.Map;

public class TimeInterceptor implements Interceptor {
    // 初始化
    @Override
    public void initialize() {}

    // 处理单个event
    @Override
    public Event intercept(Event event) {
        // 解析json
        String str = new String(event.getBody());
        JSONObject js = JSON.parseObject(str);
        // 业务逻辑:按key取时间戳
        Long ts = js.getLong("ts");
        // hdfs.useLocalTimeStamp = false
        // hdfs.path = /kf/%Y-%m-%d
        // 添加timestamp
        // 写到HDFS时,会找event的headers的key=timestamp的value
        // 据此value得到%Y-%m-%d
        Map<String, String> headers = event.getHeaders();
        headers.put("timestamp", ts.toString());
        return event;
    }

    // 处理批次events
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events)
            intercept(event);
        return events;
    }

    // 关闭
    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeInterceptor();
        }

        @Override
        public void configure(Context context) {}
    }
}

打包后将名称含with-dependencies的jar包上传到$FLUME_HOME/lib

ll $FLUME_HOME/lib | grep SNAPSHOT

4、测试

  1. 开启服务
zookeeper.py start
kafka.py start
hadoop.py start
  1. 创建主题
kafka-topics.sh \\
--zookeeper hadoop102:2181/kafka \\
--create \\
--replication-factor 2 \\
--partitions 3 \\
--topic A1
  1. 写一个Python2脚本,制造含时间的数据
vi /opt/test/a.py
from time import strptime, mktime
t0 = '2020-12-31'
for i in range(1, 200000, 2):
    t = int(i + mktime(strptime(t0, '%Y-%m-%d'))) * 1000
    js = {'ts': t, 'num': str(i)}
    print(js)
python /opt/test/a.py > /opt/test/data/a.txt
  1. 启动Flume
cd /opt/test/
nohup \\
flume-ng agent -n a1 -c $FLUME_HOME/conf/ -f ffk -Dflume.root.logger=INFO,console \\
>/opt/test/f1.log 2>&1 &
nohup \\
flume-ng agent -n a2 -c $FLUME_HOME/conf/ -f kfh -Dflume.root.logger=INFO,console \\
>/opt/test/f2.log 2>&1 &
  1. 查看HDFS

以上是关于图解Flume对接Kafka(附中文注释)的主要内容,如果未能解决你的问题,请参考以下文章

flume对接kafka

Flume对接Kafka

大数据技术之KafkaKafka APIKafka监控Flume对接KafkaKafka面试题

大数据技术之KafkaKafka APIKafka监控Flume对接KafkaKafka面试题

大数据技术之KafkaKafka APIKafka监控Flume对接KafkaKafka面试题

大数据技术之KafkaKafka APIKafka监控Flume对接KafkaKafka面试题