Spark 2.3.1 Structured Streaming Kafka ClassNotFound [duplicate]
Posted: 2018-07-14 14:09:06

I am trying to use Spark 2.3.1 Structured Streaming with Kafka and am getting the following error:
java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
... 51 more
Any suggestions would be appreciated.

Scala code (using IntelliJ 2018.1):
import org.apache.log4j._
import org.apache.spark.sql.SparkSession

object test {
  def main(args: Array[String]): Unit = {
    println("test3")
    getLogger("org").setLevel(Level.ERROR)
    getLogger("akka").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .master("local")
      .appName("StructuredNetworkWordCount")
      .getOrCreate()
    import spark.implicits._

    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "t_tweets")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Split the lines into words
    val words = lines.flatMap(_.split(" "))

    // Generate a running word count
    val wordCounts = words.groupBy("value").count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
build.sbt:
name := "scalaSpark3"
version := "0.1"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
// https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1" % "provided"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.1" % "provided"
Full error log:
objc[7301]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/bin/java (0x10c6334c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10c6bf4e0). One of the two will be used. Which one is undefined.
test3
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
at test$.main(test.scala:33)
at test.main(test.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
... 3 more
My code is based on the example code here:
https://spark.apache.org/docs/2.3.1/structured-streaming-programming-guide.html
https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html
Answer 1:

The Kafka integration is not included in Spark's classpath, so remove the provided scope from

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1" % "provided"

leaving

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1"

and make sure you build an uber jar before running spark-submit.
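A minimal sketch of what that looks like with the sbt-assembly plugin (the plugin version and merge strategy shown here are illustrative, not from the original post):

```
// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

// build.sbt: drop "provided" so the Kafka source ships inside the uber jar
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1"

// Spark core/SQL can stay provided; spark-submit supplies them at runtime
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1" % "provided"

// discard duplicate META-INF entries that otherwise break the assembly
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```

Alternatively, the dependency can stay out of the jar entirely and be pulled in at launch time:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class test target/scala-2.11/scalaSpark3_2.11-0.1.jar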