Flink详解系列之五--水位线(watermark)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flink详解系列之五--水位线(watermark)相关的知识,希望对你有一定的参考价值。
参考技术A 在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。在进行窗口处理时,不可能无限期的等待延迟数据到达,当到达特定watermark时,认为在watermark之前的数据已经全部达到(即使后面还有延迟的数据), 可以触发窗口计算,这个机制就是 Watermark(水位线),具体如下图所示。
watermark本质上是一个时间戳,且是动态变化的,会根据当前最大事件时间产生。watermarks具体计算为:
当watermark时间戳大于等于窗口结束时间时,意味着窗口结束,需要触发窗口计算。
水位线生产的最佳位置是在尽可能靠近数据源的地方,因为水位线生成时会做出一些有关元素顺序相对时间戳的假设。由于数据源读取过程是并行的,一切引起Flink跨行数据流分区进行重新分发的操作(比如:改变并行度,keyby等)都会导致元素时间戳乱序。但是如果是某些初始化的filter、map等不会引起元素重新分发的操作,可以考虑在生成水位线之前使用。
周期性分配水位线比较常用,是我们会指示系统以固定的时间间隔发出的水位线。在设置时间为事件时间时,会默认设置这个时间间隔为200ms, 如果需要调整可以自行设置。比如下面的例子是手动设置每隔1s发出水位线。
周期水位线需要实现接口:AssignerWithPeriodicWatermarks,下面是示例:
定点水位线不是太常用,主要为输入流中包含一些用于指示系统进度的特殊元组和标记,方便根据输入元素生成水位线的场景使用的。
由于数据流中每一个递增的EventTime都会产生一个Watermark。
在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
水位线可以用于平衡延迟和结果的完整性,它控制着执行某些计算需要等待的时间。这个时间是预估的,现实中不存在完美的水位线,因为总会存在延迟的记录。现实处理中,需要我们足够了解从数据生成到数据源的整个过程,来估算延迟的上线,才能更好的设置水位线。
如果水位线设置的过于宽松,好处是计算时能保证近可能多的数据被收集到,但由于此时的水位线远落后于处理记录的时间戳,导致产生的数据结果延迟较大。
如果设置的水位线过于紧迫,数据结果的时效性当然会更好,但由于水位线大于部分记录的时间戳,数据的完整性就会打折扣。
所以,水位线的设置需要更多的去了解数据,并在数据时效性和完整性上有一个权衡。
mongo 3.4分片集群系列之五:详解平衡器
这个系列大致想跟大家分享以下篇章(我会持续更新的↖(^ω^)↗):
2、mongo 3.4分片集群系列之二:搭建分片集群--哈希分片
3、mongo 3.4分片集群系列之三:搭建分片集群--哈希分片 + 安全
4、mongo 3.4分片集群系列之四:搭建分片集群--哈希分片 + 安全 + 区域
6、mongo 3.4分片集群系列之六:详解配置数据库
7、mongo 3.4分片集群系列之七:配置数据库管理
8、mongo 3.4分片集群系列之八:分片管理
这篇为理论篇
这篇章主要讲一下以下几个点:
1) 简要说说平衡器是啥吧
2) 平衡器在分片集群中是干什么的,也就是平衡器的作用
3) 这里还会仔细说明分片集群中迁移块的过程
4) 平衡器的定时启用和关闭
5) 怎么减少平衡器对分片集群的性能影响
6) 关于平衡器的一些命令
1、 简要说说平衡器是啥
MongoDB的平衡器是一个后台进程,监视每个分片上块的数目。当给定分片上的块数达到特定的迁移阈值时,平衡器会尝试在分片之间自动迁移块,使得每个分片达到相等数量的块。平衡器将比较多块的分片中的块迁移到块数量较少的分片中。平衡器迁移块,直到集合在分片之间的块均匀分布。
平衡器具有以下阈值:
块数 |
迁移阈值 |
少于20 |
2 |
20-79 |
4 |
多于80 |
8 |
分片集群的平衡过程对用户和应用层完全透明,尽管在执行过程中可能会有一些性能影响。
2、 平衡器在分片集群中是干什么的,也就是平衡器的作用
其中,上面已经介绍了,说得直白点,就是用来在分片集群中迁移块的。
3、 分片集群中迁移块的过程
所有块迁移使用以下过程:
- 平衡器进程将moveChunk命令发送到源分片。
- 源分片使用内部moveChunk 命令启动移动。在迁移过程中,块的操作路由到源分片。源分片负责块的写入操作。
- 目标分片生成目标分片所不存在的源分片所需的所有索引。
- 目标分片开始在块中请求文档,并开始接收数据的副本。
- 在块中收到最终文档后,目标分片将启动同步过程,以确保它具有对迁移过程中发生的已迁移文档的更改。
- 当完全同步时,源分片连接到 配置数据库,并使用块的新位置更新集群元数据。
- 源分片完成元数据的更新后,一旦块上没有打开光标,源分片将删除其文档副本。
注意: 如果平衡器需要从源分片执行其他的块迁移,则平衡器可以启动下一个块迁移,而不必等待当前的迁移过程来完成此删除步骤。
注意:不能移动的块:如果块中的文档数超过了250000个文档,或者配置的块大小除以平均文档大小的1.3倍,则MongoDB不能移动块 。 db.collection.stats() 的avgObjSize字段,表示集合中平均文档的大小。
在大多数情况下,应该让平衡器在分片之间 自动迁移块。但是,mongoDB也允许手动迁移块:
迁移单个块
下面的示例假定在一个myapp 数据库有users集合,分片键是字段username
为smith的块 。在mongoshell中使用以下命令迁移该块 。
db.adminCommand( { moveChunk : "myapp.users",
find : {username : "smith"},
to : "mongodb-shard3.example.net" } )
此命令将包含分片键值“smith”的块迁移到分片名为mongodb-shard3.example.net。
4、 平衡器的定时启用和关闭
利用计划平衡窗口,可以达到让平衡器定时启用和关闭。
4.1. 启用一个计划平衡窗口
以下过程指定了activeWindow,使得平衡器能够在特定的时间范围迁移块:
1) 使用mongo shell连接到群集中的任何一个mongos。
2) 切换到配置数据库。
use config
3) 确保平衡器的状态不是stopped。
sh.setBalancerState( true )
如果在activeWindow时间范围之外,平衡器将无法启动。
4) 修改平衡器的窗口。
使用update()设置activeWindow,如下所示:
db.settings.update(
{ _id: "balancer" },
{ $set: { activeWindow : { start : "<start-time>", stop : "<stop-time>" } } },
{ upsert: true }
)
使用两位数小时和分钟值(即HH:MM)指定平衡窗口的开始和结束边界时间来替换<start-time>和<end-time>。
-
- 对于HH值,使用小时值范围00- 23。
- 对于MM值,使用分钟值范围00- 59。
注意:平衡器窗口必须足以完成上次禁用平衡器时插入的所有数据的迁移。
以上过程设置之后,平衡器则会在平衡窗口规定的时间start—stop之间运行,在这段时间之外平衡器是被禁用的。
4.2. 移除一个平衡计划窗口
那么当这个平衡窗口不用时,怎么移除它呢?
如果设置了平衡窗口并希望删除计划,以使平衡器始终运行,请使用$unset以清除activeWindow,如下所示:
use config
db.settings.update({ _id : "balancer" }, { $unset : { activeWindow : true } })
5、 怎么减少平衡器对分片集群的性能影响
5.1. 系统自带机制减少平衡器对分片集群性能的影响
块的迁移在带宽和工作负载方面带来一些开销,这两者都可能影响数据库性能。平衡器 试图最小化的该影响:
-
- 在任何给定的时间将分片限制在最多一个迁移; 即分片不能同时参与多个块迁移。要从分片中迁移多个块,平衡器一次移动一个块。
版本3.4更改:从MongoDB 3.4开始,MongoDB可以执行并行块迁移。观察分片一次可以参与多达一个迁移的限制,对于具有n个分 片的分片集群,MongoDB可以同时执行最多n / 2(四舍五入)进行的块迁移。
-
- 只有当分片数量最多的分片和分片数量最少的分片之间的块数差异达到迁移阈值时才开始平衡。
5.2. 我们可以采取的措施
临时禁用平衡器进行维护(可以看第6点)。
可以设置计划平衡窗口(可以看第4点),以限制平衡器运行的窗口,以防止其影响生产流量。平衡窗口的规范是相对于配置服务器副本集的主节点本地时区。
6、 关于平衡器的一些命令
检查平衡器是否启用(即平衡器被允许运行) sh.getBalancerState()
检查平衡器是否在运行 sh.isBalancerRunning()
禁用平衡器 sh.stopBalancer()
启用平衡器 sh.startBalancer()或sh.setBalancerState(true)
在一个集合禁用平衡器 sh.disableBalancing("databaseName.collectionName")
sh.disableBalancing()方法接收集合的完整命名空间作为其参数。
在一个集合启用平衡器 sh.enableBalancing("students.grades")
sh.enableBalancing()方法接收集合的完整命名空间作为其参数。
--------------------------------------- over ------------------------------------------------------
以上资料,大部分是参考官网的资料,在此表示感谢。
https://docs.mongodb.com/manual/core/sharding-balancer-administration/
以上是关于Flink详解系列之五--水位线(watermark)的主要内容,如果未能解决你的问题,请参考以下文章
Flink处理函数实战之五:CoProcessFunction(双流处理)