多表同步 ES 的问题
Posted lovesqcc
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多表同步 ES 的问题相关的知识,希望对你有一定的参考价值。
原始需求
对跨业务域数据提供联查搜索能力。
- 比如:对退款单提供根据退款单、退款状态、发货状态的联查,其中退款状态和发货状态是跨业务域。
- 比如:对订单提供根据订单号、订单状态、退款状态的联查,其中订单状态和退款状态是跨业务域。
为什么要上溯需求层面 ?要优化现有方案,容易局限在现有方案的框架里。上溯到需求层面,能够跳出现有方案框架,在更大的范围内搜索解决方案,亦可对现有方案的部分设计与实现的前提和约束有更为清晰的认识。
目标
将多源数据存储 (S1,S2,...,Sn) 的数据同步到具备联查能力的目标数据存储 T,且必须满足:
- T 中的不变数据与 Si 中对应的不变数据应当在指定计算关系下保持一致;
- 对于每一次同步,T 中的可变数据与 Si 中对应的可变数据在最新状态下保持一致。
在现实场景下,源数据存储 Si 通常是数据库 DB , 而 T 则有不同选择。 比如,有赞选择了 ES 。选择不同的存储,对解决方案的影响也是非常大的。
注意:需求和目标是存在差异的。目标是原始需求在当前环境约束下所能转化的第二层需求。如果跨业务域数据都存储在一张源数据存储中,也就是 源数据存储-目标数据存储 是 1:1 的关系,那么目标就是单数据存储的同步了,解决方案也会简单得多。
但是,在规范的设计理念的指引下,不同业务域的数据通常存储在不同的数据表里,因此才会有多表同步的需求。
制定目标时,需要考虑当前环境约束的影响。如果当前环境约束过强,则应适当考虑能否改造环境约束,使得设计方案有更灵活的空间可供选择。环境约束改变了,同样需求的目标也会发生变化。
设计关键
数据状态变更的每一次最新状态同步必须保持强一致性。
这里的难点在于可变状态的同步。因为不可变状态只需要原样同步即可,且在任何时刻进行均可。
可变状态的同步怎么弄呢 ? 假设状态是不可逆的,且数值是递增的,比如订单状态的数值只能从 99 -> 100 ,不可能从 100 -> 99, 这样,通过数值的比较(而不是时间戳),就可以过滤掉那些过时的状态。解决方案也会相对简单。
现实场景中,状态的逆转往往不可避免,状态数值的递增关系也不一定满足,因此,通用的方案不能以状态数字的递增关系为前提(否则就会削弱方案的通用性),而要以状态变更的原始时间戳为准。
ES 的前提
ES 存储的数据是一个大的 JSON 串,没有字段级别的版本控制,只有针对整个 JSON 串的一个版本控制。 在多表情形下,同步 ES 需要考虑全局版本控制问题。
ES 数据存储: https://elasticsearch.cn/article/6178
ES 版本乐观控制:https://es.xiaoleilu.com/030_Data/40_Version_control.html
多表同步的现有方案: https://tech.youzan.com/you-zan-ding-dan-tong-bu-de-tan-suo-yu-shi-jian-2/
同步ES的基本逻辑
情形一
假设有一个 索引 E 含有 (refund_id, order_no, rp_status, version)
有一个 DB 表 r (refund_id, order_no, rp_status, version)
同步方案:
只要按照 version 字段,同步到 ES 即可。对于 T1 M1, T2 M2 ,如果 T1.version < T2.verison ,将 T2 时刻的记录同步写入。 此时,使用非顺序队列即可。
结论:
理想的情形下, 一个 ES 表完全仅对应一个 DB , DB 含有 version 字段,可以作为 ES 表的 version 版本控制,使用非顺序队列。 做搜索索引,应尽可能符合这种情形。
情形二
现在假设有 t (order_no, shop_name) ,要把 shop_name 同步到 E 中。 shop_name 是不变的。
这时,只要在同步处理消息的时候, 关联 t 表,将 shop_name 读取,写入到 E 即可。 只是增加了一次 DB 访问,仍然可以使用非顺序队列。
结论:
将 DB1 , DB2 同步到 E 中, 以 DB1 为主, 获取 DB2 的不变字段,依然可以使用非顺序队列。DB1 的 version 字段作为版本控制。
需要注意的是,对于分库分表来说, 这里的 version 必须是全局递增的 version ,而不是某个分表的 version 。因为某个分表的 version 是不能满足递增特性的。
情形三
现在假设有 t (order_no, order_status) , 需要把 order_status 同步到索引 E 中。这时候,如果用 version 是不够的。
存在这样的情形:
假设退款单 R001 (T1 M1)→ (T2,M2) 发生了 rp_status = 1 → 3 ; 订单状态 order_status (T1‘, M1‘) → (T2‘, M2‘) 发生了 order_status = 1 → 5。
现在 T2 时刻的退款单消息 M2 先于 M1 抵达,实时获取了 T1‘ 的订单状态 1 ;得到了 T2R = (R001, E001, rp_status=3, order_status=1, version =3)
然后 T1 时刻的退款单消息 M1 抵达,实时获取了 T2‘ 的订单状态 5, 得到了 T1R =(R001, E001, rp_status=1, order_status=5, version=1)
由于 T2R.version > T1R.verison, 因此 T2R 写入。 然而,此时,order_status 的同步是错误的。
结论:
当 DB1 (主),DB2 (辅) 均要同步到索引 E 时,如果 DB1 和 DB2 所需同步的字段都存在变化,那么,使用 DB1 的 version 字段控制版本号是不可行的。这将导致 DB2 的字段同步变更存在错误。
此时,就同步 ES 来说,应当尽量避免这种情形。在设计方案的时候:
避免将两个表的可变状态同时同步到一个索引里。
在业务层就能做到把 DB2 的可变字段冗余到 DB1 里。不过,这样会增加 DB1 设计和业务更新的复杂度,且事先也不会想到这种冗余方法。
现有解决方案
要解决多表同步的问题,有两种现有方案:
使用顺序队列
上述的问题引发的原因是, DB1 的后更新的消息先抵达先处理。 使用顺序队列,使得 DB1 的先更新的消息始终先处理,这样,就不会导致 后更新的消息获取到过时的 DB2 的字段状态了。 使用顺序队列的原理是,设置 DB1 的主键 ID 作为顺序队列的排序 key 。 顺序队列的优势,是让业务方处理容易了;但顺序队列的并发吞吐量取决于队列分区数,且容易因为一条消息处理出错而阻塞后续的处理。
使用非顺序队列
使用非顺序队列,需要中间存储,自定义的全局版本号 G 和 一个全局的 存储 S
理想情况下,无论谁先到达,都应该写入 最新的数据。那么,这个全局存储 S 和 全局版本号 应该具备什么特征呢 ?
全局存储 S,应当含有同步 ES 所需的所有字段;
每次通过全局存储 S 和 全局版本号 G 的处理 GlobalHandler, 总能拿到所有字段的最新值 FVnew;
每次通过全局存储 S 和 全局版本号 G 的处理 GlobalHandler, 总能拿到递增的全局版本号 G;
以 G 为 ES 的版本号控制,总能将递增的 G 和 最新值 FVnew 写入。
实现思路:
第一点相对容易,梳理一下所需要同步的字段即可。举上为例:<refund_id, order_no, rp_status, order_status, G>
第二点:全局存储 S,具备字段级别的过滤能力,能够根据时间戳过滤掉状态过时的字段值;也就是说,对于要同步的字段 Fi (i= 1,2,..., N), 如果 Fi.timestamp_t1 > Fi.timestamp_t2 ,则 Fi_value_timestamp_t2 被丢弃。每次写入 S 之后,再取出 S 的最新值。 这正是 HBaseFilter 的功能。
比如说, M2 先抵达,T2 rp_status = 3 被同步到 S ,然后取出最新的值写入 E; 接着, M1 抵达,由于 rp_status = 1 的时间戳 T1 < T2 ,因此, rp_status = 1 会被 S 过滤掉,不起作用,然后取出最新的 S 写入 ES ; 接着 M3 抵达,将 order_status = 5 写入 S,再取出 S 的最新值写入 E。
第三点: 全局版本号 G 的计算,保证消息写入的递增性。 同一个表 比如 DB1 的时间戳是有先后的,比如 T2 > T1 , 但是 不同表的时间戳是没有先后的,比如 DB1 T2 与 T3 是无法确定谁大虽小的。
假设 Gf 是消息所带时间戳 T 的函数,Gf = G(T,Ginit) ,Ginit 是全局版本号 G 的初始值,那么 Gf 应当满足什么条件呢 ?
首先 G1 = Gf(T1, Ginit) , G2 = Gf(T2,G1) ,此时 应始终满足: G1 > Ginit , G2 > G1。
即:Gm = Gf(T1, Ginit) > Ginit, Gf(T2, Gm) > Gm 。
也就是说,对于任意的 t 及 Gm, 都有 Gf(t, Gm) > Gm。一个简单的实现是,Gf(t, Gm) = a * t + Gm + b, a 和 b 为常数。
考虑 t = 0 , 则 b > 0 。 取 b = 1. Gf(t, Gm) = a * t + Gm + 1
方案对比:
顺序队列方案更简单,只需要一个任务,所有同步逻辑都在这个任务里,且流程更符合自然思维;但顺序队列方案容易阻塞,吞吐量有瓶颈。适合中小型业务量。
非顺序队列方案,吞吐量更优,不会因为某个消息消费阻塞;不过方案也更复杂一点,需要多个任务,额外全局存储,且同步逻辑较为分散,不容易直接理解。适合大型业务量。
【未完待续】
以上是关于多表同步 ES 的问题的主要内容,如果未能解决你的问题,请参考以下文章
kettle利用excel文件增量同步一个库的数据(多表一次增量同步)
flink cdc MySQL2Doris 案例分享 解决分库多表同步
flink cdc MySQL2Doris 案例分享 解决分库多表同步