随着执行的推进 + 意外的侧输入行为,数据流管道吞吐量急剧下降

Posted

技术标签:

【中文标题】随着执行的推进 + 意外的侧输入行为,数据流管道吞吐量急剧下降【英文标题】:Dataflow pipeline throughput decreases drastically as execution advances + unexpected side input behavior 【发布时间】:2020-11-30 01:11:44 【问题描述】:

我有一个数据流管道处理大约 1Gb 的数据输入,其中两个 dicts 作为 side_inputs。目标是借助这两个 side_input 从主数据集中计算特征。

管道整体结构如下:

    # First side input, ends up as a 2GB dict with 3.5 million keys
    side_inp1 = ( p | 
        "read side_input1" >> beam.io.ReadFromAvro("$PATH/*.avro") | 
        "to list of tuples" >> beam.Map(lambda row: (row["key"], row["value"]))
    )

    # Second side input, ends up as a 1.6GB dict with 4.5 million keys
    side_inp2 = (p | 
        "read side_input2" >> beam.io.ReadFromAvro("$PATH2/*.avro") | 
        "to list of tuples" >> beam.Map(lambda row: (row["key"], row["value"]))
    )
    
    # The main part of the pipeline, reading an avro dataset of 1 million rows -- 20GB
    (p |
     "read inputs" >> beam.io.ReadFromAvro("$MainPath/*.avro") | 
     "main func" >> beam.Map(MyMapper, pvalue.AsDict(side_inp1), pvalue.AsDict(side_inp2))
     )

这是数据流图:

“特征化”步骤展开:

所以 Featurize 是一个函数,它在侧输入中查找 ids,.gets 向量,并且喜欢 180 种不同的向量点积方法来计算一些特征。这是一个完全受 CPU 限制的进程,预计会比管道的其余部分花费更长的时间,但这里的停顿是很奇怪的事情。

我的问题有两个:

    随着数据流管道在流程中进一步移动,数据流管道似乎急剧变慢。我不知道是什么原因以及如何缓解这个问题。可以在下面看到MyMapper 步骤的吞吐量图表,我想知道吞吐量下降(最终从 ~400 行/秒到接近 ~1 行/秒)。

    side_inputs 的行为对我来说也很奇怪。我希望 side_inputs 只读一次,但是当我查看 Job Metrics / Throughput 图表时,我观察到了以下图表。可以看出,管道不断地在 side_inputs 中读取,而我想要的只是两个保存在内存中的 dicts。

其他作业配置

区域:us-central-1a machine_type:m1-ultramem-40(40 个 CPU 内核,960GB RAM) 磁盘类型/大小:ssd/50GB 实验:启用随机服务。 max_num_workers:1 有助于简化计算和指标,并且不会因自动缩放而变化。

额外观察

我经常在 LogViewer 中看到如下日志条目:[INFO] Completed workitem: 4867151000103436312 in 1069.056863785 seconds" 到目前为止,所有已完成的 workItems 都花费了大约 1000-1100 秒,这是另一个令人困惑的原因,为什么吞吐量会下降而处理 workItems 需要与以前相同的时间?并行度是否因某种原因而下降? (可能是一些我无法控制的隐藏线程阈值,例如 harness_threads?)。

在管道的后面部分,查看日志,它看起来执行模式非常顺序(似乎它正在执行 1 个工作项,完成它,转到下一个,这对我来说很奇怪,考虑到有 1TB可用内存和 40 核)

有0个错误甚至警告

【问题讨论】:

这种行为通常意味着磁盘大小/内存的限制。使用 Ultramem,我怀疑内存是个问题。我的建议:尝试查看管道卡住的 FStep,查找 OoM 以及类似于“Memtable full”或“L0 too many files”(类似)的日志。如果你有这么大尺寸的侧输入,也许你需要指定--workerCacheMb=<XXX>doc @Iñigo 感谢您指出文档,这是我将尝试并在这里报告的内容。在 OOM/内存问题方面,作业运行的 4 小时内不存在任何故障。 @Iñigo 你知道 Python SDK 是否存在该标志吗?我在代码中找不到。 确实,它看起来并不存在于 Python 中,抱歉。也许您可以尝试使用更大的磁盘? 50gb 对于工人的体型来说看起来并不多。 我不知道为什么数据流需要更多的磁盘空间,除了缓存 side_inputs,我已经给了它这么多内存,我什至不希望它这样做。无论如何,我也尝试了 100GB 并且没有改变行为 【参考方案1】:

第 1 点中的吞吐量图表很好地表明您的工作性能有所下降。

侧面输入应为in memory;但是,我不太确定只有 1 个 highmem 节点的管道是一种好方法。由于只有一个节点,管道可能存在难以识别的瓶颈,例如网络或操作系统限制(如在操作系统中打开的max number of files 与加载到内存中的文件相关)。由于梁的架构,我认为即使启用了autoscaling,您也可以拥有更多节点不是问题,因为我们发现自动缩放会自动选择运行您的工作所需的适当数量的工作程序实例。如果您出于其他原因担心计算和指标,请分享。

关于第 2 点,我认为预计会在图表上找到活动,因为侧面输入(在内存中)是正在处理的 read by each element。但是,如果这对您没有意义,您可以随时添加完整的作业图,以便我们了解管道步骤的任何其他详细信息。

我的建议是添加更多工作人员来分配工作负载,因为 PCollection 是一个分布式数据集,将分布在可用节点之间。您可以尝试拥有更多节点的类似计算资源,例如 4 个实例 n2d-highmem-16 (16vCPU 128GB)。通过这种变化,任何瓶颈都可能消失或得到缓解;另外,你可以用同样的方式监控新的工作:

请记住在您的管道中使用check errors,这样您就可以识别正在发生/导致性能问题的任何其他问题。

在数据流 UI 中检查 CPU and Memory usage。如果在作业级别发生内存错误,Stackdriver 应将它们显示为 memory errors,但还应检查 memory in the host instance 以确保它没有因其他原因达到操作系统中的限制。

您可能希望检查 this example 并将侧面输入作为字典。我不是专家,但您可以按照示例中的最佳做法进行操作。

更新

如果机器 n2d-highmem-16 有 OOM,在我看来,每个线束线程都可能使用字典的副本。不太确定配置线程数是否有帮助,但您可以尝试在管道选项中设置number_of_worker_harness_threads

另一方面,你能扩展步骤Featurize吗? wall time 在此步骤中非常高(约 6 天),让我们检查吸收这种延迟的 composite transforms。对于有问题的复合变换,让我们知道代码 sn-p。要确定可能出现问题的复合转换,请参阅Side Inputs Metrics,尤其是Time spent writingTime spent reading

【讨论】:

感谢您的回答!所以我选择 ultramem 机器的原因是,当我使用较小的机器运行时,我遇到了 OOM 故障。我想这与数据流如何选择线束线程本身的数量(==CPU 核心)并且不允许对其进行配置有关!在我看来,这导致了我们的 OOM,因为我们在每个线程中都有约 4Gb 的恒定 side_input 数据,因此实际任务的 RAM 并不多。如果我假设每个harness_thread 复制side_inputs 是错误的并且实际上每个worker 只读取一次,那么这是不正确的 当您读取边输入时,整个 PCollection 对每个工作人员都可用,因此更多工作人员不会分配负载。有一些技术可以实现这种规模,例如缓存和使用 KV 存储来进行大型地图侧输入,但这些还没有为 Python 实现。您可以自己在 DoFn 实例上缓存它,可能。 @KennKnowles 谢谢,但我不完全理解你的意思。所以我将工作人员的数量限制为 1,以便在此处删除多个工作人员的概念。现在只有 1 个工人,我们只有线程。如果整个side_input PCollection 只计算一次,然后可供所有线程使用,我的问题是它是一个共享内存方案,其中所有“n”线程共享side_input,或者side_input 被复制每个线程(整体 side_input 内存 = 线程数 * 大小(side_input))?同样在多工人设置中,side_inputs 是如何提供给其他工人的? 我会用我找到的更多信息更新我的答案。 @rsantiago 谢谢,我添加了Featurize 步骤的扩展形式,如您所见,这没什么,只是从一行数据中计算出190 个特征的def。没有分组,没有洗牌,什么都没有。只是基于 CPU 的计算和向量乘法。我的猜测是,因为每个线程都创建了自己的 side_inputs 副本,而且它们非常大,随着创建更多包的可用内存量变为零,然后它开始从内存中弹出 side_inputs,这反过来又导致重新读取和这极大地限制了管道,我很想知道 wyt。

以上是关于随着执行的推进 + 意外的侧输入行为,数据流管道吞吐量急剧下降的主要内容,如果未能解决你的问题,请参考以下文章

Azure 数据工厂:执行管道活动无法引用调用管道,需要循环行为

构建管道中的 NPM“解析附近时 JSON 输入意外结束”

如何避免回显关闭 FIFO 命名管道? - Unix FIFO 的有趣行为

管道控制Telnet

NSFetchRequest 的意外行为

CouchDB意外减少/重新减少行为