How to publish to Pub/Sub from Dataflow in batch (efficiently)?

Posted: 2021-06-23 13:19:24

Using a Dataflow job in batch mode, I would like to publish messages with some attributes to a Pub/Sub topic.

My Dataflow pipeline is written with Python 3.8 and apache-beam 2.27.0.

It works here with @Ankur's solution: https://***.com/a/55824287/9455637

But I think it could be more efficient with a shared Pub/Sub client: https://***.com/a/55833997/9455637

However, this error occurred:

return StockUnpickler.find_class(self, module, name) AttributeError: Can't … on

Questions:

Would a shared publisher implementation improve Beam pipeline performance?
Is there another way to avoid pickling errors on my shared publisher client?

My Dataflow pipeline:

import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

from google.cloud.pubsub_v1 import PublisherClient

import json
import argparse
import re
import logging


class PubsubClient(PublisherClient):
    def __reduce__(self):
        return self.__class__, (self.batch_settings,)


# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
    def __init__(self):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,  # One second
        )

        self.publisher = PubsubClient(batch_settings)
        super().__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(
            topic=element["topic"],
            data=json.dumps(element["data"]).encode("utf-8"),
            **element["attributes"],
        )

        return future.result()


def run(argv=None, save_main_session=True):
    """Main entry point; defines and runs the pipeline."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--source_table_id",
        dest="source_table_id",
        default="",
        help="BigQuery source table <project>.<dataset>.<table> with columns (topic, attributes, data)",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).

    pipeline_options = PipelineOptions(pipeline_args)
    # pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
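    # NOTE: with save_main_session left disabled, classes defined in __main__
    # (such as PubsubClient and PublishFn above) may not be importable when
    # the worker unpickles the DoFn, which is a common source of the
    # StockUnpickler AttributeError quoted in the question above.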
    bq_source_table = known_args.source_table_id
    bq_table_regex = r"^(?P<PROJECT_ID>[a-zA-Z0-9_-]*)[\.|\:](?P<DATASET_ID>[a-zA-Z0-9_]*)\.(?P<TABLE_ID>[a-zA-Z0-9_-]*)$"

    regex_match = re.search(bq_table_regex, bq_source_table)

    if not regex_match:
        raise ValueError(
            f"Bad BigQuery table id : `bq_source_table` please match bq_table_regex"
        )

    table_ref = bigquery.TableReference(
        projectId=regex_match.group("PROJECT_ID"),
        datasetId=regex_match.group("DATASET_ID"),
        tableId=regex_match.group("TABLE_ID"),
    )

    with beam.Pipeline(options=pipeline_options) as p:

        (
            p
            | "ReadFromBqTable" # 
            >> bigquery.ReadFromBigQuery(table=table_ref, use_json_exports=True) # Each row contains : topic / attributes / data
            | "PublishRowsToPubSub" >> beam.ParDo(PublishFn())
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

Comments on the question:

Is there any reason to use your own Publisher in a ParDo, rather than the one from Beam? It is not recommended to use it in a ParDo. Also, if you want to do it in a ParDo, I suggest you use the setup method.

I want to run this pipeline in batch mode, and Beam's PubsubIO only works in streaming. A ParDo seems to be what is recommended: beam.apache.org/documentation/io/developing-io-overview/#sinks

You are completely right, I did not know that writing to Pub/Sub is not available in Python batch, sorry. It is available in Java though (which is why I was confused). Given that the pipeline does not seem to need anything Python-specific, have you considered doing it in Java?

+1 to all of Iñigo's points. To avoid the pickling error, you can create the client inside the setup() method of your DoFn class. I do not think a shared client would help (and I do not know whether the Pub/Sub client is thread-safe either).
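
A minimal sketch of the setup()-based approach suggested in the comments might look like the following (this is not the asker's code: the batch settings, the 60-second timeout and the per-element blocking are illustrative assumptions):

import json

import apache_beam as beam


class PublishViaSetupFn(beam.DoFn):
    def setup(self):
        # setup() runs on the worker after the DoFn has been unpickled,
        # so the Pub/Sub client itself never needs to be pickled.
        from google.cloud import pubsub_v1

        self._publisher = pubsub_v1.PublisherClient(
            batch_settings=pubsub_v1.types.BatchSettings(
                max_bytes=1024 * 1024,  # 1 MB
                max_latency=1,  # seconds
            )
        )

    def process(self, element):
        future = self._publisher.publish(
            topic=element["topic"],
            data=json.dumps(element["data"]).encode("utf-8"),
            **element["attributes"],
        )
        # Blocking per element keeps the sketch simple; collecting futures
        # per bundle (as the answer below does) is faster.
        yield future.result(timeout=60)

Because the client is only created after deserialization, no __reduce__ override is needed for it.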

Answer 1:

After fussing with this for a while, I think I have an answer that works consistently and is, if not world-beating in performance, at least usable:

import logging

import apache_beam as beam
from apache_beam.io.gcp.pubsub import PubsubMessage

from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import (
    BatchSettings,
    LimitExceededBehavior,
    PublishFlowControl,
    PublisherOptions,
)


class PublishClient(PublisherClient):
    """
    You have to override __reduce__ to make PublisherClient pickleable.

    Props to 'Ankur' and 'Benjamin' on SO for figuring this part out; god knows
    I would not have...
    """

    def __reduce__(self):
        return self.__class__, (self.batch_settings, self.publisher_options)


class PubsubWriter(beam.DoFn):
    """
    beam.io.gcp.pubsub does not yet support batch operations, so
    we do this the hard way.  it's not as performant as the native
    pubsubio but it does the job.
    """

    def __init__(self, topic: str):
        self.topic = topic
        self.window = beam.window.GlobalWindow()
        self.count = 0

        batch_settings = BatchSettings(
            max_bytes=1e6,  # 1MB
            # by default it is 10 ms, should be less than timeout used in future.result() to avoid timeout
            max_latency=1,
        )

        publisher_options = PublisherOptions(
            enable_message_ordering=False,
            # better to be slow than to drop messages during a recovery...
            flow_control=PublishFlowControl(limit_exceeded_behavior=LimitExceededBehavior.BLOCK),
        )

        self.publisher = PublishClient(batch_settings, publisher_options)

    def start_bundle(self):
        self.futures = []

    def process(self, element: PubsubMessage, window=beam.DoFn.WindowParam):
        self.window = window
        self.futures.append(
            self.publisher.publish(
                topic=self.topic,
                data=element.data,
                **element.attributes,
            )
        )

    def finish_bundle(self):
        """Iterate over the list of async publish results and block
        until all of them have either succeeded or timed out.  Yield
        a WindowedValue of the success/fail counts."""

        results = []
        self.count = self.count + len(self.futures)
        for fut in self.futures:
            try:
                # future.result() blocks until success or timeout;
                # the max_latency of 1 s set upstairs in BatchSettings is well
                # below this 60 s timeout, so we should never spend much time
                # waiting here.
                results.append(fut.result(timeout=60))
            except Exception as ex:
                results.append(ex)

        res_count = {"success": 0}
        for res in results:
            if isinstance(res, str):
                res_count["success"] += 1
            else:
                # if it's not a string, it's an exception
                msg = str(res)
                if msg not in res_count:
                    res_count[msg] = 1
                else:
                    res_count[msg] += 1

        logging.info(f"Pubsub publish results: res_count")

        yield beam.utils.windowed_value.WindowedValue(
            value=res_count,
            timestamp=0,
            windows=[self.window],
        )

    def teardown(self):
        logging.info(f"Published self.count messages")

The trick is that if you call future.result() inside the process() method, you will block until that single message has been successfully published, so instead collect a list of futures and then, at the end of the bundle, make sure they have all either been published or definitively timed out. Some quick testing with one of our internal pipelines showed that this approach can publish about 1.6 million messages in roughly 200 seconds.
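
For reference, here is a sketch of how this writer might be wired into the batch pipeline from the question (the conversion to PubsubMessage and the topic path are assumptions on my part; table_ref and pipeline_options are assumed to be built exactly as in the question's run() function):

import json

import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.io.gcp.pubsub import PubsubMessage

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | "ReadFromBqTable"
        >> bigquery.ReadFromBigQuery(table=table_ref, use_json_exports=True)
        | "ToPubsubMessage"
        >> beam.Map(
            lambda row: PubsubMessage(
                data=json.dumps(row["data"]).encode("utf-8"),
                attributes=row["attributes"],
            )
        )
        | "PublishRowsToPubSub"
        >> beam.ParDo(PubsubWriter(topic="projects/my-project/topics/my-topic"))
    )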

Comments:
