ReadFromPubSub->CloudStorage->BigQuery:订阅的大小永远不会减少,而且似乎只有 0.002% 的订阅到达 BigQuery
Posted
技术标签:
【中文标题】ReadFromPubSub->CloudStorage->BigQuery:订阅的大小永远不会减少,而且似乎只有 0.002% 的订阅到达 BigQuery【英文标题】:ReadFromPubSub->CloudStorage->BigQuery: Subscription never decreases in size and only 0.002% seemingly arrive in BigQuery 【发布时间】:2020-07-03 16:03:03 【问题描述】:管道正在使用ReadFromPubSub
源来读取指向 Cloud Storage blob 的链接,读取存储在每个文件中的事件,然后将它们插入 BigQuery:
with beam.Pipeline(options=pipeline_options) as pipeline:
dlq = DeadletterQueue(known_args.output_dlq)
pipeline = (
pipeline
| "Read PubSub Messages"
>> beam.io.ReadFromPubSub(
topic=topic,
id_label="messageId",
)
| "Read Records" >> ReadCloudStorageEvents(deadletter_queue=dlq)
| "Parse Events" >> ParseEventRecords(deadletter_queue=dlq)
| "window events" >> WindowOnTimeAndSize(60, 10)
| "Upload To BigQuery" >> BigQuerySink(project, deadletter_queue=dlq)
)
问题在于,即使从 PubSub 以极好的速率消耗项目,并且以同样好的速率从云存储中读取文件,它们也根本没有在 bigquery 附近的任何位置插入到 BigQuery流媒体限制。
数据新鲜度和系统延迟不断攀升:。
这样做的副作用是队列项不会被删除。
BigQuerySink
本质上是这样的:
class BigQuerySink(beam.PTransform):
def __init__(self, project: str, deadletter_queue: beam.PTransform):
self.deadletter_queue = deadletter_queue
self.project = project
def expand(self, pcoll):
def yield_from(events: Iterable[Dict]) -> Iterable[Dict]:
for event in events:
yield event
pcoll = (
pcoll
| "flatten events" >> beam.FlatMap(yield_from)
| f"push events to BigQuery"
>> beam.io.WriteToBigQuery(
table=lambda event: f"self.project:events_v2.event['type']",
schema=lambda table: load_schema(table.split(".")[-1]),
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
validate=True,
additional_bq_parameters=
"clustering": "fields": ["accountId"],
"timePartitioning": "type": "DAY", "field": "receivedAt",
,
)
)
# # https://***.com/questions/59102519/monitoring-writetobigquery
pcoll[
beam.io.gcp.bigquery.BigQueryWriteFn.FAILED_ROWS
] | "Map to Error" >> beam.Map(
lambda x: Error(
message=f"BigQuery exception",
record="destination": x[0], "index": x[1],
data=None,
stacktrace=None,
)
| self.deadletter_queue
)
使用 200.000 个元素进行测试,引用总共包含大约 10,000,000 个事件的文件导致只有大约 0.002% 的文件进入 BigQuery。我们离配额很近,我没有看到任何错误或任何东西(有时当字段与架构不匹配时,我会看到一些错误,但我什么也没看到)。
非常欢迎任何有关确定问题所在的见解。是否有地方可以查看项目是否在 BigQuery 端未通过某些验证,因此未在订阅中删除?
似乎是瓶颈的步骤是_StreamToBigQuery/CommitInsertIds/ReshufflePerKey/Map(reify_timestamps)
,如下图所示。接近 1000 MB 进入该步骤,只有 5 MB 退出(如果我正确阅读图表):
更新:WindowOnTimeAndSize
class WindowOnTimeAndSize(beam.PTransform):
"""A composite transform that groups Pub/Sub messages based on publish
time and outputs a list of dictionaries, where each contains one message
and its publish timestamp.
"""
def __init__(self, window_size_seconds, after_count):
# Convert minutes into seconds.
self.window_size = int(window_size_seconds)
self.after_count = after_count
def expand(self, pcoll):
return (
pcoll
# Assigns window info to each Pub/Sub message based on its
# publish timestamp.
| "Window into Fixed Intervals"
>> beam.WindowInto(
beam.window.FixedWindows(self.window_size),
trigger=Repeatedly(
AfterAny(
AfterCount(self.after_count),
AfterProcessingTime(self.window_size),
)
),
accumulation_mode=AccumulationMode.DISCARDING,
)
)
【问题讨论】:
也许值得重新考虑您的解决方案并简化它。您实际上并不需要 Pub/Sub 或 Dataflow。当您不需要时,您正在创建额外的跃点,例如读取 GCS 中文件的内容,然后插入到 BigQuery。相反,您可以让云函数监听文件到达时触发的存储桶。发生这种情况时,Cloud Function 会将文件直接加载到 BigQuery 中。这符合您的要求吗? 一个很好的观点。之所以如此,是因为在创建时间获取事件对于我们的目的来说是不够的。我们需要不时重播数据(迁移、新字段很有趣等),因此使用我们控制的格式通过 PubSub 提供事件允许我们这样做。即使我们这样做了,但问题仍然存在,因为瓶颈似乎是在读取文件之后和 WriteToBiguery 内部(我认为) 但我确实错过了您的 Cloud Function 参数。我想我担心当我们开始将剩下的数据扔到这个主题上时,我们基本上会有一个永远触发的云功能,尤其是在非常预期的重放情况下。根据您的经验,成本权衡是什么? 另一种选择是将您的 Dataflow 管道直接指向 GCS,它会在文件到达后读取文件并将其写入 BigQuery。 我看到的问题是,如果我们想重播这些事件(在 GCS 文件中,如果不通过 PubSub 或其他一些中间层,你将如何触发它你可以用一些脚本来打吗? 【参考方案1】:WriteToBigQuery 进行了一些更改,导致转换非常缓慢。在 Beam 2.24.0(几周后即将推出)中,转换应该能够达到更高的性能(我已经测试了每个 CPU 约 500-600 EPS)。
很抱歉给您带来麻烦。您可以立即使用代码,或者等待几周的 Beam 2.24.0。
【讨论】:
这是个好消息@pablo,您之前在数据集上看到的数字是多少?只是想在这里比较数字。加速百分比是多少? 我看到工人的每股收益非常低(约 100 或更少) 好的,我会监控 Beam 版本的更新 - 谢谢。 我可以知道 2.25.0 版本的价格吗?我有 28 名工人(=48cpus),但只是以 500~800 EPS 的速度流式传输到 BQ【参考方案2】:您最好的资源可能是数据流监控控制台,特别是当viewing the pipeline 时,您可以单击各个步骤来确定导致延迟的步骤。不要忘记,可以通过单击小 V 形符号来展开复合变换,以深入了解有问题的部分。
【讨论】:
是的,抱歉,我应该提到现在似乎出现瓶颈的点是在我几乎无法控制的beam.io.WriteToBigQuery
内部。所以我假设这与我是否在做窗口有关,这就是问题所在。
我用信息和似乎是瓶颈的屏幕截图更新了原始帖子
好像是这个Reshuffle GBK。 WindowOnTimeAndSize
中使用的窗口是什么?
嗨@robertwb,我已经用它更新了原始帖子,但我在 60 秒时开窗,但在吞吐量极高的情况下,我想使用 @987654324 输出每 10 个事件@ 和 AfterAny
。主要是因为最初的 60 秒确实没有什么不同。以上是关于ReadFromPubSub->CloudStorage->BigQuery:订阅的大小永远不会减少,而且似乎只有 0.002% 的订阅到达 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章
Associatively Segmenting Instances and Semantics in Point Clouds
Unity3d Realtime Dynamic Volume Clouds Rendering
论文阅读DGCNN:Dynamic Graph CNN for Learning on Point Clouds
自动驾驶 11-2: 激光雷达传感器模型和点云 LIDAR Sensor Models and Point Clouds