Flume自定义Source

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume自定义Source相关的知识,希望对你有一定的参考价值。

模拟编写了一个Flume 1.7中TAILDIR的功能实现,通过手动控制文件的读取位置来达到对文件的读写,防止flume挂了之后重复消费的情况。
以下是代码实现,仅做参考,生产上直接用TAILDIR读取文件内容即可,若要读取一个目录下的子目录,可使用github上以实现的这个项目包:https://github.com/qwurey/flume-source-taildir-recursive

package com.fwmagic.flume.source;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Description:自定义Source 1、读取指定目录下的文件,如nginx的access.log
 * 2、读取文件前先判断offset文件是否存在,不存在则创建它
 * 3、每次读取完都写一个offset文件记录读取到文件的什么位置,防止重启flume时发生重复消费的情况
 * 4、如何自定义?参考ExecSource
 * <p>
 * (1):获取自定义配置文件属性
 * (2):创建线程池,用channelProcessor发送数据给channel
 * (3):线程池提交(启动任务)
 * 任务内容:
 * (1):读取偏移量文件,没有则创建,有则获取偏移量,将读取的指针重置到指定偏移量
 * (2):读取指定的日志文件,将读取的一行内容打包成Event,用Channel发送Event
 * (3):获取读取内容后的偏移量,重置偏移量
 * (4):stop方法调用,关闭线程池,调用super.stop方法。
 * @Date:Create in 2018/8/19
 */
public class TailFileSource extends AbstractSource implements EventDrivenSource, Configurable {

    /*监听的文件*/
    private String filePath;

    /*记录读取偏移量的文件*/
    private String posiFile;

    /*若读取文件暂无内容,则等待数秒*/
    private Long interval;

    /*读写文件的字符集*/
    private String charset;

    /*读取文件内容的线程*/
    private FileRunner fileRunner;

    /*线程池*/
    private ExecutorService executor;

    private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);

    /**
     * 初始化配置文件内容
     *
     * @param context
     */
    @Override
    public void configure(Context context) {
        filePath = context.getString("filePath");
        posiFile = context.getString("posiFile");
        interval = context.getLong("interval", 2000L);
        charset = context.getString("charset", "UTF-8");
    }

    @Override
    public synchronized void start() {
        //启动一个线程,用于监听对应的日志文件
        //创建一个线程池
        executor = Executors.newSingleThreadExecutor();
        //用channelProcessor发送数据给channel
        ChannelProcessor channelProcessor = super.getChannelProcessor();
        fileRunner = new FileRunner(filePath, posiFile, interval, charset, channelProcessor);
        executor.submit(fileRunner);
        super.start();
    }

    @Override
    public synchronized void stop() {
        fileRunner.setFlag(Boolean.FALSE);
        while (!executor.isTerminated()) {
            logger.debug("waiting for exec executor service to stop");
            try {
                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                logger.debug("Interrupted while waiting for executor service to stop,Just exiting.");
                Thread.currentThread().interrupt();
            }
        }
        super.stop();
    }

    public static class FileRunner implements Runnable {
        private Long interval;

        private String charset;

        private Long offset = 0L;

        private File pFile;

        private RandomAccessFile raf;

        private ChannelProcessor channelProcessor;

        private Boolean flag = Boolean.TRUE;

        public void setFlag(Boolean flag) {
            this.flag = flag;
        }

        public FileRunner(String filePath, String posiFile, Long interval, String charset, ChannelProcessor channelProcessor) {
            this.interval = interval;
            this.charset = charset;
            this.channelProcessor = channelProcessor;

            //1、判断是否有偏移量文件,有则读取偏移量,没有则创建
            pFile = new File(posiFile);
            if (!pFile.exists()) {
                try {
                    pFile.createNewFile();
                } catch (IOException e) {
                    e.printStackTrace();
                    logger.error("create position file error!", e);
                }
            }
            //2、判断偏移量中的文件内容是否大于0
            try {
                String offsetStr = FileUtils.readFileToString(pFile, this.charset);
//          3、如果偏移量文件中有记录,则将内容转换为Long
                if (StringUtils.isNotBlank(offsetStr)) {
                    offset = Long.parseLong(offsetStr);
                }
//           4、如果有偏移量,则直接跳到文件的偏移量位置
                raf = new RandomAccessFile(filePath, "r");
//              跳到指定的位置
                raf.seek(offset);
            } catch (IOException e) {
                e.printStackTrace();
                logger.error("read position file error!", e);
            }
        }

        @Override
        public void run() {
            //监听文件
            while (flag) {
//            读取文件中的内容
                String line = null;
                try {
                    line = raf.readLine();
                    if (StringUtils.isNotBlank(line)) {
//                      把数据打包成Event,发送到Channel
                        line = new String(line.getBytes("ISO-8859-1"), "UTF-8");
                        Event event = EventBuilder.withBody(line.getBytes());
                        channelProcessor.processEvent(event);
                        //更新偏移量文件,把偏移量写入文件
                        offset = raf.getFilePointer();
                        FileUtils.writeStringToFile(pFile, offset.toString());
                    } else {
                        try {
                            Thread.sleep(interval);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            logger.error("thread sleep error", e);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

以上是关于Flume自定义Source的主要内容,如果未能解决你的问题,请参考以下文章

flume自定义source

flume 自定义 hbase sink

005- Flume Source之Custom

Hadoop实战-Flume之自定义Source(十八)

Flume-自定义 Source 读取 MySQL 数据

大数据Flume自定义类型