Flink 实践 | B站流式传输架构的前世今生
Posted Java可可
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink 实践 | B站流式传输架构的前世今生相关的知识,希望对你有一定的参考价值。
01 背景
Lancer是B站的实时流式传输平台,承载全站服务端、客户端的数据上报/采集、传输、集成工作,秒级延迟,作为数仓入口是B站数据平台的生命线。目前每日峰值 5000w/s rps, 3PB/天, 4K+条流的数据同步能力。
服务如此大的数据规模,对产品的可靠性、可扩展性和可维护性提出了很高的要求。流式传输的实现是一个很有挑战的事情,聚焦快、准、稳的需求, Lancer整体演进经历了大管道模型、BU粒度管道模型、单流单作业模型三个阶段的演进,下面我们娓娓道来。
02 关键词说明
logid:每个业务方上报的数据流以logid进行标识,logid是数据在传输+集成过程中的元信息标识。
数据源:数据进入到lancer的入口,例如:log-agent,bfe-agent,flink cdc
lancer-gateway(数据网关):接收数据上报的网关。
数据缓冲层:也叫做内部kafka,用于解耦数据上报和数据分发。
lancer-collector(数据分发层):也叫做数据同步,可以根据实际场景完成不同端到端的数据同步。
03 技术演进
整个B站流式数据传输架构的演进大致经历了三个阶段。
3.1 架构V1.0-基于flume的
大管道数据传输架构(2019之前)
B站流式传输架构建立之初,数据流量和数据流条数相对较少,因此采用了全站的数据流混合在一个管道中进行处理,基于flume二次定制化的数据传输架构,架构如下:
-
整个架构从数据生成到落地分为:数据源、数据网关、数据缓冲、数据分发层。
-
数据上报端基本采用sdk的方式直接发送http和grpc请求上报。
-
数据网关lancer-gateway是基于flume二次迭代的数据网关,用于承载数据的上报,支持两种协议:http用于承载公网数据上报(web/app),grpc用于承载IDC内服务端数据上报。
-
数据缓冲层使用kafka实现,用于解耦数据上报和数据分发。
-
数据分发层lancer-collector同样是基于flume二次迭代的数据分发层,用于将数据从缓冲层同步到ODS。
v1.0架构在使用中暴露出一些的痛点:
1. 数据源端对于数据上报的可控性和容错性较差,例如:
-
数据网关故障情况下,数据源端缺少缓存能力,不能直接反压,存在数据丢失隐患。
-
重SDK:SDK中需要添加各种适配逻辑以应对上报异常情况
2. 整体架构是一个大管道模型,资源的划分和隔离不明确,整体维护成本高,自身故障隔离性差。
3. 基于flume二次迭代的一些缺陷:
-
逻辑复杂,性能差,我们需要的功能相对单一
-
hdfs分发场景,不支持exactly once语义,每次重启,会导致数据大量重复
3.2 架构V2.0-BU粒度的
管道化架构(2020-2021)
针对v1.0的缺陷,我们引入了架构v2.0,架构如下:
此架构的关键细节如下:
1. 强化了数据上报源端的边缘可控能力
-
服务器上部署log-agent承载服务端数据上报。
-
cdn上部署bfe-agent用于承载公网(web端、app端)数据上报。
-
log-agent/bfe-agent中集成数据缓冲、预聚合、流控、重试、降级等能力,数据上报sdk只需专注数据的生成和上报逻辑。
-
agent端基于logid的BU属性,将数据路由到不同的管道。
2. 数据管道以BU为粒度搭建,管道间资源隔离,每个管道包含整套独立的完整数据传输链路,并且数据管道支持基于airflow快速搭建。故障隔离做到BU级别。
3. 数据网关升级到自研lancer-gateway2.0,逻辑精简,支持流控反压,并且适配kafka failover, 基于k8s进行部署。
4. hdfs分发基于flink jar进行实现:支持exactly once语义保证。
V2.0架构相对于v1.0, 重点提升了数据上报边缘的可控力、BU粒度管道间的资源划分和隔离性。但是随着B站流式数据传输规模的快速增加,对数据传输的时效性、成本、质量也提出了越来越高的要求,V2.0也逐渐暴露出了一些缺陷:
1. logid级别隔离性差:
-
单个管道内部某个logid流量陡增,几倍甚至几十倍,依然会造成整个管道的数据分发延迟,
-
单个管道内分发层组件故障重启,例如:hdfs分发对应的flink jar作业挂掉重启,从checkpoint恢复,此管道内所有的logid的hdfs分发都会存在归档延迟隐患。
2. 网关是异步发送模型,极端情况下(组件崩溃),存在数据丢失风险。
3. ods层局部热点/故障影响放大
-
由于分发层一个作业同时分发多个logid,这种大作业模型更易受到ods层局部热点的影响,例如:hdfs某个datanode热点,会导致某个分发作业整体写阻塞,进而影响到此分发作业的其他logid, kafka分发同理。
-
hdfs单个文件块的所有副本失效,会导致对应分发任务整体挂掉重启。
4. hdfs小文件问题放大
- hdfs分发对应的flink jar作业为了保证吞吐,整体设置的并发度相对较大。因此对于管道内的所有logid,同一时刻都会打开并发度大小的文件数,对于流量低的logid,就会造成小文件数量变大的问题。
针对上述痛点,最直接的解决思路就是整体架构做进一步的隔离,以单logid为维度实现数据传输+分发。面临的挑战主要有以下几个方面:
-
如何保证全链路以logid为单位进行隔离,如何在资源使用可控的情况下合理控流并且保证数据流之间的隔离性
-
需要与外部系统进行大量的交互,如何适配外部系统的各种问题:局部热点、故障
-
集成作业的数量指数级增加,如何保障高性能、稳定性的同时并且高效的进行管理、运维、质量监控。
3.3 架构V3.0-基于Flink SQL的
单流单作业数据集成方案
在V3.0架构中,我们对整体传输链路进行了单作业单数据流隔离改造,并且基于Flink SQL支撑数据分发场景。架构如下:
相比v2.0, 资源池容量管理上依然以BU为粒度,但是每个logid的传输和分发相互独立,互不影响。具体逻辑如下 :
-
agent:整体上报SDK和agent接收+发送逻辑按照logid进行隔离改造,logid间采集发送相互隔离。
-
lancer-gateway3.0:logid的请求处理之间相互隔离,当kafka发送受阻,直接反压给agent端,下面详细介绍。
-
数据缓冲层:每个logid对应一个独立的内部kafka topic,实现数据的缓冲。
-
数据分发层:分发层对每个logid的启动独立的flink sql作业进行数据的分发,单个logid处理受阻,只会导致当个logid的数据堆积。
相较于之前的实现,v3.0架构具有以下的优势:
1. 可靠性:
- 功能质量上整理链路可以保证数据不丢失,网关层以同步方式发送数据,可以保证数据被持久化到内部kafka;flink支持状态恢复和exactly once的语义,同样保证数据不丢。
2. 可维护性上:
-
隔离性上logid之间相互隔离,一个logid出现问题,其他logid不受影响。
-
资源分配以logid为最小单位,可以精确控制单个logid的资源使用。
3. 可扩展性:
- 可以以单个logid为单位灵活管控:灵活的扩缩资源
04 V3.0架构具体实现
我们重点介绍下,当前V3.0结构各个分层的实现。
4.1 数据上报边缘层
4.1.1 log-agent
基于go自研,插件化架构,部署于物理机,可靠、高效的支持服务端数据上报。
时间架构分为收集、处理、发送三层,具有以下主要特性:
-
支持文件采集和unix sock两种数据上报方式
-
与网关GRPC通信:ACK+退避重试+流控
-
整体上报SDK和agent接收+发送逻辑按照logid进行隔离改造,单logid处理相互隔离:每个logid启动独立的pipeline进行采集、解析、发送。
-
网关基于服务发现,自适应网关的调整
-
发送受阻情况下,基于磁盘进行本地堆积
-
logid粒度的埋点监控,实时监控数据的处理状态
-
CGroup资源限制:CPU + 内存
-
数据聚合发送,提升传输效率
-
支持物理机和容器日志此采集,配置随应用发布,自适应配置的增、删、改。
4.1.2 bfe-agent
基于go自研,部署于cdn,用于承载公网数据上报。
边缘cdn节点,cdn服务器上部署nginx和bfe-agent,bfe-agent整体实现架构与log-agent类似,对于web和app端数据上报请求QPS高、突发性强的特点,主要强化了以下能力:
-
应对流量陡增:基于边缘节点的本地缓冲起到削峰作用
-
策略(降级、流控)前置,增强可控力
-
logid级别分流隔离, 支持等级划分
-
聚合压缩回传以提升数据传输效率、降低成本,回源QPS降低90%以上。
4.2 数据上报网关层
v3.0方案中,数据数据网关的架构如下:
数据网关功能特性如下:
-
kafka的通用代理层:支持grpc /http协议
-
基于kafka send callback实现了同步发送模型,保证数据不丢:数据写入kafka后,再对请求返回ack
-
请求不拆分:基于agent的聚合机制,只支持单次请求单条记录,因此一条记录对应一条缓存层kakfa的消息
-
lancer-gateway3.0根据请求的topic信息,发送请求到对应的kafka集群
-
lancer-gateway3.0适配kafka集群的局部热点:支持partition动态剔除
-
logid与topic一一对应,处理流程中相互隔离:一个topic发送受阻,不影响其他的topic
整个数据网关中的实现难点是:单gateway承载多logid处理的过程中如何保证隔离性和公平性,我们参考了Golang 中GMP的机制,整体数据流程如下:
1. 收到的请求,会把请求放到logid对应的请求队列,如果队列满,直接拒绝请求
2. 每个kafka集群,会初始化一个N大小的kafka producer pool,其中每个producer会遍历所有的队列,进行数据的发送。
3. 对于每个logid的请求队列,会从两个维护限制资源的占用,以保证公平性和隔离性
-
限制当个logid队列绑定的producer数量
-
基于时间片限定当个producer服务于单个队列的时间长度
4.3 数据上报分发层
随着flink在实时计算领域的成熟,其高性能、低延迟、exactly once语义保证、批流一体、丰富的数据源支持、活跃的社区等优势,促使我们选择了以flink sql作为数据分发层的解决方案。当前我们主要支持了kafka→hive, kafka→kafka, cdc→kafka->hudi/hive三种场景:
1. kafka→hive
-
以流式方式,实时导入数据到hive。
-
file rolling on check,保证exactly once。
-
按照event time写入分区和归档,归档延迟小于15min
-
支持text+lzo(行存)和 orc+zstd(列存)两种存储格式。
-
支持下游作业增量同步。
2. kafka→kafka
-
以流式方式,支持数据的实时同步
-
支持kafka header metadata信息的透传
3. cdc→kafka->hudi/hive
-
以实时流的方式同步全量和增量数据,整个cdc的使用场景分为两个环节
-
cdc → kafka
-
基于cdc 2.1,同步mysql的全量和增量binlog同步
-
单sql作业支持分库分表、多库多表的同步。
-
支持根据db和table自定义策略分流到不同的数据缓冲层kafka topic
-
kafka→hudi/hive
-
消费单topic同步到单张hudi/hive表,支持event_time落分区。
-
保证数据最终一致性
05 Flink connector功能迭代
在Flink SQL数据分发场景的支持中,针对我们遇到的实际需求,对社区原生connector进行了对应的优化,我们针对性的介绍下。
5.1 hive sink connector优化
断流空分区提交
背景:B站离线作业的拉起依赖上游分区提交,HDFS分区提交的判断依赖于作业整体watermark的推进,但是某些logid在断流的情况下,如何进行分区的提交呢
解决办法:
如图所示:当所有的StreamFileWriter连续两次checkpoint内未处理任何数据的情况下,StreamingFileCommiter会判定发生了断流,按照当前时间提交分区。
支持下游增量数据同步
背景:传统方式ods到dwd的数据同步只有当ods层分区ready之后才会开始,时效性较差,如何加速数据的同步?
解决办法:
-
不依赖ods层分区ready,当ods目录中文件生成后,即可开始数据的处理,以增量的方式读取数据文件。
-
通过HDFS的list操作来获取需要读取的文件,对NameNode压力较大,为此我们提供了文件list列表索引(包括文件名和数据条数),下游只需要读取索引,即可获取增量文件列表。
-
实现中索引文件状态被持久化到state中,snapshot中生成.inflight状态临时文件,notifyCheckpointComplete中将文件rename成commit正式文件, 提供exactly once语义保证。
-
下游作业读取文件索引,支持ods到dwd的增量数据同步。
orc+zstd
背景:相较于行式存储,列式存储在压缩比上有着显著的优势。
解决办法:支持orc+zstd, 经过测试,相较于text+lzo,空间节省在40%以上。
hdfs异步close
背景:snapshot阶段flush数据,close文件经常因为个别文件慢拖累整体吞吐。
解决办法:
-
将close超时的文件扔到异步队列中。也就是 close 的动作不会去堵塞整个主链路的处理,提升hdfs局部热点情况下的吞吐。异步close 文件列表保存到pendingPartsForCurrentCheckpoint,并且持久化到 state 当中。故障恢复时,也能继续对文件进行关闭。
-
异步close的引入,会引入分区提前创建的隐患,为此引入了对于bucket状态的判断。对于某分区,只有当隶属于此分区的所有bucket中的pendingPartsForCurrentCheckpoint为空(所有文件都进行了关闭),才在commit算子中进行分区的提交。
小文件合并
背景:rolling on checkpoint的滚动策略,会导致文件数量的膨胀,对namenode产生较大的压力。
解决办法:
-
引入了小文件合并功能,在checkpoint完成后,由 Streaming writer 的 notifyCheckpointComplete 方法触发合并操作,向下游发送EndCheckpoint信号。
-
coordinator 收到每个writer的EndCheckpoint后,开始进行文件的分组,封装成一个个compactunit广播下游,全部unit发送完之后,再广播EndCompaction。
-
compact operator找到属于自己的任务后开始处理,当收到EndCompaction后,往下游发送分区提交信息。
5.2 kafka connector优化
支持protobuf format
背景:用户有处理protobuf格式数据的需求
解决办法:
-
使用protoc 生成java类,打包jar,上传到实时计算平台。
-
实现对应的DeserializationSchema和SerializationSchema,动态加载pb类并通过反射调用方法,完成pb bytes与RowData的互转。
kafka sink支持自定义分流
背景:用户希望在一个sql作业中根据需要,灵活定制将消息发送到指定kafka 集群和topic。
解决办法:
-
支持用户自定义udf,灵活选择sql中的字段作为udf的入参,在udf内部,用户根据业务场景定制逻辑,返回topic或者broker list。最终sink内部发送到对应的kafka集群和topic。
-
kakfa sink内部动态加载udf,通过反射机制实时获取对应的broker和topic,同时支持结果的缓存。
-
例子:
CREATE TABLE sink_test (
broker_name_arg varchar,
topic_name_arg varchar,
message string,
t1 string
) WITH(
'bootstrapServers' = 'BrokerUdf(broker_name_arg)', // 根据broker_name_arg作为udf参数计算brokers
'bootstrapServers.udf.class' = 'com.bilibili.lancer.udf.BrokerUdf', // 获取brokers Udf
'topic' = 'TopicUdf(broker_name_arg, topic_name_arg)', // 根据broker_name_arg和topic_name_arg作为udf参数计算topic
'topic.udf.class' = 'com.bilibili.lancer.udf.TopicUdf', // 计算topoc Udf
'udf.cache.min' = '1', // 缓存时间
'exclude.udf.field' = 'false', // udf的相关字段是否输出
'connector' = 'kafka-diversion'
);
5.3 cdc connector优化
sql场景下多库多表场景支持
背景:原生的flink cdc source在单个sql任务中,只能同步相同DDL定义的表,如果需要同步异构DDL,不得不启动多个独立的job进行同步。这样会存在资源的额外开销。
解决办法:
-
sql定义去DDL:
原生flink cdc source会对所有监听到的数据在反序列化时根据sql ddl定义做column转换和解析,以RowData的形式传给下游。我们在cdc-source中新增了一种的format方式:changelog bytes序列化方式。该format在将数据反序列化时在不再进行column转换和解析,而是将所有column直接转换为changelog-json二进制传输,外层将该二进制数据直接封装成RowData再传给下游。对下游透明,下游在消费kafka数据的时候可以直接通过changelog-json反序列化进行数据解析。并且由于该改动减少了一次column的转换和解析工作,通过实际测试下来发现除自动感知schema变更外还能提升1倍的吞。在kafka sink connector中,根据db和table进行分流,可以支持发送到不同的topic。
-
扩展metadata,添加sequence:
将增量数据同步到kafka中,由于kafka存在多分区,因此必然会导致消息乱序问题。因此需要提供一个单任务内严格单调递增的sequence,用于下游消费者进行排序,保证数据的最终一致性。最终我们提取binlog中的gtid作为binlog消息的sequence id,通过metadata的方式暴露处理来,写入kafka record的header中,对于全量数据,sequence设置为0。
断流场景分区提交支持
背景:由于整个cdc方案存在上游和下游两个独立的job,并且都是基于event time推进watermark做分区的提交,下游watermark的推进受阻可能受到数据正常断流或者上游作业异常两种原因的影响,如果正确判断呢?
解决办法:
- 在cdc source connector内定义一种新类型的record HeartbeatRecord,此record时间为当前时间。当发现某张表数据停止发送时,定期mock心跳数据进行发送。正常断流情况下,下游作业可以根据心跳信息正常推进watermark,并且可以过滤丢弃此信息。
- 最终cdc connector sql样例:
CREATE TABLE mysql_binlog (
host_name STRING METADATA FROM 'host_name' VIRTUAL,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP(3) METADATA FROM 'op_ts' VIRTUAL,
sequence BIGINT METADATA FROM 'sequence' VIRTUAL, // sequence严格单调递增
heartbeat BOOLEAN METADATA FROM 'heartbeat'VIRTUAL, // 对于心跳信息标识为true
mtime TIMESTAMP(3) METADATA FROM 'mtime'VIRTUAL, // 提取mtime,用于下游推进watermark
id BIGINT NOT NULL,
filed_list BYTES NOT NULL, // 去DDL,在source内部数据全部按照changelog-json格式进行序列化、
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxxx',
'port' = '3552',
'username' = 'datacenter_cdc',
'password' = 'xxx',
'database-name' = 'xxx',
'debezium.format' = 'bytes',
'table-name' = 'xxx',
'server-time-zone' = 'Asia/Shanghai',
'heartbeat.enable'='true',
'scan.incremental.snapshot.chunk.size' = '80960'
);
06 架构稳定性优化
为了保障流式传输稳定和高效运行,我们在以下几个方面做了一些优化,分别介绍下:
6.1 管道热点优化
作业在正常运行的过程中,经常遇到局部热点问题,例如kafka/hdfs io热点导致局部并行度消费速度下降或者写入受阻、yarn队列机器load不均匀导致作业部分并行度消费能力不如,虽然原因多种多样,但是本质看,这些问题的一个共性就是由于局部热点导致局部数据延迟。针对这个问题,我们分别从局部流量调度和全局流量调度两个维度进行优化。
局部流量调度
局部流量调度的优化思路是在单个producer和task内部,分区之间进行流量的重分配。目前在两个点就行了优化:
-
bsql Task manager内部subtask上下游通信优化:
集成作业并没有keyby的需求,基于Flink Credit-based Flow Control反压机制,可以通过Backlog Size判断下游任务的处理负载,那么我们就可以将Round-robin发送的方式修改为根据Channel的Backlog Size信息选择负载更低的下游Channel发送的方式。注意:此种策略只有source和sink端之间是rebalance/rescale时,才有效果。会造成一定的序列化开销,但是测试下来可以接受。
-
kafka producer partition自动剔除机制:
kafka producer在发送数据callback异常(绝大多数是timeout)超出一定的阈值,会将对应tp从available partition list中进行剔除,后续record将不再发送到剔除的tp。同时,被剔除tp后续将进行恢复性测试,如果数据可以正常发送,将重新放入到available partition list中。目前此机制在flink kafka sink connector和标准kafka client都进行了实现。
全局流量调度
全局流量调度的优化思路是整个传输链路层级之间的流量调配,目前我们将生产者(lancer-gateway)与消费者(flink sql kafka source)进行联动,当消费者出现tp消费lag的情况,通过注册黑名单(lag partition)到zookeeper,上游生产者感知黑名单,停止向高lag partition中继续发送数据。
Flink kafka source中基于flink AggregateFunction机制,kafka source subtask上报lag到job manager,job manager基于全局lag判断注册黑名单到zookeeper
黑名单判断逻辑:当单tp lag > min(全局lag平均值,全局lag中位数)* 倍数 && 单tp lag 大于 lag绝对值, 其中 "单tp lag 大于 lag绝对值" 是为了规避此机制过于敏感,"单tp lag > min(全局lag平均值,全局lag中位数)* 倍数" 用于筛选出头部的lag tp。为了防止黑名单比例过大,黑名单剔除的tp数量上限不得大于全部tp数量的一定比例。
局部流量调度和全局流量调度在管道热点优化效果上存在一定的互补性,但是也各有优势。
6.2 全链路埋点质量监控
数据质量是重要一环,通常数据质量包含完整性、时效性、准确性、一致性、唯一性等方面,对于数据传输场景,当面我们重点关注完整性和时效性两个方面
整体质量方案大致包含监控数据采集和规则配置两个大的方向,整体架构如下:
监控数据采集
我们自研了trace系统:以logid为单位,我们在数据处理流程中的每一层都进行了监控埋点
-
每层埋点包含三个方面:接收、发送、内部错误。所有埋点数据以数据创建时间(ctime)进行窗口对齐,并且通过更新utime以统计层间和层内的处理耗时。
-
通过监控埋点可以实时统计出:端到端、层级间、层级内部的数据处理耗时、完整性、错误数。
-
当前方案缺陷:flink sql挂掉从ck恢复,监控数据不能保证幂等,后续需要进一步改进。
监控报警规则
我们针对数据流进行了分级,每个等级指定了不同的保障级别(SLA),SLA破线,报警通知oncall同学处理。
延迟归档报警:hdfs分区提交延迟,触发报警。
实时完整性监控:基于trace数据,实时监控端到端的完整性,接收条数/落地条数
离线数据完整性:hdfs分区ready后,触发dqc规则运行,对比接收条数(trace数据)/落地条数(hive查询条数)
传输延迟监控:基于trace数据,计算端到端数据传输延迟的分位数。
DQC阻塞:离线数据完整性异常后,阻塞下游作业的调度。
6.3 kafka同步断流重复优化
相对比2.0方案中flume方案,基于flink sql的kafka到kafka的实现方案明显的一个变化就是作业的重启、故障恢复会导致整体的断流和一定比例的数据重复(从checkpoint恢复),因此如何降低用户对此类问题的感知,至关重要。
首先梳理下可能造成问题出现的原因:1)作业升级重启 2)task manager故障 3)job manager 故障 4)checkpoint连续失败,同时根据flink job整体提交流程,影响作业恢复速度的关键环节是资源的申请。根据上述分析和针对性测试,针对kafka同步场景的断流重复采用了如下优化方式:
-
checkpoint interval设置成10s:降低从checkpoint恢复导致的数据重复比例
-
基于session模式提交作业:作业重启无需重复申请资源
-
jobmanager.execution.failover-strategy=region,单个tm挂掉后,只恢复对应的region,不用恢复整个作业。集成作业DAG相对简单,可以尽量规避rebalance的出现,降低恢复的比例。
-
使用小资源粒度task manager(2core cpu,8GB memory,2 slot):同等资源规模下,tm数量变多,单tm挂掉影响程度明显变低。
-
针对高优作业冗余task manager:冗余一个tm,当单个tm挂掉情况下,流量几乎没受影响
-
基于zookeeper实现job manager ha:在开启jm ha后,jm挂掉任务未断流
-
针对checkpoint连续失败的场景,我们引入了regional checkpoint,以region(而不是整个topology)作为checkpoint管理的单位,防止个别task的ck失败造成整个作业的失败,可以有效防止在个别task的ck连续失败的情况下需要回溯的数据量,减小集群波动(网络,HDFS IO等)对checkpoint的影响
经过上述优化,经过测试一个(50core,400GB memory,50 slot)规模的作业,优化效果如下:
6.4 kafka流量动态failover能力
为了保证数据及时上报,Lancer对于数据缓冲层的kafka的发送成功率依赖性很高,经常遇到的case是高峰期或者流量抖动导致的kafka写入瓶颈。参考Netflix Hystrix 熔断原理,我们在网关层实现了一种动态 kafka failover机制:网关可以根据实时的数据发送情况计算熔断率,根据熔断率将流量在normal kafka和failover kafka之间动态调节。
- 基于滑动时间窗口计算熔断比例:滑动窗口的大小为10,每个窗口中统计1s内成功和失败的次数。
-
熔断器状态:关闭/打开/半开,熔断率=fail_total/sum_total , 为避免极端情况流量全切到 failover,熔断率需要有一个上限配置。熔断后的降级策略:normal kafka 熔断后尝试切 failover,failover kafka 如果也熔断的话就切回 normal
-
判断逻辑:
6.5 全链路流控、反压、降级
从端上上报到数据落地的整个流程中,为了保证稳定性和可控性,除了前述手段,我们还引入了整体流控、反压、降级等技术手段,下面综合介绍下。
从后向前,分为几个环节:
1. 数据分发层:
-
如果出现消费延迟,数据反压到数据缓冲层kafka
-
单作业内部通过backlog反压做subtask之间的流量均衡
2. 数据网关层:
-
如果写入kafka延迟,直接返回流控码(429)给数据上报端
-
数据网关层和数据分发层之间通过 kafka tp级别流控调度适配局部tp处理延迟。
3. 数据上报层:
-
适配数据网关的流控返回:做退避重试
-
基于本地磁盘进行数据的堆积
-
配置动态推送生效主动采样/降级堆积
6.6 开发阶段质量验证
为了在开发阶段保证整体服务的正确性和稳定性,开发阶段我们设计了一套完整的测试框架。
-
新版本上线之前,我们会同时双跑新旧两条作业链路,将数据分别落入两张hive表,并且进行全分区的条数和内容md5校验,校验结果以小时级别/天级别报表的形式发出。此测试框架保证了版本迭代的过程中,端到端的正确性。
-
同时为了保证异常极端情况下数据的准确性,我们也引入了混沌测试,主动注入一些异常。异常包括:job manager挂掉,taskmanager挂掉、作业随机重启、局部热点、脏数据等等。
07 未来展望
-
链路架构升级,接入公司级的数据网关(Databus),架构统一并且可以涵盖更多的数据上报场景。
-
云原生,拥抱K8S,面向用户quota管理,并且实现自动资源AutoScale。
-
拥抱批流一体,强化增量化集成,覆盖离线批集成场景,打造统一基于Flink的统一化集成框架。
Apache Flink 在 bilibili 的多元化探索与实践
摘要:本文由 bilibili 大数据实时平台负责人郑志升分享,本次分享核心讲解万亿级传输分发架构的落地,以及 AI 领域如何基于 Flink 打造一套完善的预处理实时 Pipeline。本次分享主要围绕以下四个方面:
B 站实时的前世与今生
Flink On Yarn 的增量化管道的方案
Flink 和 AI 方向的一些工程实践
未来的发展与思考
Tips:点击文末「阅读原文」即可回顾作者原版分享视频~
GitHub 地址
https://github.com/apache/flink
欢迎大家给 Flink 点赞送 star~
一、B 站实时的前世与今生
1. 生态场景辐射
说起实时计算的未来,关键词就在于数据的实效性。首先从整个大数据发展的生态上,来看它的核心场景辐射:在大数据发展的初期,核心是以面向天为粒度的离线计算的场景。那时候的数据实效性多数都是以运算以天为单位,它更加注重时间和成本的平衡。
随着数据应用,数据分析以及数据仓库的普及与完善,越来越多的人对数据的实效性提出了更高的要求。比如,当需要做一些数据的实时推荐时,数据的实效将决定它的价值。在这种情况下,整个实时计算的场景就普遍诞生。
但在实际的运作过程当中,也遇到了很多场景 ,其实并没有对数据有非常高的实时性要求,在这种情况下必然会存在数据从毫秒,秒或者天的新的一些场景,实时场景数据更多是以分钟为粒度的一些增量计算的场景。对于离线计算,它更加注重成本;对实时计算,它更加注重价值实效;而对于增量计算,它更加注重去平衡成本,以及综合的价值和时间。
2. B 站的时效性
在三个维度上,B 站的划分是怎样的?对于 B 站而言 ,目前有 75% 的数据是通过离线计算来进行支撑的,另外还有 20% 的场景是通过实时计算, 5% 是通过增量计算。
对于实时计算的场景, 主要是应用在整个实时的机器学习、实时推荐、广告搜索、数据应用、实时渠道分析投放、报表、olap、监控等;
对于离线计算,数据辐射面广,主要以数仓为主;
对于增量计算,今年才启动一些新的场景,比如说 binlog 的增量 Upsert 场景。
3. ETL 时效性差
对于实效性问题 ,其实早期遇到了很多痛点 ,核心集中在三个方面:
第一,传输管道缺乏计算能力。早期的方案,数据基本都是要按天落到 ODS ,DW 层是凌晨过后的第二天去扫描前一天所有 ODS 层的数据,也就是说,整体数据没办法前置清洗;
第二,含有大量作业的资源集中爆发在凌晨之后,整个资源编排的压力就会非常大;
第三、实时和离线的 gap 是比较难满足的,因为对于大部分的数据来说,纯实时的成本过高,纯离线的实效又太差。同时,MySQL 数据的入仓时效也不太够。举个例子,好比 B 站的弹幕数据 ,它的体量非常夸张,这种业务表的同步往往需要十几个小时,而且非常的不稳定。
4. AI 实时工程复杂
除了实效性的问题 早期还遇到了 AI 实时工程比较复杂的问题:
第一,是整个特征工程计算效率的问题。同样的实时特征的计算场景, 也需要在离线的场景上进行数据的回溯,计算逻辑就会重复开发;
第二,整个实时链路比较长。一个完整的实时推荐链路, 涵盖了 N 个实时和 M 个离线的十几个作业组成,有时候遇到问题排查,整个链路的运维和管控成本都非常高;
第三、随着 AI 人员的增多,算法人员的投入,实验迭代很难横向扩展。
5. Flink 做了生态化的实践
在这些关键痛点的背景下,我们集中针对 Flink 做了生态化的实践,核心包括了整个实时数仓的应用以及整个增量化的 ETL 管道,还有面向 AI 的机器学习的一些场景。本次的分享会更加侧重增量管道以及 AI 加 Flink 的方向上。下图展示了整体的规模,目前,整个传输和计算的体量,在万亿级的消息规模有 30000+ 计算核数,1000+ job 数以及 100 多个用户。
二、Flink On Yarn 的增量化管道的方案
1. 早期的架构
先来看一下整个管道早期的架构,从下图可以看出,数据其实主要是通过 Flume 来消费 Kafka 落到 HDFS。Flume 用它的事务机制,来确保数据从 Source 到 Channel, 再到 Sink 时候的一致性,最后数据落到 HDFS 之后,下游的 Scheduler 会通过扫描目录下有没有 tmp 文件,来判断数据是否 Ready,以此来调度拉起下游的 ETL 离线作业。
2. 痛点
在早期遇到了不少痛点:
第一个比较关键的是数据质量。
最先用的是 MemoryChannel,它会存在数据的丢失,之后也试过用 FileChannel 的模式,但性能上无法达到要求。此外在 HDFS 不太稳定的情况下,Flume 的事务机制就会导致数据会 rollback 回滚到 Channel,一定程度上会导致数据不断的重复。在 HDFS 极度不稳定的情况下,最高的重复率会达到百分位的概率;
Lzo 行存储,早期的整个传输是通过分隔符的形式,这种分隔符的 Schema 是比较弱约束的,而且也不支持嵌套的格式。
第二点是整个数据的时效,无法提供分钟级的查询,因为 Flume 不像 Flink 有 Checkpoint 斩断的机制,更多是通过 idle 机制来控制文件的关闭;
第三点是下游的 ETL 联动。前文有提到,我们更多是通过扫描 tmp 目录是否 ready 的方案,这种情况下 scheduler 会大量的和 NameNode 调用 hadoop list 的 api,这样会导致 NameNode 的压力比较大。
3. 稳定性相关的痛点
在稳定性上也遇到很多问题:
第一,Flume 是不带状态的,节点异常或者是重启之后,tmp 没法正常关闭;
第二,早期没有依附大数据的环境,是物理部署的模式,资源伸缩很难去把控,成本也会相对偏高;
第三,Flume 和 HDFS 在通信上有问题。比如说当写 HDFS 出现堵塞的情况,某一个节点的堵塞会反压到 Channel,就会导致 Source 不会去 Kafka 消费数据,停止拉动 offset,一定程度上就会引发 Kafka 的 Rebalance,最后会导致全局 offset 不往前推进,从而导致数据的堆积。
4. 万亿级的增量管道 DAG 视图
在如上的痛点下,核心方案基于 Flink 构建了一套万亿级的增量管道,下图是整个运行时的 DAG 视图。
首先,在 Flink 架构下,KafkaSource 杜绝了 rebalance 的雪崩问题,即便整个 DAG 视图中有某个并发度出现数据写 HDFS 的堵塞,也不会导致全局所有 Kafka 分区的堵塞。此外的话,整个方案本质是通过 Transform 的模块来实现可扩展的节点。
第一层节点是 Parser,它主要是做数据的解压反序列化等的解析操作;
第二层是引入提供给用户的定制化 ETL 模块,它可以实现数据在管道中的定制清洗;
第三层是 Exporter 模块,它支持将数据导出到不同的存储介质。比如写到 HDFS 时,会导出成 parquet;写到 Kafka,会导出成 pb 格式。同时,在整个 DAG 的链路上引入了 ConfigBroadcast 的模块来解决管道元数据实时更新、热加载的问题。此外,在整个链路当中,每分钟会进行一次 checkpoint,针对增量的实际数据进行 Append,这样就可以提供分钟级的查询。
5. 万亿级的增量管道整体视图
Flink On Yarn 的整体架构,可以看出其实整个管道视图是划分以 BU 为单位的。每个 Kafka 的 topic,都代表了某一种数据终端的分发,Flink 作业就会专门负责各种终端类型的写入处理。视图里面还可以看到,针对 blinlog 的数据,还实现了整个管道的组装,可以由多个节点来实现管道的运作。
6. 技术亮点
接下来来看一下整个架构方案核心的一些技术亮点,前三个是实时功能层面的一些特色,后三个主要是在一些非功能性层面的一些优化。
对于数据模型来说,主要是通过 parquet,利用 Protobuf 到 parquet 的映射来实现格式收敛;
分区通知主要是因为一条管道其实是处理多条流,核心解决的是多条流数据的分区 ready 的通知机制;
CDC 管道更多是利用 binlog 和 HUDI 来实现 upsert 问题的解决;
小文件主要是在运行时通过 DAG 拓扑的方式来解决文件合并的问题;
HDFS 通信实际是在万亿级规模下的很多种关键问题的优化;
最后是分区容错的一些优化。
■ 6.1 数据模型
业务的开发主要是通过拼装字符串,来组装数据的一条条记录的上报。后期则是通过了模型的定义和管理,以及它的开发来组织的,主要是通过在平台的入口提供给用户去录制每一条流、每个表,它的 Schema ,Schema 会将它生成 Protobuf 的文件,用户可以在平台上去下载 Protobuf 对应的 HDFS 模型文件,这样,client 端的开发完全就可以通过强 Schema 方式从 pb 来进行约束。
来看一下运行时的过程,首先 Kafka 的 Source 会去消费实际上游传过来的每一条 RawEvent 的记录,RawEvent 里面会有 PBEvent 的对象,PBEvent 其实是一条条的 Protobuf 的记录。数据从 Source 流到的 Parser 模块,解析后会形成 PBEvent,PBEvent 会将用户在平台录入的整个 Schema 模型,存储在 OSS 对象系统上,Exporter 模块会动态去加载模型的变更。然后通过 pb 文件去反射生成的具体事件对象,事件对象最后就可以映射落成 parquet 的格式。这里主要做了很多缓存反射的优化,使整个 pb 的动态解析性能达到六倍的提升。最后,我们会将数据会落地到 HDFS,形成 parquet 的格式。
■ 6.2 分区通知优化
前面提到管道会处理上百条流,早期 Flume 的架构,其实每个 Flume 节点,很难去感应它自己处理的进度。同时,Flume 也没办法做到全局进度的处理。但是基于 Flink,就可以通过 Watermark 的机制来解决。
首先在 Source 会基于消息当中的 Eventime 来生成 Watermark,Watermark 会经过每一层的处理传递到 Sink,最后会通过 Commiter 模块,以单线程的方式来汇总所有 Watermark 消息的进度。当它发现全局 Watermark 已经推进到下个小时的分区的时候,它会下发一条消息到 Hive MetStore,或者是写入到 Kafka, 来通知上小时分区数据 ready,从而可以让下游的调度可以更快的通过消息驱动的方式来拉起作业的运行。
■ 6.3 CDC管道上的优化
下图右侧其实是整个 cdc 管道完整的链路。要实现 MySQL 数据到 Hive 数据的完整映射,就需要解决流和批处理的问题。
首先是通过 Datax 将 MySQL 的数据全量一次性同步到的 HDFS。紧接着通过 spark 的 job,将数据初始化成 HUDI 的初始快照,接着通过 Canal 来实现将 Mysql 的 binlog 的数据拖到的 Kafka 的 topic,然后是通过 Flink 的 Job 将初始化快照的数据结合增量的数据进行增量更新,最后形成 HUDI 表。
整个链路是要解决数据的不丢不重,重点是针对 Canal 写 Kafka 这块,开了事务的机制,保证数据落 Kafka topic 的时候,可以做到数据在传输过程当中的不丢不重。另外,数据在传输的上层其实也有可能出现数据的重复和丢失,这时候更多是通过全局唯一 id 加毫秒级的时间戳。在整个流式 Job 中,针对全局 id 来做数据的去重,针对毫秒级时间来做数据的排序,这样能保证数据能够有序的更新到的 HUDI。
紧接着通过 Trace 的系统基于 Clickhouse 来做存储,来统计各个节点数据的进出条数来做到数据的精确对比。
■ 6.4 稳定性 - 小文件的合并
前面提到,改造成 Flink 之后,我们是做了每分钟的 Checkpoint,文件数的放大非常严重。主要是在整个 DAG 当中去引入 merge 的 operater 来实现文件的合并,merge 的合并方式主要是基于并发度横向合并,一个 writer 会对应一个 merge。这样每五分钟的 Checkpoint,1 小时的 12 个文件,都会进行合并。通过种方式的话,可以将文件数极大的控制在合理的范围内。
■ 6.5 HDFS 通信
实际运作过程当中经常会遇到整个作业堆积比较严重的问题,实际分析其实主是和 HDFS 通信有很大的关系。
其实 HDFS 通讯,梳理了四个关键的步骤:初始化 state、Invoke、Snapshot 以及 Notify Checkpoint complete。
核心问题主要发生在 Invoke 阶段,Invoke 会达到文件的滚动条件,这时候会触发 flush 和 close。close 实际和 NameNode 通信的时候,会经常出现堵塞的情况。
Snapshot 阶段同样会遇到一个问题,一个管道上百条流一旦触发 Snapshot,串行执行 flush 和 close 也会非常的慢。
核心优化集中在三个方面:
第一,减少了文件的斩断,也就是 close 的频次。在 Snapshot 阶段,不会去 close 关闭文件,而更多的是通过文件续写的方式。这样,在初始化 state 的阶段,就需要做文件的 Truncate 来做 Recovery 恢复。
第二,是异步化 close 的改进,可以说是 close 的动作不会去堵塞整个总链路的处理,针对 Invoke 和 Snapshot 的 close,会将状态管理到 state 当中,通过初始化 state 来进行文件的恢复。
第三,针对多条流,Snapshot 还做了并行化的处理,每 5 分钟的 Checkpoint, 多条流其实就是多个 bucket,会通过循环来进行串行的处理,那么通过多线程的方式来改造,就可以减少 Checkpoint timeout 的发生。
■ 6.6 分区容错的一些优化
实际在管道多条流的情况下,有些流的数据并不是每个小时都是连续的。
这种情况会带来分区,它的 Watermark 没有办法正常推进,引发空分区的问题。所以我们在管道的运行过程当中,引入 PartitionRecover 模块,它会根据 Watermark 来推进分区的通知。针对有些流的 Watermark,如果在 ideltimeout 还没有更新的情况下,Recover 模块来进行分区的追加。它会在每个分区的末尾到达的时候,加上 delay time 来扫描所有流的 Watermark,由此来进行兜底。
在传输过程当中,当 Flink 作业重启的时候,会遇到一波僵尸的文件,我们是通过在 DAG 的 commit 的节点,去做整个分区通知前的僵尸文件的清理删除,来实现整个僵尸文件的清理,这些都属于非功能性层面的一些优化。
三、Flink 和 AI 方向的一些工程实践
1. 架构演进时间表
下图是 AI 方向在实时架构完整的时间线。
早在 2018 年,很多算法人员的实验开发都是作坊式的。每个算法人员会根据自己熟悉的语言,比如说 Python,php 或 c++ 来选择不同的语言来开发不同的实验工程。它的维护成本非常大,而且容易出现故障;
2019 年上半年,主要是基于 Flink 提供了 jar 包的模式来面向整个算法做一些工程的支持,可以说在整个上半年的初期,其实更多是围绕稳定性,通用性来做一些支持;
2019 年的下半年,是通过自研的 BSQL,大大降低了模型训练的门槛,解决 label 以及 instance 的实时化来提高整个实验迭代的效率;
2020 年上半年,更多是围绕整个特征的计算,流批计算打通以及特征工程效率的提升,来做一些改进;
到2020 年的下半年,更多是围绕整个实验的流程化以及引入 AIFlow,方便的去做流批 DAG。
2. AI 工程架构回顾
回顾一下整个 AI 工程,它的早期的架构图其实体现的是整个 AI 在 2019 年初的架构视图,其本质是通过一些 single task 的方式,各种混合语言来组成的一些计算节点,来支撑着整个模型训练的链路拉起。经过 2019 年的迭代,将整个近线的训练完全的替换成用 BSQL 的模式来进行开发和迭代。
3. 现状痛点
在 2019 年底,其实又遇到了一些新的问题,这些问题主要集中在功能和非功能两个维度上。
在功能层面:
首先从 label 转到产生 instance 流,以及到模型训练,到线上预测,乃至真正的实验效果,整个链路非常的长且复杂;
第二,整个实时的特征、离线特征、以及流批的一体,涉及到非常多的作业组成,整个链路很复杂。同时实验和 online 都要做特征的计算,结果不一致会导致最终的效果出现问题。此外,特征存在哪里也不好找,没办法去追溯。
在非功能性层面,算法的同学经常会遇到,不知道 Checkpoint 是什么,要不要开,有啥配置。此外,线上出问题的时候也不好排查,整个链路都非常的长。
所以第三点就是,完整的实验进度需要涉及的资源是非常多的,但是对算法来说它根本就不知道这些资源是什么以及需要多少,这些问题其实都都对算法产生很大的困惑。
4. 痛点归结
归根结底,集中在三个方面:
第一是一致性的问题。从数据的预处理,到模型训练,再到预测,各个环节其实是断层的。当中包括数据的不一致,也包括计算逻辑的不一致;
第二,整个实验迭代非常慢。一个完整的实验链路,其实对算法同学来说,他需要掌握东西非常多。同时实验背后的物料没办法进行共享。比如说有些特征,每个实验背后都要重复开发;
第三,是运维和管控的成本比较高。
完整的实验链路,背后其实是包含实时的一条工程加离线的一条工程链路组成,线上的问题很难去排查。
5. 实时 AI 工程的雏形
在这样的一些痛点下,在 20 年主要是集中在 AI 方向上去打造实时工程的雏形。核心是通过下面三个方面来进行突破。
第一是在 BSQL 的一些能力上,对于算法,希望通过面向 SQL 来开发以此降低工程投入;
第二是特征工程,会通过核心解决特征计算的一些问题来满足特征的一些支持;
第三是整个实验的协作,算法的目的其实在于实验,希望去打造一套端到端的实验协作,最终希望做到面向算法能够“一键实验”。
6. 特征工程-难点
我们在特征工程中遇到了一些难点。
第一是在实时特征计算上,因为它需要将结果利用到整个线上的预测服务,所以它对延迟以及稳定性的要求都非常的高;
第二是整个实时和离线的计算逻辑一致,我们经常遇到一个实时特征,它需要去回溯过去 30 天到到 60 天的离线数据,怎么做到实时特征的计算逻辑能同样在离线特征的计算上去复用;
第三是整个离线特征的流批一体比较难打通。实时特征的计算逻辑经常会带有窗口时序等等一些流式的概念,但是离线特征是没有这些语义的。
7. 实时特征
这里看一下我们怎么去做实时特征,图中的右侧是最典型的一些场景。比如说我要实时统计用户最近一分钟、6 小时、12 小时、24 小时,对各个 UP 主相关视频的播放次数。针对这样场景,其实里面有两个点:
第一、它需要用到滑动窗口来做整个用户过去历史的计算。此外,数据在滑动计算过程当中,它还需要去关联 UP 主的一些基础的信息维表,来获取 UP 主的一些视频来统计他的播放次数。归根结底,其实遇到了两个比较大的痛。
用 Flink 原生的滑动窗口,分钟级的滑动,会导致窗口比较多,性能会损耗比较大。
同时细粒度的窗口也会导致定时器过多,清理效率比较差。
第二是维表查询,会遇到是多个 key 要去查询 HBASE 的多个对应的 value,这种情况需要去支持数组的并发查询。
在两个痛点下,针对滑动窗口,主要是改造成为 Group By 的模式,加上 agg 的 UDF 的模式,将整个一小时、六小时、十二小时、二十四小时的一些窗口数据,存放到整个 Rocksdb 当中。这样通过 UDF 模式,整个数据触发机制就可以基于 Group By 实现记录级的触发,整个语义、时效性都会提升的比较大。同时在整个 AGG 的 UDF 函数当中,通过 Rocksdb 来做 state,在 UDF 当中来维护数据的生命周期。此外还扩展了整个 SQL 实现了数组级别的维表查询。最后的整个效果其实可以在实时特征的方向上,通过超大窗口的模式来支持各种计算场景。
8. 特征-离线
接下来看一下离线,左侧视图上半部分是完整的实时特征的计算链路,可以看出要解决同样的一条 SQL,在离线的计算上也能够复用,那就需要去解决相应的一些计算的 IO 都能够复用的问题。比如在流式上是通过 Kafka 来进行数据的输入,在离线上需要通过 HDFS 来做数据的输入。在流式上是通过 KFC 或者 AVBase 等等的一些 kv 引擎来支持,在离线上就需要通过 hive 引擎来解决,归根结底,其实需要去解决三个方面的问题:
第一,需要去模拟整个流式消费的能力,能够支持在离线的场景下去消费 HDFS 数据;
第二,需要解决 HDFS 数据在消费过程当中的分区有序的问题,类似 Kafka 的分区消费;
第三,需要去模拟 kv 引擎维表化的消费,实现基于 hive 的维表消费。还需要解决一个问题,当从 HDFS 拉取的每一条记录,每一条记录其实消费 hive 表的时候都有对应的 Snapshot,就相当于是每一条数据的时间戳,要消费对应数据时间戳的分区。
9. 优化
■ 9.1 离线-分区有序
分区有序的方案其实主要是基于数据在落 HDFS 时候,前置做了一些改造。首先数据在落 HDFS 之前,是传输的管道,通过 Kafka 消费数据。在 Flink 的作业从 Kafka 拉取数据之后,通过 Eventtime 去提取数据的 watermark,每一个 Kafka Source 的并发度会将 watermark 汇报到 JobManager 当中的 GlobalWatermark 模块,GlobalAgg 会汇总来自每一个并发度 Watermark 推进的进度,从而去统计 GlobalWatermark 的进展。根据 GlobalWatermark 的进展来计算出当中有哪些并发度的 Watermark 计算过快的问题,从而通过 GlobalAgg 下发给 Kafka Source 控制信息,Kafka Source 有些并发度过快的情况下,它的整个分区推进就降低速度。这样,在 HDFS Sink 模块,在同时间片上收到的数据记录的整个 Event time 基本上有序的,最终落到 HDFS 还会在文件名上去标识它相应的分区以及相应的时间片范围。最后在 HDFS 分区目录下,就可以实现数据分区的有序目录。
■ 9.2 离线-分区增量消费
数据在 HDFS 增量有序之后,实现了 HDFStreamingSource,它会针对文件做 Fecher 分区,针对每个文件都有 Fecher 的线程,且每个 Fecher 线程会统计每一个文件。它 offset 处理了游标的进度,会将状态根据 Checkpoint 的过程,将它更新到的 State 当中。
这样就可以实现整个文件消费的有序推进。在回溯历史数据的时候,离线作业就会涉及到整个作业的停止。实际是在整个 FileFetcher 的模块当中去引入一个分区结束的标识,且会在每一个线程去统计每一个分区的时候,去感应它分区的结束,分区结束后的状态最后汇总到的 cancellationManager,并进一步会汇总到 Job Manager 去更新全局分区的进度,当全局所有的分区都到了末尾的游标时候,会将整个 Flink 作业进行 cancel 关闭掉。
■ 9.3 离线 - Snapshot 维表
前面讲到整个离线数据,其实数据都在 hive 上,hive 的 HDFS 表数据的整个表字段信息会非常的多,但实际做离线特征的时候,需要的信息其实是很少的,因此需要在 hive 的过程先做离线字段裁剪,将一张 ODS 的表清洗成 DW 的表,DW 的表会最后通过 Flink 运行 Job,内部会有个 reload 的 scheduler,它会定期的去根据数据当前推进的 Watermark 的分区,去拉取在 hive 当中每一个分区对应的表信息。通过去下载某 HDFS 的 hive 目录当中的一些数据,最后会在整个内存当中 reload 成 Rocksdb 的文件,Rocksdb 其实就是最后用来提供维表 KV 查询的组件。
组件里面会包含多个 Rocksdb 的 build 构建过程,主要是取决于整个数据流动的过程当中的 Eventtime,如果发现 Eventtime 推进已经快到小时分区结束的末尾时候,会通过懒加载的模式去主动 reload,构建下一个小时 Rocksdb 的分区,通过这种方式,来切换整个 Rocksdb 的读取。
10. 实验流批一体
在上面三个优化,也就是分区有序增量,类 Kafka 分区 Fetch 消费,以及维表 Snapshot 的基础下,最终是实现了实时特征和离线特征,共用一套 SQL 的方案,打通了特征的流批计算。紧接着来看一下整个实验,完整的流批一体的链路,从图中可以看出最上面的粒度是整个离线的完整的计算过程。第二是整个近线的过程,离线过程其实所用计算的语义都是和近线过程用实时消费的语义是完全一致的,都是用 Flink 来提供 SQL 计算的。
来看一下近线,其实 Label join 用的是 Kafka 的一条点击流以及展现流,到了整个离线的计算链路,则用的一条 HDFS 点击的目录和 HDFS 展现目录。特征数据处理也是一样的,实时用的是 Kafka 的播放数据,以及 Hbase 的一些稿件数据。对于离线来说,用的是 hive 的稿件数据,以及 hive 的播放数据。除了整个离线和近线的流批打通,还将整个近线产生的实时的数据效果汇总到 OLAP 引擎上,通过 superset 来提供整个实时的指标可视化。其实从图可以看出完整的复杂流批一体的计算链路,当中包含的计算节点是非常的复杂和庞多的。
11. 实验协作 - 挑战
下阶段挑战更多是在实验协作上,下图是将前面整个链路进行简化后的抽象。从图中可以看出,三个虚线的区域框内,分别是离线的链路加两个实时的链路,三个完整的链路构成作业的流批,实际上就是一个工作流最基本的过程。里面需要去完成工作流完整的抽象,包括了流批事件的驱动机制,以及,对于算法在 AI 领域上更多希望用 Python 来定义完整的 flow,此外还将整个输入,输出以及它的整个计算趋于模板化,这样可以做到方便整个实验的克隆。
12. 引入 AIFlow
整个工作流上在下半年更多是和社区合作,引入了 AIFlow 的整套方案。
右侧其实是整个 AIFlow 完整链路的 DAG 视图,可以看出整个节点,其实它支持的类型是没有任何限制的,可以是流式节点,也可以是离线节点。此外的话,整个节点与节点之间通信的边是可以支持数据驱动以及事件驱动的。引入 AIFlow 的好处主要在于,AIFlow 提供基于 Python 语义来方便去定义完整的 AIFlow 的工作流,同时还包括整个工作流的进度的调度。
在节点的边上,相比原生的业界的一些 Flow 方案,他还支持基于事件驱动的整个机制。好处是可以帮助在两个 Flink 作业之间,通过 Flink 当中 watermark 处理数据分区的进度去下发一条事件驱动的消息来拉起下一个离线或者实时的作业。
此外还支持周边的一些配套服务,包括通知的一些消息模块服务,还有元数据的服务,以及在 AI 领域一些模型中心的服务。
13. Python 定义 Flow
来看一下基于 AIFlow 是如何最终定义成 Python 的工作流。右边的视图是一个线上项目的完整工作流的定义。第一、是整个是 Spark job 的定义,当中通过配置 dependence 来描述整个下游的依赖关系,它会下发一条事件驱动的消息来拉起下面的 Flink 流式作业。流式作业也同样可以通过消息驱动的方式来拉起下面的 Spark 作业。整个语义的定义非常的简单,只需要四个步骤,配置每节点的 confg 的信息,以及定义每节点的 operation 的行为,还有它的 dependency 的依赖,最后去运行整个 flow 的拓扑视图。
14. 基于事件驱动流批
接下来看一下完整的流批调度的驱动机制,下图右侧是完整的三个工作节点的驱动视图。第一个是从 Source 到 SQL 到 Sink。引入的黄色方框是扩展的 supervisor,他可以收集全局的 watermark 进度。当整个流式作业发现 watermark 可以推进到下一个小时的分区的时候,它会下发一条消息,去给到 NotifyService。NotifyService 拿到这条消息之后,它会去下发给到下一个作业,下一个作业主要会在整个 Flink 的 DAG 当中去引入 flow 的 operator,operator 在没有收到上个作业下发了消息之前,它会堵塞整个作业的运行。直到收到消息驱动之后,就代表上游其实上一个小时分区已经完成了,这时下个 flow 节点就可以驱动拉起来运作。同样,下个工作流节点也引入了 GlobalWatermark Collector 的模块来汇总收集它的处理的进度。当上一个小时分区完成之后,它也会下发一条消息到 NotifyService,NotifyService 会将这条消息去驱动调用 AIScheduler 的模块,从而去拉起 spark 离线作业来做 spark 离线的收尾。从里你们可以看出,整个链路其实是支持批到批,批到流以及流到流,以及流到批的四个场景。
15. 实时 AI 全链路的雏形
在流和批的整个 flow 定义和调度的基础上,在 2020 年初步构建出来了实时 AI 全链路的雏形,核心是面向实验。算法同学也可以基于 SQL 来开发的 Node 的节点,Python 是可以定义完整的 DAG 工作流。监控,告警以及运维是一体化的。
同时,支持从离线到实时的打通,从数据处理到模型训练,从模型训练到实验效果的打通,以及面向端到端的打通。右侧是整个近线实验的链路。下面是将整个实验链路产出的物料数据提供给在线的预测训练的服务。整体会有三个方面的配套:
一是基础的一些平台功能,包括实验管理,模型管理,特征管理等等;
其次也包括整个 AIFlow 底层的一些 service 的服务;
再有是一些平台级的 metadata 的元数据服务。
四、未来的一些展望
在未来的一年,我们还会更加集中在两个方面的一些工作。
第一是数据湖的方向上,会集中在 ODS 到 DW 层的一些增量计算场景,以及 DW 到 ADS 层的一些场景的突破,核心会结合 Flink 加 Iceberg 以及 HUDI 来作为该方向的落地。
在实时 AI 平台上,会进一步去面向实验来提供一套实时的 AI 协作平台,核心是希望打造高效,能够提炼简化算法人员的工程平台。
以上是关于Flink 实践 | B站流式传输架构的前世今生的主要内容,如果未能解决你的问题,请参考以下文章