Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID

Posted

技术标签:

【中文标题】Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID【英文标题】:Spark Streaming: Kafka group id not permitted in Spark Structured Streaming 【发布时间】:2018-02-28 23:07:04 【问题描述】:

我正在 PySpark 中编写 Spark 结构化流应用程序以从 Kafka 读取数据。

但是,目前 Spark 的版本是 2.1.0,它不允许我将 group id 设置为参数,并且会为每个查询生成一个唯一的 id。但是 Kafka 连接是基于组的授权,需要预先设置的组 ID。

因此,是否有任何解决方法来建立连接无需将 Spark 更新到 2.2,因为我的团队不想要它。

我的代码:

if __name__ == "__main__":
    spark = SparkSession.builder.appName("DNS").getOrCreate()
    sc = spark.sparkContext
    sc.setLogLevel("WARN")

    # Subscribe to 1 topic
    lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load()
    print(lines.isStreaming) #print TRUE
    lines.selectExpr("CAST(value AS STRING)")
    # Split the lines into words
    words = lines.select(
    explode(
        split(lines.value, " ")
        ).alias("word")
    )
    # Generate running word count
    wordCounts = words.groupBy("word").count()

    # Start running the query that prints the running counts to the console
    query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

    query.awaitTermination()

【问题讨论】:

我认为你也不能在 Spark 2.2 中设置 group.id - spark.apache.org/docs/latest/… 据此Databricks doc 从Spark 2.2开始,你可以选择设置组id。但是,使用它时要格外小心,因为这可能会导致意外行为。 奇怪!因为根据 Spark 2.2 文档,我们不能。可能是两个文档不匹配。 是的,但无论如何,我不打算更新 Spark 我不确定每个查询的唯一 ID。 【参考方案1】:

KafkaUtils 类将覆盖"group.id" 的参数值。它将从原始组 ID 中连接 "spark-executor-"

以下是来自 KafkaUtils 的代码:

// driver and executor should be in different consumer groups
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
    if (null == originalGroupId) 
      logError(s"$ConsumerConfig.GROUP_ID_CONFIG is null, you should probably set it")
    
    val groupId = "spark-executor-" + originalGroupId
    logWarning(s"overriding executor $ConsumerConfig.GROUP_ID_CONFIG to $groupId")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

我们遇到了同样的问题。 Kafka 基于 ACL 并带有预设组 id,所以唯一的事情就是在 kafka 配置中更改组 id。在我们原来的组 ID 之外,我们输入了"spark-executor-" + originalGroupId

【讨论】:

我使用的是 Spark Structured Streaming(上面的代码),它直接从 kafka 读取流数据而不创建流上下文 @ELI - 你解决了这个问题吗.. 在结构化流中添加组 ID? 结构化流无法添加组ID【参考方案2】:

现在可以使用 Spark 3.x 设置 group.id。请参阅Structured Streaming + Kafka Integration Guide,其中写道:

kafka.group.id:从 Kafka 读取时在 Kafka 消费者中使用的 Kafka 组 ID。请谨慎使用。默认情况下,每个查询都会生成一个唯一的组 id 用于读取数据。这确保了每个 Kafka 源都有自己的消费者组,不会受到任何其他消费者的干扰,因此可以读取其订阅主题的所有分区。在某些场景下(例如 Kafka 基于组的授权),您可能希望使用特定的授权组 id 来读取数据。您可以选择设置组 ID。但是,请谨慎执行此操作,因为它可能会导致意外行为。并发运行的查询(批处理和流式处理)或具有相同组 ID 的源可能会相互干扰,导致每个查询仅读取部分数据。这也可能在快速连续启动/重新启动查询时发生。为了尽量减少此类问题,请将 Kafka 消费者会话超时(通过设置选项“kafka.session.timeout.ms”)设置为非常小。设置后,选项“groupIdPrefix”将被忽略。

但是,这个 group.id 仍然不用于将偏移量提交回 Kafka,并且偏移量管理保留在 Spark 的检查点文件中。我在回答中提供了更多详细信息(也适用于 Spark

How to manually set group.id and commit kafka offsets in spark structured streaming? How to use kafka.group.id in spark 3.0

【讨论】:

以上是关于Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID的主要内容,如果未能解决你的问题,请参考以下文章

删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?

混合 Spark Structured Streaming API 和 DStream 写入 Kafka

关于IDEA开发环境下的Kafka+Spark Streaming的classpath配置方式

如何从 Spark-Structured-Streaming 中的嵌套 XML_String 中提取信息

Spark 系列(十六)—— Spark Streaming 整合 Kafka

.Spark Streaming(上)--实时流计算Spark Streaming原理介