pyspark structured streaming kafka - py4j.protocol.Py4JJavaError: An error occurred while calling o41.save

Posted: 2022-01-13 00:45:01

【Problem Description】

I have a simple PySpark program that publishes data to Kafka. When I run it with spark-submit, it throws the error below.

Command being run:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0 ~/PycharmProjects/Kafka/PySpark_Kafka_SSL.py

Error:

Traceback (most recent call last):
  File "/Users/karanalang/PycharmProjects/Kafka/PySpark_Kafka_SSL.py", line 33, in <module>
    df.write.format('kafka')\
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 738, in save
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line 1309, in __call__
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o41.save.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:180)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    ... 42 more

Spark version - 3.2.0. I have Confluent Kafka installed on my machine; this is the version:

Karans-MacBook-Pro:confluent-6.2.1 karanalang$ confluent local services kafka version
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html

6.2.1-ce

The code is below:

import sys, datetime, time, os
from pyspark.sql.functions import col, rank, dense_rank, to_date, to_timestamp, format_number, row_number, lead, lag,monotonically_increasing_id
from pyspark.sql import SparkSession, Window


spark = SparkSession.builder.appName('StructuredStreaming_KafkaProducer').getOrCreate()

kafkaBrokers='host:port'
# CA Root certificate ca.crt
caRootLocation='/Users/karanalang/Documents/Technology/strimzi/gcp-certs-dec3/caroot.pem'
# user public (user.crt)
certLocation='/Users/karanalang/Documents/Technology/strimzi/gcp-certs-dec3/my-bridge/my-bridge-user-crt.pem'
# user.key
keyLocation='/Users/karanalang/Documents/Technology/strimzi/gcp-certs-dec3/my-bridge/user-with-certs.pem'
password='passwd'
topic = "my-topic"

df = spark.read.csv("data/input.txt", header=False)

df.write.format('kafka')\
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("security.protocol","SSL")\
    .option("ssl.ca.location",caRootLocation)\
    .option("ssl.certificate.location", certLocation)\
    .option("ssl.key.location",keyLocation)\
    .option("ssl.key.password",password)\
    .option("subscribe", topic) \
    .save()

Any idea what the problem is? The Spark version seems to match the jar. TIA!

【Question Comments】:

【Answer 1】:

The error:

Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less

typically pops up when there is a Scala version mismatch.

If you run spark-shell, you will see output like:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
         
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)

It says: Using Scala version 2.12.15
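
If you want to confirm the Scala build without leaving PySpark, a minimal sketch (reusing a SparkSession as in the question's code; the app name here is only illustrative) is to read scala.util.Properties through the py4j JVM gateway:

from pyspark.sql import SparkSession

# Reuse (or create) a session just to inspect which Scala version the JVM libraries were built with.
spark = SparkSession.builder.appName('CheckScalaVersion').getOrCreate()

# scala.util.Properties.versionString() returns e.g. "version 2.12.15";
# the connector suffix (_2.12 / _2.13) must match this value.
print(spark.sparkContext._jvm.scala.util.Properties.versionString())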

The docs also mention: "For the Scala API, Spark 3.2.0 uses Scala 2.12. You will need to use a compatible Scala version (2.12.x)".

But if we look at spark-sql-kafka-0-10_2.13:3.2.0 in the Maven repository (Kafka 0.10+ Source For Structured Streaming » 3.2.0), it says: Scala Target: Scala 2.13.

I would try matching the Scala version of the spark-sql-kafka package; you can find the available Scala builds under "View All Targets".

Try: Kafka 0.10+ Source For Structured Streaming » 3.2.0:

Note the change: spark-sql-kafka-0-10_2.13:3.2.0 -> spark-sql-kafka-0-10_2.12:3.2.0

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 ~/PycharmProjects/Kafka/PySpark_Kafka_SSL.py
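
Alternatively, the matching connector can be pinned inside the script instead of on the command line. This is only a sketch: it assumes the Scala 2.12 build of the 3.2.0 connector and that the config is set before the driver JVM starts (e.g. when launching with plain python rather than via spark-submit):

from pyspark.sql import SparkSession

# spark.jars.packages downloads the connector at session start-up, so its
# _2.12 suffix must match the Scala version Spark itself was built with.
spark = SparkSession.builder \
    .appName('StructuredStreaming_KafkaProducer') \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0') \
    .getOrCreate()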

【Discussion】:

Thanks for the input, that makes sense and helped me fix the issue. However, I am wondering why the Maven repository shows both Scala 2.13 and 2.12 support for Spark 3.2.0, please refer to mvnrepository.com/artifact/org.apache.spark/… - any thoughts on this?

@KaranAlang You're welcome, glad I could help! Honestly I'm not sure, I would have to check; perhaps they anticipated issues with the Scala version. Also, if this answers your question, please mark it as accepted.
