Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID
Posted
技术标签:
【中文标题】Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID【英文标题】:Spark Streaming: Kafka group id not permitted in Spark Structured Streaming 【发布时间】:2018-02-28 23:07:04 【问题描述】:我正在 PySpark 中编写 Spark 结构化流应用程序以从 Kafka 读取数据。
但是,目前 Spark 的版本是 2.1.0,它不允许我将 group id 设置为参数,并且会为每个查询生成一个唯一的 id。但是 Kafka 连接是基于组的授权,需要预先设置的组 ID。
因此,是否有任何解决方法来建立连接无需将 Spark 更新到 2.2,因为我的团队不想要它。
我的代码:
if __name__ == "__main__":
spark = SparkSession.builder.appName("DNS").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# Subscribe to 1 topic
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load()
print(lines.isStreaming) #print TRUE
lines.selectExpr("CAST(value AS STRING)")
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
【问题讨论】:
我认为你也不能在 Spark 2.2 中设置group.id
- spark.apache.org/docs/latest/…
据此Databricks doc 从Spark 2.2开始,你可以选择设置组id。但是,使用它时要格外小心,因为这可能会导致意外行为。
奇怪!因为根据 Spark 2.2 文档,我们不能。可能是两个文档不匹配。
是的,但无论如何,我不打算更新 Spark
我不确定每个查询的唯一 ID。
【参考方案1】:
KafkaUtils
类将覆盖"group.id"
的参数值。它将从原始组 ID 中连接 "spark-executor-"
。
以下是来自 KafkaUtils 的代码:
// driver and executor should be in different consumer groups
val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
if (null == originalGroupId)
logError(s"$ConsumerConfig.GROUP_ID_CONFIG is null, you should probably set it")
val groupId = "spark-executor-" + originalGroupId
logWarning(s"overriding executor $ConsumerConfig.GROUP_ID_CONFIG to $groupId")
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
我们遇到了同样的问题。 Kafka 基于 ACL 并带有预设组 id,所以唯一的事情就是在 kafka 配置中更改组 id。在我们原来的组 ID 之外,我们输入了"spark-executor-" + originalGroupId
【讨论】:
我使用的是 Spark Structured Streaming(上面的代码),它直接从 kafka 读取流数据而不创建流上下文 @ELI - 你解决了这个问题吗.. 在结构化流中添加组 ID? 结构化流无法添加组ID【参考方案2】:现在可以使用 Spark 3.x 设置 group.id。请参阅Structured Streaming + Kafka Integration Guide,其中写道:
kafka.group.id:从 Kafka 读取时在 Kafka 消费者中使用的 Kafka 组 ID。请谨慎使用。默认情况下,每个查询都会生成一个唯一的组 id 用于读取数据。这确保了每个 Kafka 源都有自己的消费者组,不会受到任何其他消费者的干扰,因此可以读取其订阅主题的所有分区。在某些场景下(例如 Kafka 基于组的授权),您可能希望使用特定的授权组 id 来读取数据。您可以选择设置组 ID。但是,请谨慎执行此操作,因为它可能会导致意外行为。并发运行的查询(批处理和流式处理)或具有相同组 ID 的源可能会相互干扰,导致每个查询仅读取部分数据。这也可能在快速连续启动/重新启动查询时发生。为了尽量减少此类问题,请将 Kafka 消费者会话超时(通过设置选项“kafka.session.timeout.ms”)设置为非常小。设置后,选项“groupIdPrefix”将被忽略。
但是,这个 group.id 仍然不用于将偏移量提交回 Kafka,并且偏移量管理保留在 Spark 的检查点文件中。我在回答中提供了更多详细信息(也适用于 Spark
How to manually set group.id and commit kafka offsets in spark structured streaming? How to use kafka.group.id in spark 3.0【讨论】:
以上是关于Spark Streaming:Spark Structured Streaming 中不允许使用 Kafka 组 ID的主要内容,如果未能解决你的问题,请参考以下文章
删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?
混合 Spark Structured Streaming API 和 DStream 写入 Kafka
关于IDEA开发环境下的Kafka+Spark Streaming的classpath配置方式
如何从 Spark-Structured-Streaming 中的嵌套 XML_String 中提取信息