Flume自定义Source,Interceptors
Posted 职场spark哥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume自定义Source,Interceptors相关的知识,希望对你有一定的参考价值。
工具:IntelliJ IDEA、Maven、Flume 源码包
1. 自定义 Source:记录文件读取的偏移量,重启后从上次的位置续读,防止数据丢失。
import org.apache.commons.io.FileUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.ExecSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Flume source that tails a single text file and remembers how far it has read.
 *
 * <p>Features:
 * <ul>
 *   <li>Configurable: it is wired into an agent through the agent configuration file
 *       (keys {@code filePath}, {@code posiFile}, {@code charSet}, {@code interval}).</li>
 *   <li>Collects the lines appended to one file as they are produced.</li>
 *   <li>Persists the read offset after every delivered line, so a restart resumes
 *       from the last recorded position.</li>
 * </ul>
 *
 * <p>Created by jw.
 */
public class TailFileSource extends AbstractSource implements Configurable, EventDrivenSource {
    private static final Logger logger = LoggerFactory.getLogger(TailFileSource.class);

    /** Charset used to decode the tailed file when {@code charSet} is not configured. */
    private static final String DEFAULT_CHARSET = "UTF-8";
    /** Idle/back-off sleep in milliseconds when {@code interval} is not configured. */
    private static final long DEFAULT_INTERVAL_MS = 1000L;
    /** Charset of the emitted event body; fixed so it does not depend on the JVM default. */
    private static final String EVENT_CHARSET = "UTF-8";
    /** Charset of the position file (it only ever contains decimal digits). */
    private static final String POSITION_FILE_CHARSET = "UTF-8";

    private String filePath;
    private String posiFile;
    private String charSet;
    private Long interval;
    private ExecutorService executor;
    private FileRunnable fileRunnable;

    /**
     * Reads the source settings from the agent configuration.
     *
     * @param context configuration of this source
     * @throws IllegalArgumentException if {@code filePath} or {@code posiFile} is missing,
     *         or {@code charSet} is not a charset known to this JVM
     */
    @Override
    public void configure(Context context) {
        filePath = context.getString("filePath");
        posiFile = context.getString("posiFile");
        charSet = context.getString("charSet", DEFAULT_CHARSET);
        interval = context.getLong("interval", DEFAULT_INTERVAL_MS);

        // Fail fast here instead of with a NullPointerException inside start() or the worker.
        if (filePath == null || filePath.trim().isEmpty()) {
            throw new IllegalArgumentException("TailFileSource: 'filePath' must be configured");
        }
        if (posiFile == null || posiFile.trim().isEmpty()) {
            throw new IllegalArgumentException("TailFileSource: 'posiFile' must be configured");
        }
        // Throws an IllegalArgumentException subclass for an illegal or unsupported name.
        java.nio.charset.Charset.forName(charSet);
    }

    /**
     * Creates a single-threaded executor and submits the tailing task to it.
     */
    @Override
    public synchronized void start() {
        executor = Executors.newSingleThreadExecutor();
        // Assign the FIELD (not a shadowing local) so that stop() can reach the task.
        fileRunnable = new FileRunnable(filePath, posiFile, charSet, interval, getChannelProcessor());
        executor.submit(fileRunnable);
        // Let AbstractSource record that the source is started.
        super.start();
    }

    /**
     * Asks the tailing task to finish and shuts the executor down. The task closes
     * the tailed file itself when its loop ends.
     */
    @Override
    public synchronized void stop() {
        if (fileRunnable != null) {
            fileRunnable.setFlag(false);
        }
        if (executor != null) {
            executor.shutdown();
        }
        super.stop();
    }

    /**
     * Task that reads the file line by line from the saved offset, turns every line
     * into an event and stores the new offset after each delivered line.
     */
    private static class FileRunnable implements Runnable {
        private final String filePath;
        private final String charset;
        private final long interval;
        private final ChannelProcessor channelProcessor;
        private final File positionFile;
        /** Offset of the first byte that has not been delivered yet. */
        private long offset = 0L;
        /** Stays {@code null} when the file could not be opened. */
        private RandomAccessFile randomAccessFile;
        /** Written by the stopping thread, read by the worker thread, hence volatile. */
        private volatile boolean flag = true;

        /**
         * @param flag {@code false} asks the loop in {@link #run()} to end
         */
        public void setFlag(boolean flag) {
            this.flag = flag;
        }

        /**
         * Restores the offset stored in the position file (creating the file when it is
         * missing) and opens the tailed file positioned at that offset.
         *
         * @param filePath         file to tail
         * @param posiFile         file in which the offset is persisted
         * @param charset          charset of the tailed file; {@code null} means UTF-8
         * @param interval         sleep in milliseconds when there is nothing to read;
         *                         {@code null} means the default
         * @param channelProcessor used to hand events over to the channel(s)
         */
        public FileRunnable(String filePath, String posiFile, String charset, Long interval, ChannelProcessor channelProcessor) {
            this.filePath = filePath;
            this.charset = (charset == null) ? DEFAULT_CHARSET : charset;
            this.interval = (interval == null) ? DEFAULT_INTERVAL_MS : interval.longValue();
            this.channelProcessor = channelProcessor;
            this.positionFile = new File(posiFile);
            try {
                if (!positionFile.exists()) {
                    positionFile.createNewFile();
                }
                offset = parseOffset(FileUtils.readFileToString(positionFile, POSITION_FILE_CHARSET));
                // Jump straight to the position where the previous run stopped.
                randomAccessFile = new RandomAccessFile(filePath, "r");
                randomAccessFile.seek(offset);
            } catch (IOException e) {
                logger.error("read position file error", e);
                // Do not keep a file that is open but not positioned correctly.
                closeFile();
            }
        }

        /**
         * Parses the content of the position file.
         *
         * @return the stored offset, or 0 when the content is empty, not a number or negative
         */
        private static long parseOffset(String content) {
            if (content == null) {
                return 0L;
            }
            String trimmed = content.trim();
            if (trimmed.isEmpty()) {
                return 0L;
            }
            try {
                long parsed = Long.parseLong(trimmed);
                if (parsed >= 0L) {
                    return parsed;
                }
                logger.error("negative offset '{}' in position file, reading from the beginning", trimmed);
            } catch (NumberFormatException e) {
                logger.error("invalid offset '" + trimmed + "' in position file, reading from the beginning", e);
            }
            return 0L;
        }

        /**
         * Reads and delivers lines until the flag is cleared or the thread is interrupted,
         * sleeping for the configured interval whenever no complete read was possible.
         */
        @Override
        public void run() {
            if (randomAccessFile == null) {
                logger.error("file {} could not be opened, nothing will be collected", filePath);
                return;
            }
            try {
                while (flag) {
                    try {
                        if (!readAndDeliverLine()) {
                            // Nothing new in the file yet.
                            Thread.sleep(interval);
                        }
                    } catch (IOException e) {
                        logger.error("read file error", e);
                        rewindToCommittedOffset();
                        // Back off instead of logging the same error in a tight loop.
                        Thread.sleep(interval);
                    } catch (RuntimeException e) {
                        // An uncaught exception (for example from the channel) would end this
                        // thread silently, because the task was handed over with submit().
                        logger.error("deliver event error", e);
                        rewindToCommittedOffset();
                        Thread.sleep(interval);
                    }
                }
            } catch (InterruptedException e) {
                logger.error("thread sleep error", e);
                // Keep the interrupt visible to the executor and leave the loop.
                Thread.currentThread().interrupt();
            } finally {
                closeFile();
            }
        }

        /**
         * Reads the next line, sends it as one event and persists the new offset.
         *
         * @return {@code false} when the end of the file was reached and nothing was sent
         */
        private boolean readAndDeliverLine() throws IOException {
            String raw = randomAccessFile.readLine();
            if (raw == null) {
                return false;
            }
            // readLine() turns every byte into one char, which garbles multi-byte text;
            // get the bytes back via ISO-8859-1 and decode them with the real charset.
            String line = new String(raw.getBytes("ISO-8859-1"), charset);
            channelProcessor.processEvent(EventBuilder.withBody(line.getBytes(EVENT_CHARSET)));
            // The line is delivered: remember the position first, then persist it.
            offset = randomAccessFile.getFilePointer();
            FileUtils.writeStringToFile(positionFile, Long.toString(offset), POSITION_FILE_CHARSET);
            return true;
        }

        /** Moves the file pointer back to the last delivered position so a failed line is retried. */
        private void rewindToCommittedOffset() {
            try {
                randomAccessFile.seek(offset);
            } catch (IOException e) {
                logger.error("seek file error", e);
            }
        }

        /** Closes the tailed file if it is open. */
        private void closeFile() {
            if (randomAccessFile == null) {
                return;
            }
            try {
                randomAccessFile.close();
            } catch (IOException e) {
                logger.warn("close file error", e);
            }
            randomAccessFile = null;
        }
    }
}
2. 自定义拦截器:可以将 Source 采集到的数据按自定义的方式输出到 HDFS。例如下面的案例,按日志类型分目录输出数据。
import com.alibaba.fastjson.JSONObject;
import com.it18zhang.app.common.AppBaseLog;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.apache.flume.interceptor.TimestampInterceptor.Constants.*;
/**
* 自定义flume的拦截器,提取body中的createTimeMS字段作为header
*/
public class LogCollInterceptor implements Interceptor {
private final boolean preserveExisting;
private LogCollInterceptor(boolean preserveExisting) {
this.preserveExisting = preserveExisting;
}
public void initialize() {
}
/**
* Modifies events in-place.重点拦截逻辑编写
*/
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
//处理时间
//获得日志体 由于本项目日志是json格式,所以转化json
byte[] json = event.getBody();
String jsonStr = new String(json);
save(jsonStr);
AppBaseLog log = JSONObject.parseObject(jsonStr , App BaseLog.class);
long time = log.getCreatedAtMs();
headers.put(TIMESTAMP, Long.toString(time));
save(time +"");
//处理log类型的头 按照5种日志类型进行拦截
//pageLog
String logType = "" ;
if(jsonStr.contains("pageId")){
logType = "page" ;
}
//eventLog
else if (jsonStr.contains("eventId")) {
logType = "event";
}
//usageLog
else if (jsonStr.contains("singleUseDurationSecs")) {
logType = "usage";
}
//error
else if (jsonStr.contains("errorBrief")) {
logType = "error";
}
//startup
else if (jsonStr.contains("network")) {
logType = "startup";
}
headers.put("logType", logType);
save(logType);
return event;
}
/**
* Delegates to {@link #intercept(Event)} in a loop.
*
* @param events
* @return
*/
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}
public void close() {
}
/**
*/
public static class Builder implements Interceptor.Builder {
private boolean preserveExisting = PRESERVE_DFLT;
public Interceptor build() {
return new LogCollInterceptor(preserveExisting);
}
public void configure(Context context) {
preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DFLT);
}
}
/**
*保存
*/
private void save(String log) {
try {
FileWriter fw = new FileWriter("/home/centos/l.log",true);
fw.append(log + "\r\n");
fw.flush();
fw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static class Constants {
public static String TIMESTAMP = "timestamp";
public static String PRESERVE = "preserveExisting";
public static boolean PRESERVE_DFLT = false;
}}
总结:以上两种自定义组件编写完成后,需要打成 jar 包,放到 Flume 的 lib 目录下。自定义 Flume 组件需要继承相应的父类或实现相应的接口,可以参考 Flume 源码进行改动。
想了解更多关于此项目的信息,可以扫码关注,加小编微信
以上是关于Flume自定义Source,Interceptors的主要内容,如果未能解决你的问题,请参考以下文章