Flume自定义Source,Interceptors

Posted 职场spark哥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume自定义Source,Interceptors相关的知识,希望对你有一定的参考价值。

工具:idea,maven,flume源码包

1.防止数据丢失,续接偏移量,自定义source.


import org.apache.commons.io.FileUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.ExecSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by jw on.
 * <p>
 * 特点:
 * 1.可配置的,可通过修改配置文件就组合到现有的Flume组件中
 * 2.可以实时采集一个文件产生的数据
 * 3.可以记录输出的偏移量
 */
public class TailFileSource extends AbstractSource implements Configurable, EventDrivenSource {
    private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);
    private String filePath;
    private String posiFile;
    private String charSet;
    private Long interval;
    private ExecutorService executor;
private FileRunnable fileRunnable;

    //读取配置文件的,configure在start反复执行之前执行一次
    @Override
    public void configure(Context context) {
        //读取配置文件中的配置
        filePath = context.getString("filePath");
        posiFile = context.getString("posiFile");
        charSet = context.getString("charSet");
        interval = context.getLong("interval");
    }
@Override
    public synchronized void start() {
        //启动、做一些准备工作
        //创建一个单线程的线程池
        executor = Executors.newSingleThreadExecutor();
        //一个任务其实就是一个实现了Runnable的接口的实现类
        FileRunnable fileRunnable = new FileRunnable(filePath, posiFile, charSet, interval, getChannelProcessor());
        //提交任务到线程池
        executor.submit(fileRunnable);
    }
@Override

   //停止,收尾释放资源

    public synchronized void stop() {
        fileRunnable.setFlag(false);
        executor.shutdown();
    
    }
private static class FileRunnable implements Runnable {
private String filePath;
        private String charset;
        private Long interval;
        private ChannelProcessor channelProcessor;
        private Long offset = 0L;
        private RandomAccessFile randomAccessFile;
        private File positionFile;
        private boolean flag = true;
public void setFlag(boolean flag) {
            this.flag = flag;
        }
 //ChannelProcessor可以将数据发送给Channel
        public FileRunnable(String filePath, String posiFile, String charset, Long interval, ChannelProcessor channelProcessor) {
this.filePath = filePath;
            this.charset = charset;
            this.interval = interval;
            this.channelProcessor = channelProcessor;
            //在构造器完成读取上一次的偏移量
            //读取偏移量对应的文件,获取偏移量的值
try {
                positionFile = new File(posiFile);
                if (!positionFile.exists()) {
                    positionFile.createNewFile();
                }
                String offsetStr = FileUtils.readFileToString(positionFile);
                if (offsetStr != null && !"".equals(offsetStr)) {
                    offset = Long.parseLong(offsetStr);
                }
                //快速定位到想要读取的位置
                randomAccessFile = new RandomAccessFile(filePath, "r");
               //调到指定的偏移量位置
                randomAccessFile.seek(offset);
} catch (IOException e) {
                logger.error("read position file error", e);
            }}
@Override
        public void run() {
//while循环,不停的执行下面的方法
            while (flag) {
                //接着偏移量读取数据
                try {
                    String line = randomAccessFile.readLine();
                    if (line != null) {
                         //解决了randomAccessFile 读取文件有乱码
                        line = new String(line.getBytes("ISO-8859-1"),charset);
                        //将一行数据构造成一个Event发送给下一个组件(Channel)
                    channelProcessor.processEvent(EventBuilder.withBody(line.getBytes()));
                        //先获取当前的偏移量
                        offset = randomAccessFile.getFilePointer();
                        //更新偏移量
                        FileUtils.writeStringToFile(positionFile, offset + "");
                    } else {
                        Thread.sleep(interval);
                    }
                } catch (IOException e) {
                    logger.error("read file error", e);
                } catch (InterruptedException e) {
                    logger.error("thread sleep error", e);
                } } }}}

2.自定义拦截器:可以将 Source 采集到的数据以自定义方式输出到 HDFS。例如以下案例,按照日志类型对应的目录进行数据输出。

import com.alibaba.fastjson.JSONObject;
import com.it18zhang.app.common.AppBaseLog;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*;

/**
 * 自定义flume的拦截器,提取body中的createTimeMS字段作为header
 */
public class LogCollInterceptor implements Interceptor {

    private final boolean preserveExisting;

    private LogCollInterceptor(boolean preserveExisting) {
        this.preserveExisting = preserveExisting;
    }

    public void initialize() {
    }

    /**
     * Modifies events in-place.重点拦截逻辑编写
     */
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        //处理时间
        //获得日志体 由于本项目日志是json格式,所以转化json
        byte[] json = event.getBody();
        String jsonStr = new String(json);
        save(jsonStr);
        AppBaseLog log = JSONObject.parseObject(jsonStr , App                BaseLog.class);
        long time = log.getCreatedAtMs();
        headers.put(TIMESTAMP, Long.toString(time));
        save(time +"");

        //处理log类型的头 按照5种日志类型进行拦截
        //pageLog
        String logType = "" ;
        if(jsonStr.contains("pageId")){
            logType = "page" ;
        }
        //eventLog
        else if (jsonStr.contains("eventId")) {
            logType = "event";
        }
        //usageLog
        else if (jsonStr.contains("singleUseDurationSecs")) {
            logType = "usage";
        }
        //error
        else if (jsonStr.contains("errorBrief")) {
            logType = "error";
        }
        //startup
        else if (jsonStr.contains("network")) {
            logType = "startup";
        }
        headers.put("logType", logType);
        save(logType);
        return event;
    }

    /**
     * Delegates to {@link #intercept(Event)} in a loop.
     *
     * @param events
     * @return
     */
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }
public void close() {
    }

    /**
     */
    public static class Builder implements Interceptor.Builder {
private boolean preserveExisting = PRESERVE_DFLT;
public Interceptor build() {
            return new LogCollInterceptor(preserveExisting);
        }
public void configure(Context context) {
            preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
        }
    }

    /**
     *保存
     */
    private void save(String log)  {
        try {
            FileWriter fw = new FileWriter("/home/centos/l.log",true);
            fw.append(log + "\r\n");
            fw.flush();
            fw.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
public static class Constants {
        public static String TIMESTAMP = "timestamp";
        public static String PRESERVE = "preserveExisting";
        public static boolean PRESERVE_DFLT = false;
    }}

   总结:以上两种自定义后,需要打jar包,放在flume的lib目录下。自定义flume,需要实现和继承相关的父类和接口,我们可以参考源码进行改动。

了解更多关于此项目的信息,可以扫码关注,加小编微信 




 

以上是关于Flume自定义Source,Interceptors的主要内容,如果未能解决你的问题,请参考以下文章

flume自定义source

flume 自定义 hbase sink

Flume自定义Source

Hadoop实战-Flume之自定义Source(十八)

Flume-自定义 Source 读取 MySQL 数据

大数据Flume自定义类型