Spark 2.3.1 structured streaming Kafka ClassNotFound [duplicate]

Posted: 2018-07-14 14:09:06

I am trying to use Spark 2.3.1 Structured Streaming with Kafka and get the following error:

java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:190)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
  at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
  at scala.util.Try.orElse(Try.scala:84)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
  ... 51 more

Any suggestions would be appreciated.

Scala code (using IntelliJ 2018.1):

import org.apache.log4j.Logger._
import org.apache.spark.sql.SparkSession

object test {

  def main(args: Array[String]): Unit = {
    println("test3")
    import org.apache.log4j._
    getLogger("org").setLevel(Level.ERROR)
    getLogger("akka").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder
      .master("local")
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    import spark.implicits._

    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "t_tweets")
      .load()

    // Cast the Kafka message value from bytes to a string Dataset
    val messages = lines.selectExpr("CAST(value AS STRING)").as[String]

    // Split the lines into words
    val words = messages.flatMap(_.split(" "))

    // Generate a running word count
    val wordCounts = words.groupBy("value").count()

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

Build.sbt:

name := "scalaSpark3"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"

// https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1" % "provided"

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "1.1.0"

// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.1" % "provided"

Full error log:

objc[7301]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/bin/java (0x10c6334c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_151.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10c6bf4e0). One of the two will be used. Which one is undefined.
test3

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:635)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:159)
    at test$.main(test.scala:33)
    at test.main(test.scala)

Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23$$anonfun$apply$15.apply(DataSource.scala:618)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$23.apply(DataSource.scala:618)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:618)
    ... 3 more

My code is based on the example code here:

https://spark.apache.org/docs/2.3.1/structured-streaming-programming-guide.html
https://spark.apache.org/docs/2.3.1/structured-streaming-kafka-integration.html


Answer 1:

The Kafka integration is not on the Spark classpath, because the dependency is scoped as `provided`.

Therefore, remove `provided` from

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1" % "provided"

and make sure you build an uber jar before running spark-submit.
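For reference, a minimal sketch of the corrected Build.sbt, keeping the versions from the question (note: `spark-sql-kafka-0-10` pulls in `kafka-clients` transitively, so the explicit pin may no longer be needed):

```scala
name := "scalaSpark3"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"

// No "provided" scope: the kafka data source must be on the runtime classpath
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.3.1"
```

Alternatively, when launching with spark-submit you can keep `provided` in the build and supply the package at submit time with `--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1`.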

