离线数仓之flume采集过程设置拦截器

Posted Mr.zhou_Zxy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了离线数仓之flume采集过程设置拦截器相关的知识,希望对你有一定的参考价值。

离线数仓之拦截器设置

frp穿透获取数据,通过分割脚本,将采集到的数据分割到指定的文件夹,然后由flume采集到hdfs,本次介绍的拦截器是设置flume中。主要实现功能是对flume接收到的json数据进行base解码,并传回给flume,由flume再次上传到hdfs。

package com.zxy.bigdata.flume.intercepter;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.JSONPath;
import com.google.common.collect.Lists;
import org.apache.commons.codec.binary.Base64;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

/**
 * @author lixi
 * @date 2021-05-25
 * @desc Base64的拦截器,拦截flume的日志,再通过base64解码之后再传入到hdfs
 * @version v1.0
 */
public class B64Intercepter implements Interceptor {

    /**
     * 拦截方法
     *
     eyJwcm9qZWN0IjoibmV3cyIsImlwIjoiMzkuOTkuMTcwLjUzIiwiY3RpbWUiOjE2MjE4ODY0NzMzMTh9
     -
     eyJjb250ZW50Ijp7InV1aWQiOiIwY2ViY2M5NS0wZmM0LTRlMjktOTVkOC05MjdhMTc3YTg5M2EiLCJkaXN0aW5jdF9pZCI6IjQzNCIsImV2ZW50IjoiQXBwUGFnZVZpZXciLCJwcm9wZXJ0aWVzIjp7Im1vZGVsIjoiTWF0ZTciLCJuZXR3b3JrX3R5cGUiOiIzRyIsImlzX2NoYXJnaW5nIjoiIiwiYXBwX3ZlcnNpb24iOiIxLjAiLCJlbGVtZW50X25hbWUiOiIiLCJlbGVtZW50X3BhZ2UiOiLmlrDpl7vliJfooajpobUiLCJjYXJyaWVyIjoi5Lit5Zu96IGU6YCaIiwib3MiOiIiLCJpbWVpIjoiMjM0NzY3NTc4NTY1IiwiYmF0dGVyeV9sZXZlbCI6IjMxIiwic2NyZWVuX3dpZHRoIjoiMTAyNCIsInNjcmVlbl9oZWlnaHQiOiI3MjAiLCJkZXZpY2VfaWQiOiJaSEFPV1VYWFg2RUZFQjk2MDgwQ0UiLCJjbGllbnRfdGltZSI6IjIwMjEtMDUtMjUgMDQ6MDE6MTMiLCJpcCI6IjIxMC40Mi4zMi44IiwibWFudWZhY3R1cmVyIjoiSHVhd2VpIiwiYXJ0aWNsZV9pZCI6IjE5NzU3IiwiYWN0aW9uX3R5cGUiOiIifSwidHlwZSI6InRyYWNrIn19
     */
    @Override
    public Event intercept(Event event) {
        //1. 获取到数据本身
        String text = new String(event.getBody());
        //2. 拆分
        String[] textArray = text.split("-");
        //3. 校验数据的合法性
        byte[] body = null;
        if (textArray.length == 2) {
            try {
                //4. 获取到meta的json字符串
                String meta = new String(Base64.decodeBase64(textArray[0]));
                //5. 获取到数据内容的json字符串
                String content = new String(Base64.decodeBase64(textArray[1]));
                //6. 将meta的json字符串转换json对象
                JSONObject jsonMeta = JSONObject.parseObject(meta);
                //7. 获取jsonMeta对象中的ctime
                String ctime = JSONPath.eval(jsonMeta, "$.ctime").toString();
                //8. 抽取meta的ctime的数据
                SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
                ctime = sdf.format(Double.parseDouble(ctime));
                //9. 将这个时间可以存放到event的header中
                event.getHeaders().put("ctime", ctime);
                //10. 内容数据的json对象
                JSONObject jsonContent = JSONObject.parseObject(content);
                //11. 将jsonMeta和jsonContent合成一个json对象
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("ctime", JSONPath.eval(jsonMeta, "$.ctime"));
                jsonObject.put("project", JSONPath.eval(jsonMeta, "$.project"));
                jsonObject.put("content", JSONPath.eval(jsonContent, "$.content"));
                //12. 获取到json的字节数组
                body = jsonObject.toString().getBytes();
            } catch (Exception e) {
                e.printStackTrace();
                return null;
            }
        }
        //13. 将处理之后的数据封装回event中
        event.setBody(body);
        //14. 返回了
        return event;
    }

    /**
     * 拦截方法
     * 实际这个方法才是真正的会自动的被flume所调用
     */
    @Override
    public List<Event> intercept(List<Event> list) {
        //1. 创建的一个新数组
        ArrayList<Event> intercepted = Lists.newArrayListWithCapacity(list.size());
        //2. 遍历
        for (Event event : list) {
            //3. 处理event
            Event interceptEvent = intercept(event);
            //4. 添加到新的集合中
            if (interceptEvent != null) {
                intercepted.add(interceptEvent);
            }
        }
        return intercepted;
    }

    /**
     * 默认flume创建拦截器,会调用这个内部类的这个里面的方法
     */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new B64Intercepter();
        }

        @Override
        public void configure(Context context) {}
    }

    /**
     * 你的拦截器出生了就会立刻调用此方法
     */
    @Override
    public void initialize() {}

    /**
     * 临死之前调用一次
     */
    @Override
    public void close() {}
}

以上是关于离线数仓之flume采集过程设置拦截器的主要内容,如果未能解决你的问题,请参考以下文章

离线数仓之Kerberos基本使用及问题记录

离线数仓之数据监控-Prometheus

离线数仓之数据监控-Grafana

Flume配置项目 离线数仓项目

离线数仓同步数据

数仓 数据漂移问题解决