How to get rid of NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe error in Spark Streaming + Kafka

Posted: 2018-07-20 09:19:30

I want to use Spark Streaming and connect it to Kafka, but I keep getting the NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe error, and at this point I simply don't know what to try next.
My setup:
Ubuntu 16.04
Scala 2.11
Kafka 2.11-1.0.0 (I also tried 2.11-0.10.0.0)
Spark 2.2.1
Hadoop 2.9.0
I can't even run the example script:
from pyspark.sql import SparkSession, Row, SQLContext
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
sparkSession = SparkSession.builder.appName("example-pyspark-read-and-write").getOrCreate()
# Subscribe to 1 topic
df = sparkSession \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("kafka.partition.assignment.strategy", "range") \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load()

query = df.writeStream \
    .format("console") \
    .start()

query.awaitTermination()
I submit it to Spark with:
spark-submit --master local[2] --jars /home/some_path/spark-sql-kafka-0-10_2.11-2.2.1.jar spark_streaming_kafka_example.py
and I get this error:
Exception in thread "stream execution thread for [id = 38ee73d5-4f20-41d0-ac89-a29c3f3255d1, runId = dadfc8ab-8e4c-464f-b4ef-495426aafc88]" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:63)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:244)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Traceback (most recent call last):
File "/home/some_path/spark_streaming_kafka_example.py", line 41, in <module>
query.awaitTermination()
File "/usr/local/spark/spark-2.2.1-bin-without-hadoop/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 106, in awaitTermination
File "/usr/local/spark/spark-2.2.1-bin-without-hadoop/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/local/spark/spark-2.2.1-bin-without-hadoop/python/lib/pyspark.zip/pyspark/sql/utils.py", line 75, in deco
pyspark.sql.utils.StreamingQueryException: u'org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V\n=== Streaming Query ===\nIdentifier: [id = 38ee73d5-4f20-41d0-ac89-a29c3f3255d1, runId = dadfc8ab-8e4c-464f-b4ef-495426aafc88]\nCurrent Committed Offsets: \nCurrent Available Offsets: \n\nCurrent State: INITIALIZING\nThread State: RUNNABLE'
I also tried including spark-streaming-kafka-0-10-assembly_2.11-2.2.1 in --jars (instead of spark-sql-kafka-0-10_2.11-2.2.1.jar), but that didn't help either.
I also tried putting these two lines into spark-defaults.conf:
spark.driver.extraClassPath /home/some_path/spark-streaming-kafka-0-10-assembly_2.11-2.2.1.jar
spark.executor.extraClassPath /home/some_path/spark-streaming-kafka-0-10-assembly_2.11-2.2.1.jar
or
spark.driver.extraClassPath /home/some_path/spark-sql-kafka-0-10_2.11-2.2.1.jar
spark.executor.extraClassPath /home/some_path/spark-sql-kafka-0-10_2.11-2.2.1.jar
Answer 1:

To use Spark Structured Streaming with Kafka, you also need to add the following JAR to spark.driver.extraClassPath and spark.executor.extraClassPath:

/home/some_path/kafka-clients-0.10.2.1.jar

Since spark-sql-kafka-0-10_2.11-2.2.1.jar does not contain KafkaConsumer, that JAR has to be added as well. The final spark-submit command then looks like this:
spark-submit --master local[2] --jars /home/some_path/spark-sql-kafka-0-10_2.11-2.2.1.jar,/home/some_path/kafka-clients-0.10.2.1.jar spark_streaming_kafka_example.py
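As an alternative (a sketch assuming the machine can reach Maven Central, since spark-submit downloads the package at launch), you can let Spark resolve kafka-clients transitively via --packages instead of listing JARs by hand:

spark-submit --master local[2] --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1 spark_streaming_kafka_example.py

This pulls in exactly the kafka-clients version the connector was built against, which avoids the kind of version mismatch that produces this NoSuchMethodError.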
Also, since you are using Spark Structured Streaming rather than Spark Streaming (DStreams), you do not need to include any spark-streaming JARs.
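If the error persists, it usually means an older kafka-clients class is shadowing the one you added. Here is a minimal diagnostic sketch to see which JAR KafkaConsumer is actually loaded from (it relies on PySpark's internal _jvm handle, so treat it as a debugging aid, not a stable API):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-classpath-check").getOrCreate()

# Ask the JVM which code source the KafkaConsumer class was loaded from
consumer_cls = spark._jvm.java.lang.Class.forName(
    "org.apache.kafka.clients.consumer.KafkaConsumer")
print(consumer_cls.getProtectionDomain().getCodeSource().getLocation())
# Expect a URL ending in kafka-clients-0.10.x.jar; an older client
# (e.g. 0.9, where subscribe took a List rather than a Collection)
# would explain the missing subscribe(Collection) overload.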
Hope this helps!
Comments:
Thanks for the suggestion. I double-checked that I have unified all the versions of kafka, sql-streaming, etc., and added kafka-clients-0.10.2.1.jar to spark-submit as well as to spark.driver.extraClassPath and spark.executor.extraClassPath, but it didn't help much. I still can't construct the Kafka consumer: Traceback (most recent call last): File "/home/some_path/read_kafka_stream_structured.py", line 45, in <module> query.awaitTermination() ... pyspark.sql.utils.StreamingQueryException: u'Failed to construct kafka consumer\n=== Streaming Query ===\ ...
Also, I added kafka-streams-0.10.0.0.jar (the kafka-clients jar is the same version: kafka-clients-0.10.0.0.jar; I can't edit my comment above any more)
You shouldn't need kafka-clients, as core Kafka is already a transitive dependency of spark-sql-kafka: mvnrepository.com/artifact/org.apache.spark/…
Not sure, but I'd bet this commit fixed it: github.com/apache/spark/commit/…
I'm still facing the same issue... I'm using: Kafka 2.4.0, Spark 2.4.5

Answer 2:
Try running export SPARK_KAFKA_VERSION=0.10 before your spark2-submit command.
[Source for solution]
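For example (assuming the same job as above; as far as I know this environment variable is only honored by Cloudera's Spark 2 distribution, which ships both 0.9 and 0.10 Kafka integrations):

export SPARK_KAFKA_VERSION=0.10
spark2-submit --master local[2] --jars /home/some_path/spark-sql-kafka-0-10_2.11-2.2.1.jar spark_streaming_kafka_example.py

Selecting 0.10 puts a kafka-clients with the subscribe(Collection) signature on the classpath; the 0.9 client only exposes subscribe(List), which is one way to end up with this exact NoSuchMethodError.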
Comments:
This answer should be more like …