KafkaConsumer 对于多线程访问 pyspark 是不安全的
Posted
技术标签:
【中文标题】KafkaConsumer 对于多线程访问 pyspark 是不安全的【英文标题】:KafkaConsumer is not safe for multi-threaded access pyspark 【发布时间】:2018-10-06 00:33:27 【问题描述】:我在 Kafka 中使用 spark 结构化流,但是当我尝试将流写入控制台时出现错误:
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
这是我的代码:
def group_obs(obs_df):
obs = obs_df.select(f.col("obs.payload.after").alias("obs"))
filtered_obs_with_value = obs \
.union(obs.filter("obs.value_datetime is not null")
.withColumn("value", f.col("obs.value_datetime"))
.withColumn("value_type", f.lit("datetime")))
grouped_by_obsgroup = filtered_obs_with_value\
.groupBy("obs.obs_group_id", "obs.encounter_id")
.agg(f.struct(f.col("obs.obs_group_id"),f.collect_list("tempObs").alias("obs")).alias("obs"))
query = grouped_by_obsgroup \
.writeStream \
.outputMode("update") \
.format("console") \
.start()
query.awaitTermination()
raw_obs = kafka_stream.select(from_json(col("value").cast("string"),mySchema)
transformed_obs = group_obs(raw_obs)
【问题讨论】:
【参考方案1】:您的代码没有什么特别的问题。
这是SPARK-23636 跟踪的一个已知错误。 SPARK-19185 跟踪的直接DStream
也存在类似问题。
根据JIRA票证:
唯一的解决方法是使用 executor-cores = 1 启动我们的应用程序,并启用动态资源分配。
在您的情况下可能会或可能不会接受。
【讨论】:
@user9714305 感谢您的回答。尝试将 executor cores 设置为 1,但我仍然得到同样的错误。 在尝试将执行程序核心设置为 1 之后,您是否能够提出解决方案? @fali以上是关于KafkaConsumer 对于多线程访问 pyspark 是不安全的的主要内容,如果未能解决你的问题,请参考以下文章