KafkaConsumer 对于多线程访问 pyspark 是不安全的

Posted

技术标签:

【中文标题】KafkaConsumer 对于多线程访问 pyspark 是不安全的【英文标题】:KafkaConsumer is not safe for multi-threaded access pyspark 【发布时间】:2018-10-06 00:33:27 【问题描述】:

我在 Kafka 中使用 spark 结构化流,但是当我尝试将流写入控制台时出现错误:

Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

这是我的代码:

def group_obs(obs_df):
        obs = obs_df.select(f.col("obs.payload.after").alias("obs"))

        filtered_obs_with_value = obs \
            .union(obs.filter("obs.value_datetime is not null")
                   .withColumn("value", f.col("obs.value_datetime"))
                   .withColumn("value_type", f.lit("datetime")))


        grouped_by_obsgroup = filtered_obs_with_value\
                             .groupBy("obs.obs_group_id", "obs.encounter_id")
                             .agg(f.struct(f.col("obs.obs_group_id"),f.collect_list("tempObs").alias("obs")).alias("obs"))

        query = grouped_by_obsgroup \
                .writeStream \
                .outputMode("update") \
                .format("console") \
                .start()

        query.awaitTermination()

raw_obs = kafka_stream.select(from_json(col("value").cast("string"),mySchema)
transformed_obs = group_obs(raw_obs)

【问题讨论】:

【参考方案1】:

您的代码没有什么特别的问题。

这是SPARK-23636 跟踪的一个已知错误。 SPARK-19185 跟踪的直接DStream 也存在类似问题。

根据JIRA票证:

唯一的解决方法是使用 executor-cores = 1 启动我们的应用程序,并启用动态资源分配。

在您的情况下可能会或可能不会接受。

【讨论】:

@user9714305 感谢您的回答。尝试将 executor cores 设置为 1,但我仍然得到同样的错误。 在尝试将执行程序核心设置为 1 之后,您是否能够提出解决方案? @fali

以上是关于KafkaConsumer 对于多线程访问 pyspark 是不安全的的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Consumer多线程实例续篇

Kafka consumer在项目中的多线程处理方式

Kafka 0.10 KafkaConsumer流程简述

Kafka 0.10 KafkaConsumer流程简述

使用多线程 + 多处理的 Python 日志记录

原创Kafka Consumer 多线程 实例