Flume学习

Posted 六块腹肌的程序猿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume学习相关的知识,希望对你有一定的参考价值。

简介:

1.Flume原本是Cloudera公司开发的后来贡献给了Apache的一套分布式的、可靠的、针对日志数据进行收集、汇聚和传输的机制
2.在大数据中,实际开发中有超过70%的数据来源于日志-日志是大数据的基石
3.Flume针对日志提供了非常简单且灵活的流式传输机制
4.版本
a.Flume0.X:又称之为Flume-og。依赖于Zookeeper,结构配置相对复杂,现在市面上已经停用这个版本
b.Flume1.X:又称之为Flume-og。不依赖于Zookeeper,结构配置相对简单,是市面上常用的版本

二.基本概念
1.Event
a.在Flumn中,会将收集到的每一条日志封装成一个Event对象 - 在Flume中,一个Event就对应了一条日志
b.Event本质上是一个json串,固定的包含两部分:headers和body -Flume将收集到的日志封装成一个json,而这个json就是Event。Event的结构是“headers”:,“body”:“”
2.Agent:是Flume流动模型的基本组成结构,固定的包含了三个部分:
a.Source:从数据源采集数据的-collecting
b.Channel:临时存储数据-aggregating
c.Sink:将数据写往目的地-moving

三、流动模型/拓扑结构
1.单级流动

2.多级流动

3.扇入流动

4.扇出流动

5.复杂流动:实际过程中,根据不同的需求将上述的流动模型进行组合,就构成了复杂流动结构
四、Flume的执行流程

1.Source会先采集数据,然后将数据发送给ChannelProcessor进行处理
2.ChannelProcessor处理之后,会将数据交给Interceptor来处理,注意,在Flume允许存在多个Interceptor来构成拦截器链
3.Interceptor处理完成之后,会交给Selector处理,Selector存在两种模式:replicating和multiplexing。Selector收到数据之后会根据对应的模式将数据交给对应的Channel处理
4.Channel处理之后会交给SinkProcessor。SinkProcessor本质上是一个Sinkgroup,包含了三种方式:Default,Failover和LoadBalance。SinkProcessor收到数据之后会根据对应的方法将数据交给Sink来处理
5.Sink收到数据之后,会将数据写到指定的目的地

Flumn安装:

一:安装
1.要求虚拟机或者云主机上必须安装JDK1.8,最好安装Hadoop
2.进入/home/software
cd /home/software
flume下载地址
3.解压
tar -xvf apache-flume-1.9.0-bin.tar.gz
4.让Flume和Hadoop兼容(如果没有安装Hadoop,那么这一步不需要执行)
cd /home/software/apache-flume-1.9.0-bin/lib
rm -rf guava-11.0.2.jar
5.新建目录用于存储Flume的格式文件
cd …
mkdir data
cd data
6.编辑格式文件
vim basic.conf
7.添加格式文件内容

#给Agent起名
#给Source起名
a1.sources = s1
#给channels起名
a1.channels = c1
#给Sink起名
a1.sinks = k1

#配置Source
a1.sources.s1.type = netcat
a1.sources.s1.bind = hadoop01
a1.sources.s1.port = 8090

#配置Channel
a1.channels.c1.type = memory

#配置Sink
a1.sinks.k1.type = logger

#将Source和Channel绑定
a1.sources.s1.channels = c1
#将Sink和Channels绑定
a1.sinks.k1.channel = c1

8.启动
…/bin/flume-ng agent -n a1 -c …/conf -f basic.conf -Dflume.root.logger=INFO,console
9.测试:
另起一个客户端:nc hadoop01 8090

启动Flume的机器显示接收如下结果:

二、参数

参数解释
–n,–name指定要运行的Agent的名字
-c,–conf指定Flume运行的原生配置
-f,–conf-file指定要运行的文件
-Dflume.root.logger指定Flume本身运行日志的打印级别及打印方式

Source组件

AVRO Source

一、概述
1.AVRO Source监听指定的端口,接受其他节点发送来的被AVRO序列化的数据
2.AVRO Source结合AVRO Sink可以实现更多的流动模型,包括多级流动、扇入流动以及扇出流动
二、配置属性

属性解释
type必须是avro
bind要监听的主机的主机名或者IP
port要监听的端口

三、案例
1.编辑格式文件,在格式文件中需要添加指定内容
vim avrosource.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k1

#配置AVRO Source
#必须是avro
a1.sources.s1.type = avro
#指定要监听的主机
a1.sources.s1.bind = hadoop01
#指定要监听的主机
a1.sources.s1.port = 8090

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2.启动Flume
…/bin/flume-ng agent -n a1 -c …/conf -f avrosource.conf -Dflume.root.logger=INFO,console
3.在另一个窗口中,进入指定目录,编辑文件
cd /home/software/apache-flume-1.9.0-bin/data
vim a.txt
写入
hello world
hello flume
4.运行AVRO客户端
…/bin/flume-ng avro-client -H hadoop01 -p 8090 -F a.txt

flumn收到AVRO的信息

Exec Source

一、概述
1.Exec Source会运行指定的命令,然后将命令的执行结果作为日志进行收集
2.利用这个Source可以实现对文件或者其他操作的实时监听
二、配置属性

属性解释
type必须是exec
command要执行和监听的命令
shell最好指定这个属性,表示指定Shell的运行方式

三、案例
1.需求:实时监听/home/a.txt文件的变化
2.编辑格式文件,添加如下内容
vim execsource.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k1

#配置Exec Source
#必须是exec
a1.sources.s1.type = exec
#指定要运行的命令
a1.sources.s1.command = tail -F /home/a.txt
#指定Shell的运行方式/类型
a1.sources.s1.shell = /bin/bash -c

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

3.启动Flume
…/bin/flume-ng agent -n a1 -c …/conf -f execsource.conf -Dflume.root.logger=INFO,console
4.测试:
另起一个客户端,向监听文件追加内容
cd /home
mkdir a.txt
echo “hello” >>a.txt
观察flume运行窗口

Spooling Directory Source
一、概述
1.Spooling Directory Source是监听指定的目录,自动将目录中出现的新文件的内容进行收集
2.如果不指定,默认情况下,一个文件被收集之后,会自动添加一个后缀.COMPLETED,通过属性fileSuffix来修改

二、配置属性

属性解释
type必须是spoodir
spoolDir要监听的目录
fileSuffix收集之后添加的文件后缀,默认是.COMPLETED

三、案例
1.编辑格式文件,添加如下内容
vim spoolingdirsource.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k1

#配置Spooling Directory Source
#必须是spooldir
a1.sources.s1.type = spooldir
#指定要监听的目录
a1.sources.s1.spoolDir = /home/flumedata

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2.启动Flume
…/bin/flume-ng agent -n a1 -c …/conf -f spoolingdirsource.conf -Dflume.root.logger=INFO,console

另外还有:
Netcat Source
Sequence Generator source
HTTP Source
Custom Source
也可去官网查看提供的各种source

Custom Source
一、概述
1.自定义Source:需要定义一个类实现Source接口的子接口:EventDrivenSource或者PollableSource
a.EventDrivenSource:事件驱动源-被动型Source。需要自己定义线程来获取数据处理数据
b.PollableSource:拉取源 -主动型Source。提供了线程来获取数据,只需要考虑怎么处理数据即可
2.除了实现上述两个接口之一,这个自定义的类一般还需要考虑实现Configurable接口,通过接口的方法获取指定的属性
二、步骤
1.需要构建Maven工程,导入对应的POM依赖

<!--Flume的核心包-->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.9.0</version>
    </dependency>
    <!--Flume的开发工具包-->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.9.0</version>
    </dependency>
    <!--Flume的配置包-->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>1.9.0</version>
    </dependency>

2.定义类继承AbstractSource 实现EventDrivenSource和Configurable接口
3.覆盖configure, start和stop方法
4.定义完成后,需要将类打成jar包放到Flume安装目录的lib目录下

package sc.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


//模拟:Sequence Generator Source
public class AuthSource extends AbstractSource implements EventDrivenSource, Configurable 

    private long end;
    private long step;
    ExecutorService es = null;

    //通过这个方法来获取指定的属性值
    @Override
    public void configure(Context context) 
        //获取自增的最大值,如果不指定,默认是Long.MAX_VALUE
        end = context.getLong("end", Long.MAX_VALUE);
        step = context.getLong("step", 1L);

    

    //启动Source
    @Override
    public void start() 
        //构建线程池
        es = Executors.newFixedThreadPool(5);
        //获取Channel处理器
        ChannelProcessor cp = this.getChannelProcessor();
        //提交任务
        es.submit(new Add(end, step, cp));

    

    @Override
    public void stop() 
        if (es!=null)
            es.shutdown();
        

    



class Add implements Runnable

    private final long end;
    private final long step;
    private final ChannelProcessor cp;

    public Add(long end, long step, ChannelProcessor cp) 
        this.end = end;
        this.step = step;
        this.cp = cp;
    

    @Override
    public void run() 
        for (long i =0;i < end; i+=step) 
            //在Flume中,数据都是以Event形式存在
            //封装body
            byte[] body = (i + "").getBytes(StandardCharsets.UTF_8);
            //封装headers
            Map<String,String> headers = new HashMap<>();
            headers.put("time",System.currentTimeMillis() + "");
            //构建Event对象
            Event e = EventBuilder.withBody(body, headers);
            cp.processEvent(e);

        
    

5.编写格式文件,例如
cd /home/software/apache-flume-1.9.0-bin/data
vim authsource.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k1

#配置自定义 Source
#必须是avro
a1.sources.s1.type = sc.flume.AuthSource
#指定结束范围
a1.sources.s1.end = 100
#指定递增的步长
a1.sources.s1.step = 5

a1.channels.c1.type = memory
a1.sinks.k1.type = logger

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

6.启动Flume
…/bin/flume-ng agent -n a1 -c …/conf -f authsource.conf -Dflume.root.logger=INFO,console
7.结果

5.编写

Sink组件

hdfs sink
一、概述
1.HDFS Sink将收集到的数据写到HDFS中
2.在往HDFS上写的时候,支持三种文件类型:文本类型,序列类型以及压缩类型。如果不指定,那么默认使用序列类型
3.在往HDFS上写数据的时候,数据的存储文件会定时的滚动,如果不指定,那么每隔30s会滚动一次,生成一个文件,那么此时会生成大量的小文件
二、配置属性

属性解释
type必须是hdfs
hdfs.path数据在HDFS上的存储路径
hdfs.rollInterval指定文件的滚动的间隔时间
hdfs.fileType指定文件的存储类型:DataStream(文本),SequenceFile(序列),CompressedStream(压缩)

三、案列
1.编辑格式文件,添加如下内容

a1.sources = s1
a1.channels = c1
a1.sinks = k1

#配置Exec Source
#必须是exec
a1.sources.s1.type = exec
#指定要运行的命令
a1.sources.s1.command = tail -F /home/a.txt
#指定Shell的运行方式/类型
a1.sources.s1.shell = /bin/bash -c

a1.channels.c1.type = memory

#配置HDFS Sink
a1.sinks.k1.type = hdfs
#指定数据在HDFS上的存储路径
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flumedata
#指定文件的存储类型
a1.sinks.k1.hdfs.fileType = DataStream
#指定文件滚动的间隔时间
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2.启动Flume
…/bin/flume-ng agent -n a1 -c …/conf -f hdfssink.conf -Dflume.root.logger=INFO,console
3.测试:
另起一个客户端,向监听文件追加内容
cd /home
mkdir a.txt
echo “hello” >>a.txt
查询hdfs上文件

还提供了别的Sink的类型:可以官网查看

Custom Sink
一、概述
1.定义一个类实现Sink接口,考虑到需要获取配置属性,所以同样需要实现Configurable接口
2.不同于自定义Soure,自定义Sink需要考虑事务问题
二、事务
1.Source收集数据之后i,会doPut操作将数据放到队列PutList(本质上是一个阻塞式队列)中
2.PutList会试图将数据推送到Channel中。如果PutList成功将数据放到了Channel中,那么执行doCommit操作;反之执行doRollback操作
3.Channel有了数据之后,会将数据通过doTake操作推送到TakeList中
4.TakeList会将数据推送到Sink中,如果Sink写出成功,那么执行doCommit;反之执行doRollback操作
三、自定义Sink步骤
1.构建Maven工程,导入对应的POM依赖
2.定义一个类继承AbstractSink,实现Sink接口和Configurable接口,覆盖configure,start,process和stop方法

package sc.flume;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.io.FileNotFoundException;
import java.io.PrintStream;
import java.util.Map;

//模拟:File Roll Sink ->将数据写到本地磁盘上
public class AuthSink extends AbstractSink implements Sink, Configurable 

    private String path;
    private PrintStream printStream;

    @Override
    public void configure(Context context) 
        //获取指定的存储路径
        path = context.getString("path");
        //判断用户是否指定了属性
        if (path == null) 
            throw new IllegalArgumentException("必须指定path属性!!!");
        
    

    //启动Sink
    @Override
    public synchronized void start() 
        //构建流用于将数据写到磁盘上
        try 
            printStream = new PrintStream(path + "/" + System.currentTimeMillis());
         catch (FileNotFoundException e) 
            throw new RuntimeException(e);
        

    

    //处理逻辑需要覆盖在这个方法中
    @Override
    public Status process() throws EventDeliveryException 
        //获取Sink对应的Channel
        Channel c = this.getChannel();
        //获取事务
        Transaction t = c.getTransaction();
        //开启事务
        t.begin();
        //获取数据
        Event e;
        try 
            while ((e = c.take()) != null)
                //获取headers
                Map<String, String> headers = e.getHeaders();
                //写出headers部分的数据
                printStream.println("headers");
                for (Map.Entry<String, String> h : headers.entrySet()) 
                    printStream.println("\\t" + h.getKey() + ":" + h.getValue());
                
                //获取body
                byte[] body = e.getBody();
                //写出body数据
                printStream.println("body");
                printStream.println("\\t" + new String(body));
            
            //如果循环正常结束,那么说明数据正常写出
            //提交事务
            t.commit();
            return Status.READY;
         catch (Exception ex) 
            //如果循环失败,那么进入catch块
            //回滚事务
            t.rollback();
            return Status.BACKOFF;
         finally 
            //无论成功与否,都需要关闭事务
            t.close();
        
    

    @Override
    public synchronized void stop() 
        if (printStream != null)
            printStream.close();
        
    


3.完成之后打成jar包放到Flume安装目录的lib目录下
4.编写格式文件

a1.sources = s1
a1.channels = c1
a1.sinks = k1

#配置Exec Source
#必须是exec
a1.sources.s1.type = exec
#指定要运行的命令
a1.sources.s1.command = tail -F /home/a.txt
#指定Shell的运行方式/类型
a1.sources.s1.shell = /bin/bash -c

a1.channels.c1.type = memory

#配置自定义
#类型必须是类的全路径名
a1.sinks.k1.type = sc.flume.AuthSink
#指定文件的存储路径
a1.sinks.k1.path = /home/flumedata
#指定口端口
a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

5.启动Flume
…/bin/flume-ng agent -n a1 -c …/conf -f authsink.conf -Dflume.root.logger=INFO,console
6.测试:
另起一个客户端,向监听文件追加内容
cd /home
mkdir a.txt
echo “hello” >>a.txt
cat /home/flumedata
vim 1651458810852
查看结果

多级流动、扇入流动、扇出流动

一、多级流动
1.AVRO Sink结合AVRO Source实现多级、扇入、扇出流动效果
2.案列:
①将flume考到另外两台中hadoop02、hadoop03
scp -r /home/software/apache-flume-1.9.0-bin root@hadoop02:/home/software/
scp -r /home/software/apache-flume-1.9.0-bin root@hadoop03:/home/software/
②分别编辑三台格式文件,添加如下内容 :
cd /home/software/apache-flume-1.9.0-bin/data
vim duoji.conf

在hadoop01的duoji.conf文件里修改

a1.sources = s1
a1.channels = c1
a1.sinks = k1

#配置Exec Source
#必须是exec
a1.sources.s1.type = exec
#指定要运行的命令
a1.sources.s1.command = tail -F /home/a.txt
#指定Shell的运行方式/类型
a1.sources.s1.shell = /bin/bash -c

a1.channels.c1.type = memory

#配置多级流动
#类型必须是avro
a1.sinks.k1.type = avro
#指定主机名或者IP
a1.sinks.k1.hostname = hadoop02
#指定口端口
a1.sinks.k1.port = 8090

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

在hadoop02的duoji.conf文件里修改

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 8090

a1以上是关于Flume学习的主要内容,如果未能解决你的问题,请参考以下文章

Flume 推文的未知文件格式

Flume学习之路 Flume的配置方式

Flume学习笔记

flume学习

Flume学习笔记

Flume学习笔记