揭秘字节跳动埋点数据实时动态处理引擎(附源码)

Posted zhisheng_blog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了揭秘字节跳动埋点数据实时动态处理引擎(附源码)相关的知识,希望对你有一定的参考价值。

1.序篇-先说结论

宝贝们,还记得前几天博主去的火山引擎大数据场嘛,其中比较令大家感兴趣的就是最后一讲,字节一站式埋点平台的 flink 标准化清洗及拆流任务。

其中大家感觉比较流啤的就是的就是字节做到了:

  1. 不重启任务可以上下线新的拆流及清洗规则,所有的规则变更都不需要涉及到任务的重启。

  2. 清洗 udf,rpc 接口热加载

总的来说就是任务永不停,不可能停止的,好么,beiber。

字节火山引擎 PPT。公众号回复 20210724 获取。

6

本文博主就主要介绍第一点,即做到规则动态变化,可以做到动态添加一个 sink kafka topic,动态删除一个 sink kafka topic,而不重启任务。相信能抛砖引玉,给大家一些启发。

本文从以下几个章节详细介绍框架实现:

  1. 背景篇-为啥需要这么个框架

  2. 定义、目标篇-做这个框架的目标、预期效果是什么

  3. 难点剖析篇-此框架建设的难点、业界目前的实现

  4. 数据建设篇-框架具体方案设计

  5. 数据保障篇-框架的保障方案

  6. 总结与展望篇

2.背景篇-为啥需要这么个框架

首先来看看字节他们做这件事情的背景:

  1. 任务重启造成数据的延迟:对于字节这种企业来说且每天都会新上线很多的埋点,把这些新的埋点拆流条件加入 flink 任务就要重启,但是字节客户端日志流量都是千万级别 qps 的,就意味着这个 flink 任务一旦重启耗时肯定是很长的,这对时延敏感的业务是不可接受的。

  2. 减少对于原始客户端日志的烟囱式消费,节约资源

  3. 统一标准化的埋点平台:用户能通过埋点平台用到正确的数据

  4. 与埋点平台联动的、统一化的、标准化的流式数据处理平台:用户能通过这个平台去获取想要的统一标准化的数据

  5. 数据的分级保障能力:Dump 日志,日志的产出需要分优先级进行保障(死保、尽力保...),用户能放心的用数据

如图:

因此诞生了这个框架。

3.定义、目标篇-做这个框架的目标、预期效果是什么

上述的痛点很多,本节就从最痛的任务重启的延迟角度出发解决问题,揭秘字节动态配置化的 flink 任务的实现。

预期效果如下:

      1.即在任务不停止的情况下可以动态的上线一个动态规则、一个 sink kafka topic,上线某个、某类埋点对应的流数据的 kafka topic

如图左边是修改配置,添加了一个拆流规则以及对应 topic,右边这个规则 topic 就开始产出数据,对应的 console consumer 就消费到了复合规则的数据。(gif 加载可能比较慢)

8

      2.即在任务不停止的情况下可以动态的下线一个动态规则、一个 sink kafka topic,下线某个、某类埋点对应的流数据的 kafka topic

如图左边是修改配置,删除了一个拆流规则以及对应 topic,右边这个规则 topic 就不产出数据了,对应的 console consumer 就没有新数据可以消费了。(gif 加载可能比较慢)

9

      3.总体效果如下:

4.难点剖析篇-此框‍架建设的难点、业界目前的实现

首先带大家分析下,实现这个框架,最基本的模块都需要包含什么:

  1. flink 任务:本身就是一个 Map 任务,逻辑简单

  2. 动态上下线规则配置:肯定得有一个动态配置中心去告诉 flink 任务需要新上下线一个 kafka topic

  3. 动态规则过滤引擎:flink 任务监听到规则发生动态变化之后,要热更新规则,将新的规律规则应用起来。需要一个动态代码执行引擎

  4. 动态上下线 Kafka topic:目前大多数公司用的是 flink 自带的 kafka-connector,一旦涉及到需要添加一个下游,就需要添加一个 kafka producer operator,因为涉及到多加了一个 operator,那肯定得重启任务。需要动态添加删除 producer 的能力。

5.数据建设篇-框架具体方案设计

5.1.方案设计

5.1.1.方案

先说说方案选择的结论:

  1. flink 入口任务:Map 模型使用 ProcessFunction 底层算子

  2. 动态上下线规则配置:配置中心开源的有很多,这里为了实现轻量化,实现简单,使用 zookeeper 作为动态规则配置中心。当然如果对 zk 压力大,也可以使用广播配置实现。

  3. 动态规则引擎:规则引擎很多,比如常见的可以使用 javascript、Groovy、jython、mvel2、freemarker 等等,太多了。考虑到性能、易用性选用 janino 将动态规则动态编译出 class。然后作为动态规则引擎使用。后面会详述选用 janino 的原因。

  4. 动态上下线 Kafka topic:去除 flink-kafka-connector,直接在 ProcessFunction 中使用原生 kafka-clients 输出数据,维护一个 producer 池。

整体方案架构图如图所示:

5.1.2.预期效果

5.1.2.1.上线配置

4

5.1.2.2.下线配置

5

5.2.具体实现

整个任务的实现非常简单。

本地运行,可以参考下面两篇安装 zk 和 kafka。

  • zk:https://www.jianshu.com/p/5491d16e6abd

  • kafka:https://www.jianshu.com/p/dd2578d47ff6

5.2.1.flink 任务入口逻辑

首先来看看整个任务的入口逻辑,ProcessFunction 的功能很简单:

  1. 针对数据源的每一条日志数据,遍历动态规则引擎池

  2. 只要这条数据满足某一条规则的条件,就将这条日志数据写出到规则对应的 topic 中

 env.addSource(new UserDefinedSource())
    .process(new ProcessFunction<ClientLogSource, ClientLogSink>() {
        // 动态规则配置中心
        private ZkBasedConfigCenter zkBasedConfigCenter;
        // kafka producer 管理中心
        private KafkaProducerCenter kafkaProducerCenter;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.zkBasedConfigCenter = ZkBasedConfigCenter.getInstance();
            this.kafkaProducerCenter = KafkaProducerCenter.getInstance();
        }

        @Override
        public void processElement(ClientLogSource clientLogSource, Context context, Collector<ClientLogSink> collector)
                throws Exception {
            
            // 遍历所有的动态规则
            this.zkBasedConfigCenter.getMap().forEach(new BiConsumer<Long, DynamicProducerRule>() {
                @Override
                public void accept(Long id, DynamicProducerRule dynamicProducerRule) {
                    // 验证该条数据是否符合该条规则
                    if (dynamicProducerRule.eval(clientLogSource)) {
                        // 将符合规则的数据发向对应规则的 topic 中
                        kafkaProducerCenter.send(dynamicProducerRule.getTargetTopic(), clientLogSource.toString());
                    }

                }
            });
        }

        @Override
        public void close() throws Exception {
            super.close();
            // 关闭规则池
            this.zkBasedConfigCenter.close();
            // 关闭 producer 池
            this.kafkaProducerCenter.close();
        }
    });

env.execute();

5.2.2.动态上下线规则配置

来看 flink ProcessFunction 中的核心点,第一部分就是 ZkBasedConfigCenter。其功能包含:

  1. 任务启动时,初始化加载 zk 配置,初始化规则池,将规则池中的配置规则编译成 class 可执行规则

  2. 监听 zk 配置变更,将新增配置加入规则池,将下线配置从规则池删除

5.2.2.1.动态规则 schema 设计

动态规则包含的内容与用户需求息息相关:

举例:用户需要将在首页上报 +  id > 300 用户的客户端日志都写入 topic_id_bigger_than_300_and_main_page 的 kafka topic 中。

那么针对这个 flink 任务来说就有以下三项用户的输入:

  1. 动态规则的过滤条件:即上游每一条数据过来之后检验这条数据是否满足规则条件。上面这个例子的条件就是 clientLogSource.getId() > 300 && clientLogSource.getPage().equals("首页");其中 clientLogSource 是原始日志 model

  2. 动态规则要写入的 topic 名称:这条规则过滤出来的数据要写入哪个 topic。上面这个例子的 topic 就是 topic_id_bigger_than_300_and_main_page

  3. 动态规则的唯一 id:唯一标识一个过滤规则的 id

针对上述要求设计动态规则配置的 schema 如下:

{
    "id-数值类型 string": {
  "condition-过滤条件": "1==1",
  "targetTopic-目标 topic 名称": "tuzisir1"
 }
 "1": {
  "condition": "clientLogSource.getId() > 300 && clientLogSource.getPage().equals(\\"首页\\")",
  "targetTopic": "topic_id_bigger_than_300_and_main_page"
 },
 "2": {
  "condition": "clientLogSource.getPage().equals(\\"个人主页\\")",
  "targetTopic": "topic_profile_page"
 }
}

对应动态规则 model 设计如下:

public class DynamicProducerRule implements Evaluable {

    // 具体过滤规则
    private String condition;

    // 具体写入 topic
    private String targetTopic;

    // 使用 janino 编译的规则过滤器
    private Evaluable evaluable;

    public void init(Long id) {
        try {
            // 使用 janino 初始化规则
            Class<Evaluable> clazz = JaninoUtils.genCodeAndGetClazz(id, targetTopic, condition);
            this.evaluable = clazz.newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean eval(ClientLogSource clientLogSource) {
        return this.evaluable.eval(clientLogSource);
    }

}

重点在于 Evaluable 接口,动态生成代码就是继承了这个接口,用于执行过滤规则的基础接口。

代码动态生成下面会详细介绍。

public interface Evaluable {

    // 动态规则接口过滤方法
    boolean eval(ClientLogSource clientLogSource);

}

5.2.2.2.基于 zk 的动态配置中心

使用了 zk 作为动态配置中心,来动态监听规则配置以及更新规则池。

public class ZkBasedConfigCenter {
    
    // zk config 变化监听器
    private TreeCache treeCache;

    // zk 客户端
    private CuratorFramework zkClient;

    private ZkBasedConfigCenter() {
        try {
            open();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // !!!规则池!!!规则池!!!规则池
    private ConcurrentMap<Long, DynamicProducerRule> map = new ConcurrentHashMap<>();

    private void open() throws Exception {

        // 初始化规则
        // 初始化 zk config 监听器
        // 当有配置变更时
        // 调用 private void update(String json) 更新规则

    }

    public void close() {
        this.treeCache.close();
        this.zkClient.close();
    }

    private void update(String json) {

        Map<Long, DynamicProducerRule>
                result = getNewMap(json);

        // 1.将新增规则添加进规则池
        // 2.将下线规则从规则池删除
    }

    private Map<Long, DynamicProducerRule> getNewMap(String json) {
        // 将新规则解析,并使用 janino 进行初始化
    }
}

可以使用一个固定路径的配置,如图博主使用的是 /kafka-config 这个路径

7

5.2.3.动态规则引擎

目前字节使用的引擎是 Groovy,但是博主常用 flink sql,sql 中的代码生成是使用 janino 做的,因此就比较了 janino 和 groovy 的性能差异,janino 编译出的原生 class 性能接近原生 class,是 Groovy 的 4 倍左右。其他的引擎不考虑,要么易用性差,要么性能差。

Notes:性能这一点真的是很重要,1:4 的差距可以说是差别很大了。如果你的场景也是大流量,非常耗费性能的场景,建议直接入手 janino!!!

来看看具体的 benchmark case 代码:

// ClientLogSource 是原始日志
boolean eval(flink.examples.datastream._01.bytedance.split.model.ClientLogSource clientLogSource) {
    return String.valueOf(clientLogSource.getId()).equals("1");
}

上面这段代码,在博主 mac 本地执行,每次循环执行 5kw 次,总计执行 5 次 得出的结果如下:

java:847 ms
janino:745 ms
groovy:4110 ms

java:1097 ms
janino:1170 ms
groovy:4052 ms

java:916 ms
janino:1117 ms
groovy:4311 ms

java:915 ms
janino:1112 ms
groovy:4382 ms

java:921 ms
janino:1104 ms
groovy:4321 ms

重复执行了很多次:java object : janino 编译原生 class :groovy :几乎都是 1:1:4 的耗时。所以此处我们选择性能更好的 janino。

public class JaninoUtils {

    public static Class<Evaluable> genCodeAndGetClazz(Long id, String topic, String condition) throws Exception {
        // 动态生成代码
        // 初始化 Class<Evaluable> 并返回
    }

}

5.2.4.动态上下线 Kafka topic

来看入口类中的第二个核心点,就是 KafkaProducerCenter。其功能包含:

  1. 维护所有的 producer 池

  2. 提供消息发送接口

public class KafkaProducerCenter {

    // kafka producer 池
    private final ConcurrentMap<String, Producer<String, String>> producerConcurrentMap
            = new ConcurrentHashMap<>();

    private Producer<String, String> getProducer(String topicName) {

        // 如果 kafka producer 池中有当前 topic 的 producer,则直接返回
        // 如果没有,则初始化一个新的 producer 然后返回

    }

    public void send(String topicName, String message) {

        final ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
                "", message);
        try {
            RecordMetadata metadata = getProducer(topicName).send(record).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void close() {
        // 关闭所有 producer 连接
    }
}

上面就是所有的代码、逻辑实现方案。其实整体看下来是非常简单的。

6. 数据保障篇-框架的保障方案

  1. 配置中心挂了怎么办?

为这个任务分配独立的队列资源,每当这个任务加载到最新配置时,都将配置在本地存储一份。当配置中心挂了的时候,还可以直接加载机器本地的配置,不至于什么都产出不了。

  1. 怎么保障用户的配置是无误的?

  • 上线前审批:有专门的埋点管理人员进行逻辑验证及管理

  • 上线前自动化测试:在埋点管理平台自动化验证逻辑正确性,保障上线到 flink 任务里的配置都是正确的

  • AOP 异常处理、报警:在环境中做 AOP 异常处理,将异常数据 dump 到专用异常 topic 中,也需要自动化把报警信息透出

  • 结果验证:针对最终的结果需要有数据准确性验证机制

7. 总结与展望篇

7.1.总结

本文主要揭秘、实现了字节跳动埋点数据实时动态处理引擎。

7.2.展望

  1. 本文主要实现了拆流的动态化,输出数据和输入数据完全相同,但是很多情况下,下游只需要其中的一些字段。因此之后还可以做到对于 sink message 字段、消息的个性化。比如可以加一个动态化的 Map 逻辑,将数据源中的 ClientLogSource 转化为任何用户想要的 Model。比如使用 Dynamic Message 或者使用代码生成去做。

  2. 目前过滤条件完全是 java 语法,之后可以扩展成为 sql 语法,提高可读性

  3. 函数、rpc 热加载

以上是关于揭秘字节跳动埋点数据实时动态处理引擎(附源码)的主要内容,如果未能解决你的问题,请参考以下文章

字节跳动埋点数据流建设与治理实践

揭秘字节跳动云原生Spark History 服务 UIService

火山引擎项亮揭秘字节跳动基于 HPC 的大规模机器学习技术|直播分享报名

免费活动字节跳动背后的音视频技术揭秘

字节跳动大数据开发面试题-附答案

字节跳动大数据开发面试题-附答案