Flink:如何在 flink 中处理外部应用配置更改

Posted

技术标签:

【中文标题】Flink:如何在 flink 中处理外部应用配置更改【英文标题】:Flink: How to handle external app configuration changes in flink 【发布时间】:2016-09-26 02:33:29 【问题描述】:

我的要求是在一天内流式传输数百万条记录,并且它对外部配置参数有很大的依赖性。例如,用户可以随时在 Web 应用程序中更改所需的设置,并且在进行更改后,必须使用新的应用程序配置参数进行流式传输。这些是应用级别的配置,我们还有一些动态排除参数,每个数据都必须通过和过滤。

我看到 flink 没有在所有任务管理器和子任务之间共享的全局状态。拥有一个集中式缓存是一种选择,但对于每个参数,我都必须从缓存中读取它,这会增加延迟。请就处理此类场景的更好方法以及其他应用程序如何处理它提出建议。谢谢。

【问题讨论】:

【参考方案1】:

更新正在运行的流应用程序的配置是常见的要求。在 Flink 的 DataStream API 中,这可以使用所谓的CoFlatMapFunction 来完成,它处理两个输入流。其中一个流可以是数据流,另一个可以是控制流。

以下示例展示了如何动态调整过滤掉超过一定长度的字符串的用户函数。

val data: DataStream[String] = ???
val control: DataStream[Int] = ???

val filtered: DataStream[String] = data
  // broadcast all control messages to the following CoFlatMap subtasks
  .connect(control.broadcast)
  // process data and control messages
  .flatMap(new DynLengthFilter)


class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] 

  var length = 0

  // filter strings by length
  override def flatMap1(value: String, out: Collector[String]): Unit = 
    if (value.length < length) 
      out.collect(value)
    
  

  // receive new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = 
    length = value
  

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length

  override def restoreState(state: Int): Unit = 
    length = state
  

DynLengthFilter 用户函数实现了过滤器长度的Checkpointed 接口。如果发生故障,此信息会自动恢复。

【讨论】:

嗨,我想这现在可以通过广播状态模式来实现,对吧? 是的,Flink 以后的版本可以使用 BroadcastState 来动态分发和管理配置参数。

以上是关于Flink:如何在 flink 中处理外部应用配置更改的主要内容,如果未能解决你的问题,请参考以下文章

Flink 如何解析与传递参数

FLINK 基于1.15.2的Java开发-如何使用外部配置文件

Flink 实战系列Flink 使用 ParameterTool 动态加载外部配置文件

Flink如何加载其他目录的jar包

Flink(1.12.1)日志配置Logback实现日志切分和kafka发送

会话窗口 flink