使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方式是啥?

Posted

技术标签:

【中文标题】使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方式是啥?【英文标题】:What is the optimal way to read from multiple Kafka topics and write to different sinks using Spark Structured Streaming?使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方式是什么? 【发布时间】:2020-10-02 15:12:55 【问题描述】:

我正在尝试编写一个 Spark Structured Streaming 作业,该作业从多个 Kafka 主题(可能是 100 个)中读取,并根据主题名称将结果写入 S3 上的不同位置。我已经开发了这个 sn-p 的代码,它当前从多个主题中读取并将结果输出到控制台(基于循环)并且它按预期工作。但是,我想了解性能影响是什么。这是推荐的方法吗?不建议有多个 readStream 和 writeStream 操作吗?如果是,推荐的方法是什么?

my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/".format(i)) \
        .start()

【问题讨论】:

为什么要为每个主题设置不同的 checkpointLocation 位置,所有主题都可以使用一个? Kafka Connect 通常是 Kafka -> S3 的更好方法。如果有用的话,我可以提供一个答案。 @Srinivas 如果我需要通过清除检查点位置来重新启动/重置特定主题,最好有单独的检查点位置以避免与其他主题的检查点? @RobinMoffatt 我已经探索了使用 Kafka Connect 的选项,但是,我想使用 Spark Structured Streaming 来扩展接收器的数量。 (Kafka Connect 可以处理正则表达式主题列表,如果您担心的话。) 【参考方案1】:

以下方法的优点。

    通用 多线程,所有线程将单独工作。 易于维护代码并为任何问题提供支持。 如果一个主题失败,对生产中的其他主题没有影响。你只需要专注于失败的那个。 如果您想提取特定主题的所有数据,您只需停止该主题的作业,更新或更改配置并重新启动相同的作业。

注意 - 下面的代码不是完整的通用代码,您可能需要更改或调整下面的代码。

topic="" // Get value from input arguments
sink="" // Get value from input arguments

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", topic) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", sink) \
        .start()        

以下方法存在问题。

    如果一个主题失败,它将终止整个程序。 线程有限。 难以维护代码、调试和支持任何问题。 如果您想从 kafka 中提取特定主题的所有数据,这是不可能的,因为任何配置更改都将适用于所有主题,因此它的操作成本太高。
my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/".format(i)) \
        .start()

【讨论】:

感谢您的回复!如果我试图从许多 Kafka 主题中阅读,是拥有多个 Spark 结构化流式处理作业(每个主题 1 个)还是更少的具有多个主题的作业更好?当我在一个集群上同时运行多个作业时,对 Spark 集群的性能有何影响? @Srinivas,我是否需要为每个 Kafka 主题指定不同的 checkpointLocation 路径以避免在重新提交 spark 作业时读取重复偏移量?这是一种好的做法吗? 最好为每个 kafka 主题指定不同的检查点位置【参考方案2】:

每个驱动程序节点运行多个 # 并发流当然是合理的。

每个 .start() 在 spark 中都会消耗一定的驱动资源。您的限制因素将是驱动节点上的负载及其可用资源。 100 个以高速连续运行的主题需要分布在多个驱动程序节点上[在 Databricks 中,每个集群有一个驱动程序]。正如您所提到的,Spark 的优势在于多个接收器以及用于转换的统一批处理和流式 API。

另一个问题是处理您可能最终对 S3 进行的小型写入和文件一致性。查看 delta.io 以处理对 S3 的一致且可靠的写入。

【讨论】:

因此,如果每个驱动程序节点运行多个并发流作业是合理的,那么似乎需要根据集群的大小找到合适的平衡。我也知道您可以订阅多个主题(并调用一个 .start() 调用:spark.apache.org/docs/latest/…)。这是否会减少负载(即 spark 中的驱动程序资源量)? @Brandon 是的,应该。一个处理多个流的处理和计划与多个处理,每个处理一个。您的里程可能会有所不同。

以上是关于使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方式是啥?的主要内容,如果未能解决你的问题,请参考以下文章

无法使用Spark Structured Streaming在Parquet文件中写入数据

如何使用Spark Structured Streaming连续监视目录

Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID

如何使用 Python 在 Spark Structured Streaming 中查看特定指标

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

Spark 2.1 Structured Streaming - 使用 Kakfa 作为 Python 的源 (pyspark)