离线数仓之flume采集过程设置拦截器
Posted Mr.zhou_Zxy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了离线数仓之flume采集过程设置拦截器相关的知识,希望对你有一定的参考价值。
离线数仓之拦截器设置
frp穿透获取数据,通过分割脚本,将采集到的数据分割到指定的文件夹,然后由flume采集到hdfs,本次介绍的拦截器是设置flume中。主要实现功能是对flume接收到的json数据进行base解码,并传回给flume,由flume再次上传到hdfs。
package com.zxy.bigdata.flume.intercepter;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.google.common.collect.Lists;
import org.apache.commons.codec.binary.Base64;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
/**
* @author lixi
* @date 2021-05-25
* @desc Base64的拦截器,拦截flume的日志,再通过base64解码之后再传入到hdfs
* @version v1.0
*/
public class B64Intercepter implements Interceptor {
/**
* 拦截方法
*
eyJwcm9qZWN0IjoibmV3cyIsImlwIjoiMzkuOTkuMTcwLjUzIiwiY3RpbWUiOjE2MjE4ODY0NzMzMTh9
-
eyJjb250ZW50Ijp7InV1aWQiOiIwY2ViY2M5NS0wZmM0LTRlMjktOTVkOC05MjdhMTc3YTg5M2EiLCJkaXN0aW5jdF9pZCI6IjQzNCIsImV2ZW50IjoiQXBwUGFnZVZpZXciLCJwcm9wZXJ0aWVzIjp7Im1vZGVsIjoiTWF0ZTciLCJuZXR3b3JrX3R5cGUiOiIzRyIsImlzX2NoYXJnaW5nIjoiIiwiYXBwX3ZlcnNpb24iOiIxLjAiLCJlbGVtZW50X25hbWUiOiIiLCJlbGVtZW50X3BhZ2UiOiLmlrDpl7vliJfooajpobUiLCJjYXJyaWVyIjoi5Lit5Zu96IGU6YCaIiwib3MiOiIiLCJpbWVpIjoiMjM0NzY3NTc4NTY1IiwiYmF0dGVyeV9sZXZlbCI6IjMxIiwic2NyZWVuX3dpZHRoIjoiMTAyNCIsInNjcmVlbl9oZWlnaHQiOiI3MjAiLCJkZXZpY2VfaWQiOiJaSEFPV1VYWFg2RUZFQjk2MDgwQ0UiLCJjbGllbnRfdGltZSI6IjIwMjEtMDUtMjUgMDQ6MDE6MTMiLCJpcCI6IjIxMC40Mi4zMi44IiwibWFudWZhY3R1cmVyIjoiSHVhd2VpIiwiYXJ0aWNsZV9pZCI6IjE5NzU3IiwiYWN0aW9uX3R5cGUiOiIifSwidHlwZSI6InRyYWNrIn19
*/
@Override
public Event intercept(Event event) {
//1. 获取到数据本身
String text = new String(event.getBody());
//2. 拆分
String[] textArray = text.split("-");
//3. 校验数据的合法性
byte[] body = null;
if (textArray.length == 2) {
try {
//4. 获取到meta的json字符串
String meta = new String(Base64.decodeBase64(textArray[0]));
//5. 获取到数据内容的json字符串
String content = new String(Base64.decodeBase64(textArray[1]));
//6. 将meta的json字符串转换json对象
JSONObject jsonMeta = JSONObject.parseObject(meta);
//7. 获取jsonMeta对象中的ctime
String ctime = JSONPath.eval(jsonMeta, "$.ctime").toString();
//8. 抽取meta的ctime的数据
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
ctime = sdf.format(Double.parseDouble(ctime));
//9. 将这个时间可以存放到event的header中
event.getHeaders().put("ctime", ctime);
//10. 内容数据的json对象
JSONObject jsonContent = JSONObject.parseObject(content);
//11. 将jsonMeta和jsonContent合成一个json对象
JSONObject jsonObject = new JSONObject();
jsonObject.put("ctime", JSONPath.eval(jsonMeta, "$.ctime"));
jsonObject.put("project", JSONPath.eval(jsonMeta, "$.project"));
jsonObject.put("content", JSONPath.eval(jsonContent, "$.content"));
//12. 获取到json的字节数组
body = jsonObject.toString().getBytes();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
//13. 将处理之后的数据封装回event中
event.setBody(body);
//14. 返回了
return event;
}
/**
* 拦截方法
* 实际这个方法才是真正的会自动的被flume所调用
*/
@Override
public List<Event> intercept(List<Event> list) {
//1. 创建的一个新数组
ArrayList<Event> intercepted = Lists.newArrayListWithCapacity(list.size());
//2. 遍历
for (Event event : list) {
//3. 处理event
Event interceptEvent = intercept(event);
//4. 添加到新的集合中
if (interceptEvent != null) {
intercepted.add(interceptEvent);
}
}
return intercepted;
}
/**
* 默认flume创建拦截器,会调用这个内部类的这个里面的方法
*/
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new B64Intercepter();
}
@Override
public void configure(Context context) {}
}
/**
* 你的拦截器出生了就会立刻调用此方法
*/
@Override
public void initialize() {}
/**
* 临死之前调用一次
*/
@Override
public void close() {}
}
以上是关于离线数仓之flume采集过程设置拦截器的主要内容,如果未能解决你的问题,请参考以下文章