Flume Hands-On Examples

Posted by traveller-hzq

Reading Data from a Port to a Local File

#1. Name the three components
a3.sources = r1
a3.channels = c1
a3.sinks = k1 
#2. Set the source component's properties
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 6666
#3. Set the channel component's properties
a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100
#4. Set the sink component's properties
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.9.0/datas/fileroll
#5. Bind the sources and sinks to their channels
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1 
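
To try this out (the conf file name job/a3-fileroll.conf below is just an assumed name): the file_roll sink does not necessarily create its output directory, so it is safest to create it first, then start the agent.

mkdir -p /opt/module/flume-1.9.0/datas/fileroll
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/a3-fileroll.conf -Dflume.root.logger=INFO,console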

Reading Data from a Port to HDFS

a2 ==> a2.conf


a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 5555

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = hdfs
#target path on HDFS; %Y%m%d/%H are filled in from the event timestamp
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#prefix of the files created on HDFS
a2.sinks.k1.hdfs.filePrefix = logs-
#round the timestamp used in the path down to 1 hour
a2.sinks.k1.hdfs.round = true
a2.sinks.k1.hdfs.roundValue = 1
a2.sinks.k1.hdfs.roundUnit = hour
#use the agent's local time for the escape sequences instead of a timestamp event header
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#number of events written to the file before flushing to HDFS
a2.sinks.k1.hdfs.batchSize = 100
#plain text output instead of SequenceFile
a2.sinks.k1.hdfs.fileType = DataStream
#roll to a new file every 60 seconds or at ~128 MB; never roll by event count
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 134217700
a2.sinks.k1.hdfs.rollCount = 0

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
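
To test this agent (file names below are illustrative), start it and then push data into the avro source on port 5555, for example with Flume's bundled avro-client; note that the hdfs sink needs the Hadoop client jars on the agent's classpath.

bin/flume-ng agent --conf conf/ --name a2 --conf-file job/a2-hdfs.conf
bin/flume-ng avro-client --conf conf/ -H hadoop102 -p 5555 -F /opt/module/flume-1.9.0/datas/test.txt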

Replicating and Multiplexing


The same data can be replicated to every channel (Flume's default behavior), or different data can be dispatched to different channels; each sink can then deliver to a different destination.

Monitor a single file and, through two channels paired with two sinks, write its content out to the console (a possible start order is sketched after the configuration below).

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2 

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /tmp/atguigu/hive.log
a1.sources.r1.selector.type = replicating

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666

a1.sources.r1.channels = c1 c2 
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2 
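
Since both sinks here are avro clients, the downstream agents listening on ports 5555 and 6666 must be started before a1 (they could be the a2 and a3 agents configured above, or simple logger agents). A possible start order, with illustrative conf file names:

bin/flume-ng agent --conf conf/ --name a3 --conf-file job/a3.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/a2.conf -Dflume.root.logger=INFO,console
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/a1-replicating.conf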

Failover


Flume allows multiple sinks to be logically grouped into a sink group; paired with different SinkProcessors, a sink group can provide load balancing or failure recovery. The failure referred to here is a failure of a sink.

1) The priority property configured under sinkgroups decides which sink ranks higher; only one sink in the group is active at a time.

2) When the active sink (say, priority 10) goes down it is put into standby and the group immediately switches to the other sink (say, priority 9); after the failed sink is repaired and restarted, the group switches back to the priority-10 sink after a short while. (A test sketch follows the configuration below.)

  1. When a failure occurs, fix it as soon as possible.
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#write the data to another Flume server
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555
#write the data to another Flume server
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 6666

#use a sink processor to control how data flows out of the channel
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2  
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
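
A simple way to watch the failover happen (assuming agents are already listening on hadoop102:5555 and hadoop103:6666): send events through the netcat source, stop the downstream agent behind the higher-priority sink k2 and keep sending, then restart it and traffic should move back after a while.

# run on the host where a1 runs, since the netcat source binds to localhost
nc localhost 4444
# type a few lines: they arrive via k2 (priority 10)
# stop the agent behind hadoop103:6666, type a few more lines: they now arrive via k1 (priority 5)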

Load Balancing

The processor mode is selected through the sink processor's type property: load_balance for load balancing or failover for failover.

  1. With load balancing enabled, the sink processor hands the channel's events out across the sinks in the group, reducing the load on any single machine (alternative selector settings are sketched after the configuration below).
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
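
The selector above is random; a sketch of the other documented options for the load_balance processor (the maxTimeOut value shown is Flume's default):

# round-robin instead of random selection
a1.sinkgroups.g1.processor.selector = round_robin
# with backoff enabled, a failed sink is blacklisted for an exponentially growing
# period, capped at maxTimeOut milliseconds
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000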

Aggregation


Aggregation means collecting the data from multiple log servers onto one log server, where it is written out (a possible start order and test are sketched after the three configurations below).

a3 ==> a3.conf(hadoop104)

a3.sources = r1
a3.channels = c1
a3.sinks = k1 

a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop104
a3.sources.r1.port = 6666

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = logger

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1 


a2 ==> a2.conf(hadoop103)

a2.sources = r1
a2.channels = c1
a2.sinks = k1 

a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 4444

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 6666

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1 


a1 ==> a1.conf(hadoop102)

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /tmp/atguigu/hive.log

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 6666

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1 
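
A possible way to run the aggregation case (conf file names are illustrative): start the collector a3 on hadoop104 first, then the two senders, and test from both sources.

# on hadoop104: the collector, printing to the console
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/a3.conf -Dflume.root.logger=INFO,console
# on hadoop103: netcat -> avro
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/a2.conf
# on hadoop102: tail hive.log -> avro
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/a1.conf

# on hadoop103: send a test line through the netcat source
nc localhost 4444
# on hadoop102: append a line to the tailed file
echo "hello flume" >> /tmp/atguigu/hive.log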

ChannelSelector Example

The ChannelSelector decides which channel(s) an Event will be sent to. There are two types: Replicating and Multiplexing.

A ReplicatingSelector sends the same Event to every channel, while Multiplexing routes different Events to different channels according to a rule. Replicating is the default.

  1. A Multiplexing ChannelSelector decides which channel to send an Event to based on an attribute in the Event's header.
  2. An event's header is empty by default, so a multiplexing ChannelSelector is usually combined with a custom interceptor (see the custom Interceptor case below).

An example of the replicating type:

a1.sources = r1
# with 100 Events, c1 and c2 both end up with all 100 of them
a1.channels = c1 c2

# replicating is the default selector type, so this line is optional
a1.sources.r1.selector.type = replicating

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

a1.sources.r1.channels = c1 c2

An example of the multiplexing ChannelSelector:

a1.sources = r1
a1.channels = c1 c2

a1.sources.r1.selector.type = multiplexing
# route on the value of the title key in the event header
a1.sources.r1.selector.header = title
# if title is "a", use channel c2
a1.sources.r1.selector.mapping.a = c2
# if title is "b", use channel c1
a1.sources.r1.selector.mapping.b = c1
# otherwise fall back to channel c1
a1.sources.r1.selector.default = c1

SinkProcessor Example

There are three kinds of SinkProcessor: DefaultSinkProcessor, LoadBalancingSinkProcessor and FailoverSinkProcessor.

DefaultSinkProcessor serves a single sink; LoadBalancingSinkProcessor and FailoverSinkProcessor work on a sink group: the former provides load balancing, the latter failure recovery.
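
The wiring is the same for both grouped processors; only processor.type and its processor-specific properties change (with no sink group configured at all, each sink gets a DefaultSinkProcessor). A minimal sketch:

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
# failover or load_balance
a1.sinkgroups.g1.processor.type = failover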

Custom Interceptor

When Flume collects local logs from servers, logs of different types need to be sent to different analysis systems.


Requirement

In this case we simulate logs with data sent to a port: single digits and single letters stand for different types of logs. We need a custom interceptor that tells them apart and sends them to different analysis systems (channels).

Implementation

1) Create a Maven project and add the dependency

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
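
Since the Flume agent already ships flume-ng-core in its lib directory, it is common (though optional) to mark the dependency as provided so it is not bundled into your jar:

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>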

2) Java code

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 *  1. How to write a custom interceptor?
 *     A custom Flume interceptor implements the Interceptor interface provided by Flume.
 *
 *  Methods to implement:
 *      initialize: initialization work.
 *      close: clean-up work.
 *      intercept: the core interception logic.
 *          intercept(Event event): handles a single event.
 *          intercept(List<Event> events): handles a batch of events.
 *
 *  2. How is the interceptor instantiated?
 *     Define a static inner class inside the interceptor that implements the Builder interface provided by Flume.
 *
 *   Methods to implement:
 *      build: builds the interceptor object.
 *      configure: reads configuration (from xxxx.conf).
 */
public class LogDataInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    /**
     * Requirement: check whether each Event's body contains "atguigu".
     *      If it does, add the key-value pair title = at to the Event's header.
     *      If it does not, add title = ot instead.
     */
    @Override
    public Event intercept(Event event) {
        //1. get the event's header and body
        Map<String, String> headers = event.getHeaders();
        String body = new String(event.getBody(), StandardCharsets.UTF_8);  // decode explicitly to avoid charset issues

        //2. check the body and tag the header
        if(body.contains("atguigu")){
            headers.put("title","at");
        }else{
            headers.put("title","ot");
        }

        //3. return the processed event
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }

        return events ;
    }

    @Override
    public void close() {

    }


    public static class MyBuilder implements Builder {

        @Override
        public Interceptor build() {
            return new LogDataInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

}
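
A quick local sanity check of the interceptor, not part of the original case (the class name LogDataInterceptorCheck is illustrative; EventBuilder ships with flume-ng-core):

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

public class LogDataInterceptorCheck {
    public static void main(String[] args) {
        LogDataInterceptor interceptor = new LogDataInterceptor();
        interceptor.initialize();

        Event withKeyword = EventBuilder.withBody("hello atguigu", StandardCharsets.UTF_8);
        Event withoutKeyword = EventBuilder.withBody("hello flume", StandardCharsets.UTF_8);

        // expected header: {title=at}
        System.out.println(interceptor.intercept(withKeyword).getHeaders());
        // expected header: {title=ot}
        System.out.println(interceptor.intercept(withoutKeyword).getHeaders());

        interceptor.close();
    }
}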

3) Package the code as a jar (the jar has to be on the Flume agent's classpath, typically by copying it into Flume's lib/ directory)

4) Configuration files

1. Advanced case - channel selector - multiplexing
a3 ==> a3.conf

a3.sources = r1
a3.channels = c1
a3.sinks = k1 

a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 6666

a3.channels.c1.type = memory
a3.channels.c1.capacity = 10000
a3.channels.c1.transactionCapacity = 100

a3.sinks.k1.type = logger

a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1 


a2 ==> a2.conf


a2.sources = r1
a2.channels = c1
a2.sinks = k1

a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 5555

a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 100

a2.sinks.k1.type = logger

a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1


a1 ==> a1.conf

a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2 

a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 100

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 5555

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 6666

#switch the selector type to multiplexing
a1.sources.r1.selector.type = multiplexing
#route on the value of the title key in each event's header
a1.sources.r1.selector.header = title
#if title is "at", send the event to channel c1; if "ot", to channel c2; otherwise default to c1
a1.sources.r1.selector.mapping.at = c1
a1.sources.r1.selector.mapping.ot = c2
a1.sources.r1.selector.default = c1
#register an interceptor named i1
a1.sources.r1.interceptors = i1
#fully qualified class name of the custom interceptor's Builder
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogDataInterceptor$MyBuilder

a1.sources.r1.channels = c1 c2 
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

5) Start the three Flume agents (all on hadoop102 in this case) and test

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf

Note: use jps -ml to check the Flume processes.
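
To verify the routing, type into the netcat source on hadoop102; with the mapping above, lines containing "atguigu" should show up on the a2 console (port 5555) and everything else on the a3 console (port 6666):

nc hadoop102 4444
# hello atguigu   -> header title=at -> channel c1 -> a2 console
# hello world     -> header title=ot -> channel c2 -> a3 console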
