从 BigQuery 缓慢更改查找缓存 - Dataflow Python 流式 SDK

Posted

技术标签:

【中文标题】从 BigQuery 缓慢更改查找缓存 - Dataflow Python 流式 SDK【英文标题】:Slowly Changing Lookup Cache from BigQuery - Dataflow Python Streaming SDK 【发布时间】:2019-07-30 00:04:00 【问题描述】:

我正在尝试遵循使用 Python SDK for Apache Beam on DataFlow 的流式管道的缓慢更改查找缓存 (https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1) 的设计模式。

我们的查找缓存参考表位于 BigQuery 中,我们能够读取它并将其作为辅助输入传递给 ParDo 操作,但无论我们如何设置触发器/窗口,它都不会刷新。

class FilterAlertDoFn(beam.DoFn):
  def process(self, element, alertlist):

    print len(alertlist)
    print alertlist

    …  # function logic

alert_input = (p | beam.io.Read(beam.io.BigQuerySource(query=ALERT_QUERY))
                        | ‘alert_side_input’ >> beam.WindowInto(
                            beam.window.GlobalWindows(),
                            trigger=trigger.RepeatedlyTrigger(trigger.AfterWatermark(
                                late=trigger.AfterCount(1)
                            )),
                            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
                          )
                       | beam.Map(lambda elem: elem[‘SOMEKEY’])
)

...


main_input | ‘alerts’ >> beam.ParDo(FilterAlertDoFn(), beam.pvalue.AsList(alert_input))

根据此处的 I/O 页面 (https://beam.apache.org/documentation/io/built-in/),它说 Python SDK 仅支持 BigQuery Sink 的流式传输,这是否意味着 BQ 读取是有界源,因此无法在此方法中刷新?

尝试在源上设置非全局窗口会导致侧输入中的 PCollection 为空。


更新: 在尝试实施 Pablo 的回答建议的策略时,使用侧面输入的 ParDo 操作不会运行。

有一个输入源连接到两个输出,其中一个使用侧输入。 Non-SideInput 仍将到达其目的地,并且 SideInput 管道不会进入 FilterAlertDoFn()。

通过将侧输入替换为虚拟值,管道将进入函数。是不是在等待一个不存在的合适窗口?

使用与上面相同的 FilterAlertDoFn(),我的 side_input 和 call 现在看起来像这样:

def refresh_side_input(_):
   query = 'select col from table'
   client = bigquery.Client(project='gcp-project')
   query_job = client.query(query)

   return query_job.result()


trigger_input = ( p | 'alert_ref_trigger' >> beam.io.ReadFromPubSub(
            subscription=known_args.trigger_subscription))


bigquery_side_input = beam.pvalue.AsSingleton((trigger_input
         | beam.WindowInto(beam.window.GlobalWindows(),
                           trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                           accumulation_mode=trigger.AccumulationMode.DISCARDING)
         | beam.Map(refresh_side_input)
        ))

...

# Passing this as side input doesn't work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), bigquery_side_input)

# Passing dummy variable as side input does work
main_input | 'alerts' >> beam.ParDo(FilterAlertDoFn(), [1])

我尝试了几个不同版本的 refresh_side_input(),它们在检查函数内部的返回时报告了预期的结果。


更新 2:

我对 Pablo 的代码做了一些小的修改,得到了相同的行为 - DoFn 永远不会执行。

在下面的示例中,每当我发布到 some_other_topic 时,我都会看到“in_load_conversion_data”,但在发布到 some_topic

时永远不会看到“in_DoFn”
import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.transforms import trigger
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def load_my_conversion_data():
    return 'EURUSD': 1.1, 'USDMXN': 4.4


def load_conversion_data(_):
    # I will suppose that these are currency conversions. E.g.
    # 'EURUSD': 1.1, 'USDMXN' 20,
    print 'in_load_conversion_data'
    return load_my_conversion_data()


class ConvertTo(beam.DoFn):
    def __init__(self, target_currency):
        self.target_currency = target_currency

    def process(self, elm, rates):
        print 'in_DoFn'
        elm = elm.attributes
        if elm['currency'] == self.target_currency:
            yield elm
        elif ' % s % s' % (elm['currency'], self.target_currency) in rates:
            rate = rates[' % s % s' % (elm['currency'], self.target_currency)]
            result = .update(elm).update('currency': self.target_currency,
            'value': elm['value']*rate)
             yield result
         else:
             return  # We drop that value


pipeline_options = PipelineOptions()
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)

some_topic = 'projects/some_project/topics/some_topic'
some_other_topic = 'projects/some_project/topics/some_other_topic'

with beam.Pipeline(options=pipeline_options) as p:

    table_pcv = beam.pvalue.AsSingleton((
      p
      | 'some_other_topic' >>  beam.io.ReadFromPubSub(topic=some_other_topic,  with_attributes=True)
      | 'some_other_window' >> beam.WindowInto(window.GlobalWindows(),
                        trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                        accumulation_mode=trigger.AccumulationMode.DISCARDING)
      | beam.Map(load_conversion_data)))


    _ = (p | 'some_topic' >> beam.io.ReadFromPubSub(topic=some_topic)
         | 'some_window' >> beam.WindowInto(window.FixedWindows(1))
         | beam.ParDo(ConvertTo('USD'), rates=table_pcv))

【问题讨论】:

这个问题很有趣 - 我会尽量在明天之前回复您。 嘿 Pablo - 你有机会看看这个。我也对它感兴趣;-) 啊是的 - 抱歉耽搁了。一秒…… 嗨@hulahoof,你能找到解决方案吗?我也面临与 python 相同的问题 【参考方案1】:

正如您所指出的,Java SDK 允许您使用更多的流式处理工具,例如计时器和状态。这些实用程序有助于实施此类管道。

Python SDK 缺少其中一些实用程序,尤其是计时器。出于这个原因,我们需要使用 hack,其中可以通过在 PubSub 中的 some_other_topic 中插入消息来触发侧输入的重新加载。

这也意味着您必须手动执行对 BigQuery 的查找。您或许可以使用 apache_beam.io.gcp.bigquery_tools.BigQueryWrapper 类直接在 BigQuery 中执行查找。

以下是刷新某些货币换算数据的管道示例。我还没有测试过它,但我有 90% 的把握它只需要很少的调整就可以工作。让我知道这是否有帮助。

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

def load_conversion_data(_):
  # I will suppose that these are currency conversions. E.g. 
  # ‘EURUSD’: 1.1, ‘USDMXN’ 20, …
  return external_service.load_my_conversion_data()

table_pcv = beam.pvalue.AsSingleton((
  p
  | beam.io.gcp.ReadFromPubSub(topic=some_other_topic)
  | WindowInto(window.GlobalWindow(),
               trigger=trigger.Repeatedly(trigger.AfterCount(1),
               accumulation_mode=trigger.AccumulationMode.DISCARDING)
  | beam.Map(load_conversion_data)))


class ConvertTo(beam.DoFn):
  def __init__(self, target_currency):
    self.target_currenct = target_currency

  def process(self, elm, rates):
    if elm[‘currency’] == self.target_currency:
      yield elm
    elif ‘%s%s’ % (elm[‘currency’], self.target_currency) in rates:
      rate = rates[‘%s%s’ % (elm[‘currency’], self.target_currency)]
      result = .update(elm).update(‘currency’: self.target_currency,
                                      ‘value’: elm[‘value’]*rate)
      yield result
    else:
      return  # We drop that value


_ = (p 
     | beam.io.gcp.ReadFromPubSub(topic=some_topic)
     | beam.WindowInto(window.FixedWindows(1))
     | beam.ParDo(ConvertTo(‘USD’), rates=table_pcv))

【讨论】:

感谢您回复我 pablo,让我试试上面的方法,如果它解决了我的问题,请告诉您:) 嗨 Pablo,不幸的是,使用上面的方法我的 ParDo 无法运行 - 我将更新我的问题以提供更多详细信息 嗨@Pablo,我已经更新了我的问题,尝试修改您的代码以供我使用,并简单地运行您的示例(使用一些额外的必要位)。您能否尝试运行 UPDATE 2 中的代码,如果您观察到相同的行为,请告诉我? 我相信我遇到了同样的问题。 @Pablo 有这个模式的参考会很棒,因为它可能是一个常见的要求。 让 DoFn 中的代码执行的一个简单修复方法是对侧输入和主管道使用相同的窗口方案(即beam.WindowInto(window.FixedWindows(1)))。这对你有用吗?我认为您仍然应该能够使用辅助 Pub/Sub 主题的消息频率来控制刷新率。代码here 但变化很小......

以上是关于从 BigQuery 缓慢更改查找缓存 - Dataflow Python 流式 SDK的主要内容,如果未能解决你的问题,请参考以下文章

缓存动态查询 PHP + BigQuery

谷歌数据流运行极其缓慢缓慢

BigQuery 请求查找表之间的更改

BigQuery 中的联接性能缓慢

BigQuery - 使用更改/删除的记录更新表

如何修复 BigQuery 中缓慢的 _TABLE_SUFFIX 查询?