GCP Dataflow + Apache Beam - 缓存问题

Posted

技术标签:

【中文标题】GCP Dataflow + Apache Beam - 缓存问题【英文标题】:GCP Dataflow + Apache Beam - caching question 【发布时间】:2021-04-29 17:35:03 【问题描述】:

我是 GCP、Dataflow、Apache Beam、Python 和 OOP 的新手。我来自函数式 javascript 领域,为了上下文。

现在我有一个使用 Apache Beam python sdk 构建的流式传输管道,并将其部署到 GCP 的 Dataflow。管道的源是一个 pubsub 订阅,而接收器是一个数据存储。

管道从 pubsub 订阅中获取消息,根据配置对象 + 消息内容做出决定,然后根据它做出的决定将其放在数据存储中的适当位置。目前一切正常。

现在我处于当前硬编码的配置对象需要更加动态的情况。我的意思是:我们现在不只是硬编码配置对象,而是要进行一个返回配置的 API 调用。这样,我们可以更新配置而无需重新部署管道。这目前也有效。

但是!我们预计会出现大量流量,因此获取传入的每条消息的配置并不理想。因此我们将获取移动到开始,就在实际管道开始之前。但这意味着我们立即失去了让它来自 API 调用的价值,因为 API 调用只在管道启动时发生一次。

以下是我们目前所做的(为了清楚起见,去掉了不相关的部分):

def run(argv=None):

    options = PipelineOptions(
        streaming=True,
        save_main_session=True
    )

    configuration = get_configuration() # api call to fetch config

    with beam.Pipeline(options=options) as pipeline:

        # read incoming messages from pubsub
        incoming_messages = (
            pipeline
            | "Read Messages From PubSub"
            >> beam.io.ReadFromPubSub(subscription=f"our subscription here", with_attributes=True))

         # make a decision based off of the message + the config
         decision_messages = (
                incoming_messages
                | "Create Decision Messages" >> beam.FlatMap(create_decision_message, configuration)
        )

create_decision_message 接收来自流 + 配置文件的传入消息,然后,您猜对了,做出决定。这是非常简单的逻辑。想想“如果消息是苹果,并且配置说我们只关心橘子,那么对消息什么都不做”。我们需要能够即时更新它以说“没关系,我们现在突然也关心苹果了”。

我需要想办法让管道知道它需要每 15 分钟重新获取该配置文件。我不完全确定使用我正在使用的工具来做到这一点的最佳方法是什么。如果是 javascript,我会这样做:

(请原谅伪代码,不确定这是否会真正运行,但你明白了)


let fetch_time = Date.now()  // initialized when app starts
let expiration = 900 // 900 seconds = 15 mins 
let config = getConfigFromApi() // fetch config right when app starts

function fetchConfig(now)
    if (fetch_time + expiration < now)  
      // if fetch_time + expiration is less than the current time, we need to re-fetch the config
      config = getConfigFromApi() // assign new value to config var
      fetch_time = now // assign new value to fetch_time var
   
    return config 


...

const someLaterTime = Date.now() // later in the code, within the pipeline, I need to use the config object
const validConfig = fetchConfig(someLaterTime) // i pass in the current time and get back either the memory-cached config, or a just-recently-fetched config

我不太确定如何将这个概念翻译成 python,我也不确定是否应该这样做。这是一个合理的尝试吗?还是这种类型的行为与我正在使用的堆栈不一致?我所处的位置是我团队中唯一一个从事此工作的人,而且这是一个新建项目,因此没有任何地方可以说明过去是如何完成的。我不确定我是否应该尝试解决这个问题,或者我应该说“对不起老板,我们需要另一个解决方案”。

感谢任何帮助,无论多么小...谢谢!

【问题讨论】:

【参考方案1】:

我认为有多种方法可以实现您想要实现的目标,最直接的方法可能是通过有状态处理,其中您通过有状态 DoFn 中的状态记录您的配置并设置循环计时器以刷新记录。

您可以在此处阅读有关状态处理的更多信息https://beam.apache.org/blog/timely-processing/

beam 编程指南中有关状态和计时器的更多信息:https://beam.apache.org/documentation/programming-guide/#types-of-state。

我想你可以定义你的处理逻辑,这需要 ParDo 中的配置,例如:

class MakeDecision(beam.DoFn):
  CONFIG = ReadModifyWriteState('config', coders.StrUtf8Coder())
  REFRESH_TIMER = TimerSpec('output', TimeDomain.REAL_TIME)

  def process(self,
              element,
              config=DoFn.StateParam(CONFIG),
              timer=DoFn.TimerParam(REFRESH_TIMER)):
    valid_config=
    if config.read():
      valid_config=json.loads(config.read())
    else: # config is None and hasn't been fetched before.
      valid_config=fetch_config() # your own fetch function.
      config.write(json.dumps(valid_config))
      timer.set(Timestamp.now() + Duration(seconds=900))
    # Do what ever you need with the config.
    ...
  
  @on_timer(REFRESH_TIMER)
  def refresh_config(self,
                     config=DoFn.StateParam(CONFIG),
                     timer=DoFn.TimerParam(REFRESH_TIMER)):
      valid_config=fetch_config()
      config.write(json.dumps(valid_config))
      timer.set(Timestamp.now() + Duration(seconds=900))

然后您现在可以使用 Stateful DoFn 处理您的消息。

with beam.Pipeline(options=options) as pipeline:
    pipeline
            | "Read Messages From PubSub"
            >> beam.io.ReadFromPubSub(subscription=f"our subscription here", with_attributes=True))
            | "Make decision" >> beam.ParDo(MakeDecision())

【讨论】:

以上是关于GCP Dataflow + Apache Beam - 缓存问题的主要内容,如果未能解决你的问题,请参考以下文章

在 GCP Dataflow 上的 python apache 光束中使用 scipy

JAVA - Apache BEAM- GCP:GroupByKey 与 Direct Runner 一起工作正常,但与 Dataflow runner 一起失败

Spring Cloud Dataflow 与 Apache Beam/GCP 数据流说明

Dataflow GCP(Apache Beam)-连续读取大量文件(OutOfMemory)

从 Apache Beam(GCP 数据流)写入 ConfluentCloud

Dataflow API 不会在 GCP 中激活