如何从 Dataflow 批量(有效)发布到 Pub/Sub?
Posted
技术标签:
【中文标题】如何从 Dataflow 批量(有效)发布到 Pub/Sub?【英文标题】:How to publish to Pub/Sub from Dataflow in batch (efficiently)? 【发布时间】:2021-06-23 13:19:24 【问题描述】:感谢 Dataflow Job 在批处理模式下,我想将消息发布到具有某些属性的 Pub/Sub 主题。
我的数据流管道是用 python 3.8 和 apache-beam 2.27.0 编写的
这里可以使用@Ankur 解决方案:https://***.com/a/55824287/9455637
但我认为共享 Pub/Sub 客户端可能会更高效:https://***.com/a/55833997/9455637
然而发生了错误:
return StockUnpickler.find_class(self, module, name) AttributeError: 无法在
问题:
-
共享发布者实施会提高光束管道性能吗?
是否有其他方法可以避免在我的共享发布者客户端上出现酸洗错误?
我的数据流管道:
import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud.pubsub_v1 import PublisherClient
import json
import argparse
import re
import logging
class PubsubClient(PublisherClient):
def __reduce__(self):
return self.__class__, (self.batch_settings,)
# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
def __init__(self):
from google.cloud import pubsub_v1
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
)
self.publisher = PubsubClient(batch_settings)
super().__init__()
def process(self, element, **kwargs):
future = self.publisher.publish(
topic=element["topic"],
data=json.dumps(element["data"]).encode("utf-8"),
**element["attributes"],
)
return future.result()
def run(argv=None, save_main_session=True):
"""Main entry point; defines and runs the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
"--source_table_id",
dest="source_table_id",
default="",
help="BigQuery source table <project>.<dataset>.<table> with columns (topic, attributes, data)",
)
known_args, pipeline_args = parser.parse_known_args(argv)
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
pipeline_options = PipelineOptions(pipeline_args)
# pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
bq_source_table = known_args.source_table_id
bq_table_regex = r"^(?P<PROJECT_ID>[a-zA-Z0-9_-]*)[\.|\:](?P<DATASET_ID>[a-zA-Z0-9_]*)\.(?P<TABLE_ID>[a-zA-Z0-9_-]*)$"
regex_match = re.search(bq_table_regex, bq_source_table)
if not regex_match:
raise ValueError(
f"Bad BigQuery table id : `bq_source_table` please match bq_table_regex"
)
table_ref = bigquery.TableReference(
projectId=regex_match.group("PROJECT_ID"),
datasetId=regex_match.group("DATASET_ID"),
tableId=regex_match.group("TABLE_ID"),
)
with beam.Pipeline(options=pipeline_options) as p:
(
p
| "ReadFromBqTable" #
>> bigquery.ReadFromBigQuery(table=table_ref, use_json_exports=True) # Each row contains : topic / attributes / data
| "PublishRowsToPubSub" >> beam.ParDo(PublishFn())
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
run()
【问题讨论】:
有什么理由在 ParDo 中使用您自己的 Publisher,而不是来自 Beam 的那个?不建议在 ParDo 中使用它。另外,如果你想在 ParDo 中做,我建议你使用setup
方法。
我想以批处理模式运行此管道。 Beam 的 PubsubIO 仅适用于流媒体。
ParDo 似乎值得推荐:beam.apache.org/documentation/io/developing-io-overview/#sinks
你完全正确,我不知道 Python Batch 中不提供对 PS 的写入,抱歉。不过,它们在 Java 中是可用的(这就是我感到困惑的原因)。鉴于管道看起来不需要任何特定于 Python 的内容,您是否考虑过使用 Java?
+1 到 Iñigo 的所有积分。为避免酸洗错误,您可以在 DoFn 类的 setup() 函数中创建客户端。我不认为使用共享客户端会有所帮助(我不知道 pubsub 客户端是否也是线程安全的)
【参考方案1】:
在对此大惊小怪之后,我想我有一个始终如一的答案,并且即使不是世界一流的性能,至少也可以使用:
import logging
import apache_beam as beam
from apache_beam.io.gcp.pubsub import PubsubMessage
from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import (
BatchSettings,
LimitExceededBehavior,
PublishFlowControl,
PublisherOptions,
)
class PublishClient(PublisherClient):
"""
You have to override __reduce__ to make PublisherClient pickleable ? ? ?
Props to 'Ankur' and 'Benjamin' on SO for figuring this part out; god knows
I would not have...
"""
def __reduce__(self):
return self.__class__, (self.batch_settings, self.publisher_options)
class PubsubWriter(beam.DoFn):
"""
beam.io.gcp.pubsub does not yet support batch operations, so
we do this the hard way. it's not as performant as the native
pubsubio but it does the job.
"""
def __init__(self, topic: str):
self.topic = topic
self.window = beam.window.GlobalWindow()
self.count = 0
batch_settings = BatchSettings(
max_bytes=1e6, # 1MB
# by default it is 10 ms, should be less than timeout used in future.result() to avoid timeout
max_latency=1,
)
publisher_options = PublisherOptions(
enable_message_ordering=False,
# better to be slow than to drop messages during a recovery...
flow_control=PublishFlowControl(limit_exceeded_behavior=LimitExceededBehavior.BLOCK),
)
self.publisher = PublishClient(batch_settings, publisher_options)
def start_bundle(self):
self.futures = []
def process(self, element: PubsubMessage, window=beam.DoFn.WindowParam):
self.window = window
self.futures.append(
self.publisher.publish(
topic=self.topic,
data=element.data,
**element.attributes,
)
)
def finish_bundle(self):
"""Iterate over the list of async publish results and block
until all of them have either succeeded or timed out. Yield
a WindowedValue of the success/fail counts."""
results = []
self.count = self.count + len(self.futures)
for fut in self.futures:
try:
# future.result() blocks until success or timeout;
# we've set a max_latency of 60s upstairs in BatchSettings,
# so we should never spend much time waiting here.
results.append(fut.result(timeout=60))
except Exception as ex:
results.append(ex)
res_count = "success": 0
for res in results:
if isinstance(res, str):
res_count["success"] += 1
else:
# if it's not a string, it's an exception
msg = str(res)
if msg not in res_count:
res_count[msg] = 1
else:
res_count[msg] += 1
logging.info(f"Pubsub publish results: res_count")
yield beam.utils.windowed_value.WindowedValue(
value=res_count,
timestamp=0,
windows=[self.window],
)
def teardown(self):
logging.info(f"Published self.count messages")
诀窍在于,如果您在 process()
方法中调用 future.result()
,您将一直阻塞,直到该单条消息成功发布,因此请收集期货列表,然后在捆绑包的末尾确保它们'重新全部发布或明确超时。使用我们的一个内部管道进行的一些快速测试表明,这种方法可以在大约 200 秒内发布 160 万条消息。
【讨论】:
以上是关于如何从 Dataflow 批量(有效)发布到 Pub/Sub?的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apache Beam 的 Dataflow 批量加载的性能问题
Dataflow 何时确认来自 PubSubIO 的批处理项目消息?
Google Dataflow (Apache beam) JdbcIO 批量插入 mysql 数据库
从 Dataflow 中的 GCS 读取时如何获取正在处理的文件名?