使用 pyspark 将数据帧写入 Kafka 时出现异常

Posted

技术标签:

【中文标题】使用 pyspark 将数据帧写入 Kafka 时出现异常【英文标题】:Exception while wrting dataframe to Kafka using pyspark 【发布时间】:2021-06-21 15:22:25 【问题描述】:

我正在尝试创建一个数据帧 new_df 并使用 pyspark 将数据帧加载到 Kafka。但是,我几乎没有例外。无法弄清楚究竟是什么问题。任何帮助将不胜感激。

>>> dict = ['name': 'Alice', 'age': 1,'name': 'Again', 'age': 2]
>>> df = spark.createDataFrame(dict)

>>> import time
>>> import datetime
>>> from pyspark.streaming.kafka import KafkaUtils
>>> timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
>>> type(timestamp)
<class 'str'>

>>> from pyspark.sql.functions import lit,unix_timestamp
>>> timestamp
'2017-08-02 16:16:14'
>>> new_df = df.withColumn('time',unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
>>> new_df.show(truncate = False)
+---+-----+---------------------+
|age|name |time                 |
+---+-----+---------------------+
|1  |Alice|2017-08-02 16:16:14.0|
|2  |Again|2017-08-02 16:16:14.0|
+---+-----+---------------------+

现在我正在尝试将数据帧写入 Kafka 主题

def writeToKafka(outputDF):
    outputDF.selectExpr("CAST(time AS STRING) AS key", "to_json(struct(*)) AS value") \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "kafka-svc:9092") \
                .option("topic", "test_topic") \
                .save()



writeToKafka(new_df)

异常(从错误中挑选):

org.apache.spark.SparkException: Job aborted due to stage failure: 
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

完全错误:

Py4JJavaError:调用 o1811.save 时出错。 : org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 76.0 中的任务 8 失败 1 次,最近一次失败:丢失任务 8.0 in stage 76.0 (TID 110, localhost, executor driver): org.apache.kafka.common.KafkaException: Failed to construction kafka 制片人 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432) 在 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:270) 在 org.apache.spark.sql.kafka010.CachedKafkaProducer$.org$apache$spark$sql$kafka010$CachedKafkaProducer$$createKafkaProducer(CachedKafkaProducer.scala:67) 在 org.apache.spark.sql.kafka010.CachedKafkaProducer$$anon$1.load(CachedKafkaProducer.scala:46) 在 org.apache.spark.sql.kafka010.CachedKafkaProducer$$anon$1.load(CachedKafkaProducer.scala:43) 在 org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) 在 org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) 在 org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) 在 org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) 在 org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) 在 org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) 在 org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) 在 org.apache.spark.sql.kafka010.CachedKafkaProducer$.getOrCreate(CachedKafkaProducer.scala:80) 在 org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:44) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:89) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:87) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:123) 在 org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) 引起: org.apache.kafka.common.config.ConfigException:无法解决 bootstrap.servers 中给出的引导 URL org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88) 在 org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47) 在 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:407) ... 31 更多

驱动程序堆栈跟踪:在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878) 在 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927) 在 scala.Option.foreach(Option.scala:257) 在 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050) 在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2126) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:980) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:978) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:385) 在 org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:978) 在 org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87) 在 org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:254) 在 org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) 在 org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) 在 org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) 在 org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) 在 org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83) 在 org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81) 在 org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) 在 org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676) 在 org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80) 在 org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127) 在 org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75) 在 org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676) 在 org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285) 在 org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:268) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:498) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.lang.Thread.run(Thread.java:748) 原因: org.apache.kafka.common.KafkaException: 无法构造 kafka 制片人 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432) 在 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:270) 在 org.apache.spark.sql.kafka010.CachedKafkaProducer$.org$apache$spark$sql$kafka010$CachedKafkaProducer$$createKafkaProducer(CachedKafkaProducer.scala:67) 在 org.apache.spark.sql.kafka010.CachedKafkaProducer$$anon$1.load(CachedKafkaProducer.scala:46) 在 org.apache.spark.sql.kafka010.CachedKafkaProducer$$anon$1.load(CachedKafkaProducer.scala:43) 在 org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599) 在 org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379) 在 org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342) 在 org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257) 在 org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000) 在 org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004) 在 org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874) 在 org.apache.spark.sql.kafka010.CachedKafkaProducer$.getOrCreate(CachedKafkaProducer.scala:80) 在 org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:44) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply$mcV$sp(KafkaWriter.scala:89) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$1.apply(KafkaWriter.scala:89) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:89) 在 org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1.apply(KafkaWriter.scala:87) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) 在 org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:123) 在 org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 更多原因:org.apache.kafka.common.config.ConfigException: bootstrap.servers 中没有给出可解析的引导 URL org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88) 在 org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47) 在 org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:407) ... 31 更多

注意:

我有 3 个 kafka 代理,3 个 kafka zookeeper,它们托管在 Kubernetes 集群上。

【问题讨论】:

你看过这个答案了吗:***.com/questions/47969955/… No resolvable bootstrap urls given in bootstrap.servers... 我们无法解决您的网络问题,但您是否尝试过使用相同地址的任何其他 Kafka 客户端? 【参考方案1】:

新代码:

def writeToKafka(outputDF):
    outputDF.selectExpr("CAST(time AS STRING) AS key", "to_json(struct(*)) AS value") \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "kafka-svc.test_namespace:9092") \
                .option("topic", "test_topic") \
                .save()

kafka 代理位于 Kubernetes 集群中的另一个命名空间中。而且,我的 jupyter notebook 位于另一个命名空间中。

一旦我尝试对 kafka.bootstrap.servers 使用“kafka_service.namespace:portno”(即kafka-svc.test_namespace:9092),它就起作用了

kafka-svc - is the kafka service name.
test_namespace - is the name of the namespace where kafka brokers are hosted

【讨论】:

以上是关于使用 pyspark 将数据帧写入 Kafka 时出现异常的主要内容,如果未能解决你的问题,请参考以下文章

使用 PySpark JDBC 将数据帧写入 Azure SQL 数据库时性能下降

使用 PySpark 写入数据帧时出错

kafka 到 pyspark 结构化流,将 json 解析为数据帧

使用 pyspark 将数据从 kafka 写入 hive - 卡住

如何在使用转义或引号在pyspark中的文件中写入数据帧时获得完全匹配? [复制]

如何在 PySpark 中使用 foreach 或 foreachBatch 写入数据库?