谷歌数据流流式传输管道在窗口化后不会将工作负载分配给多个工作人员

Posted

技术标签:

【中文标题】谷歌数据流流式传输管道在窗口化后不会将工作负载分配给多个工作人员【英文标题】:Google dataflow streaming pipeline is not distributing workload over several workers after windowing 【发布时间】:2019-07-12 19:53:41 【问题描述】:

我正在尝试在 python 中设置数据流流式传输管道。我在批处理管道方面有相当多的经验。我们的基本架构如下所示:

第一步是进行一些基本处理,每条消息大约需要 2 秒才能到达窗口。我们正在使用 3 秒和 3 秒间隔的滑动窗口(稍后可能会更改,因此我们有重叠窗口)。作为最后一步,我们的 SOG 预测需要大约 15 秒来处理,这显然是我们的瓶颈转换。

所以,我们似乎面临的问题是,工作量在窗口化之前完全分布在我们的工作人员身上,但最重要的转换根本没有分布。所有窗口一次处理一个,似乎只有 1 个工作人员,而我们有 50 个可用。

日志向我们显示,sog 预测步骤每 15 秒输出一次,如果窗口将由更多工作人员处理,情况就不应该如此,因此随着时间的推移,这会产生巨大的延迟,这是我们不希望的。对于 1 分钟的消息,最后一个窗口有 5 分钟的延迟。当分发工作时,这应该只有大约 15 秒(SOG 预测时间)。所以在这一点上我们一无所知..

有没有人知道我们的代码是否有问题或者如何防止/规避这个问题? 这似乎是谷歌云数据流内部发生的事情。这是否也发生在 java 流管道中?

在批处理模式下,一切正常。在那里,可以尝试进行重新洗牌以确保不会发生融合等。但这在流式处理窗口后是不可能的。

args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
                                        job_name='XX',
                                        num_workers=args.workers,
                                        max_num_workers=MAX_NUM_WORKERS,
                                        disk_size_gb=DISK_SIZE_GB,
                                        local=args.local,
                                        streaming=args.streaming)

pipeline = beam.Pipeline(options=pipeline_options)

# Build pipeline
# pylint: disable=C0330
if args.streaming:
    frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
        subscription=SUBSCRIPTION_PATH,
        with_attributes=True,
        timestamp_attribute='timestamp'
    ))

    frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
        create_frame_tuples_fn)

crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
    pred_bbox_tfserv_fn, SERVER_URL)

sliding_windows = bboxs | 'Window' >> beam.WindowInto(
                  beam.window.SlidingWindows(
                        FEATURE_WINDOWS['goal']['window_size'],
                        FEATURE_WINDOWS['goal']['window_interval']),
                  trigger=AfterCount(30),
                  accumulation_mode=AccumulationMode.DISCARDING)

# GROUPBYKEY (per match)
group_per_match = sliding_windows | 'Group' >> beam.GroupByKey()
_ = group_per_match | 'LogPerMatch' >> beam.Map(lambda x: logging.info(
    "window per match per timewindow: # %s, %s", str(len(x[1])), x[1][0][
        'timestamp']))

sog = sliding_windows | 'Predict SOG' >> beam.Map(predict_sog_fn,
                                                SERVER_URL_INCEPTION,
                                                SERVER_URL_SOG )

pipeline.run().wait_until_finish()

【问题讨论】:

【参考方案1】:

在 beam 中,并行度单位是键——给定键的所有窗口都将在同一台机器上生成。但是,如果您有 50 多个密钥,它们应该分配给所有工作人员。

您提到无法在流媒体中添加 Reshuffle。这应该是可能的;如果您遇到错误,请在 https://issues.apache.org/jira/projects/BEAM/issues 提交错误。重新窗口化到 GlobalWindows 是否会使重新洗牌的问题消失?

【讨论】:

对于改组,错误如下:org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast to org.apache.beam.sdk.transforms.windowing.GlobalWindow跨度> 您好 Robert,我发布了一个替代解决方案,使用 CombineGlobally,也许您可​​以就我的回答提出建议,在这种情况下如何优化跨多个工作人员的负载分配 @robertwb,根据您的回答,我们找到了一个非常肮脏的解决方法来使其正常工作。谢谢!还有一个问题,并行化不基于键AND窗口是否有原因?对于像我们这样的小延迟应用程序来说,不考虑窗口似乎很不合逻辑。我们现在必须在通过窗口之前将窗口添加到键中(幸运的是,在这种情况下,我们知道元素将被放入哪个窗口),然后我们才能让它并行化而不会增加延迟。 对于某些窗口功能(例如会话),在所有键都配置好之前,窗口是未知的。对于那些已知的(例如 FixedWindows)这是可能的,只是它还没有成为任何跑步者进行的足够重要的优化。【参考方案2】:

看起来您不一定需要 GroupByKey,因为您总是在同一个键上分组。相反,您可以使用 CombineGlobally 来附加窗口内的所有元素,而不是 GroupByKey(始终使用相同的键)。

combined = values | beam.CombineGlobally(append_fn).without_defaults()
combined | beam.ParDo(PostProcessFn())

我不确定使用 CombineGlobally 时负载分配是如何工作的,但由于它不处理键值对,我希望另一种机制来进行负载分配。

【讨论】:

以上是关于谷歌数据流流式传输管道在窗口化后不会将工作负载分配给多个工作人员的主要内容,如果未能解决你的问题,请参考以下文章

OpenCV 不会流式传输/更新我的视频。如何更新 imshow 窗口? [复制]

Flink:窗口不会在流结束时处理数据

在 python 中使用 BigQuery 接收器流式传输管道

CDN/地理负载均衡

如何在谷歌云数据流中停止流式传输管道

在谷歌应用引擎中将数据流式传输到 bigquery - java