KafkaUtils class not found in Spark streaming
Posted: 2015-02-26 22:32:12

Question:

I have just started with Spark Streaming and I am trying to build a sample application that counts words from a Kafka stream. Although it compiles with sbt package, when I run it I get a NoClassDefFoundError. This post seems to describe the same problem, but its solution is for Maven and I have not been able to reproduce it with sbt.
KafkaApp.scala:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaApp {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "zookeeper.connection.timeout.ms" -> "10000",
      "group.id" -> "sparkGroup"
    )

    val topics = Map(
      "test" -> 1
    )

    // stream of (topic, ImpressionLog)
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK)

    println(s"Number of words: ${messages.count()}")
  }
}
build.sbt:
name := "Simple Project"
version := "1.1"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.1.1",
"org.apache.spark" %% "spark-streaming" % "1.1.1",
"org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
Then I submit it with:
bin/spark-submit \
--class "KafkaApp" \
--master local[4] \
target/scala-2.10/simple-project_2.10-1.1.jar
The error:
14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@192.168.5.252:65077/user/HeartbeatReceiver
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
at KafkaApp$.main(KafkaApp.scala:28)
at KafkaApp.main(KafkaApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
Answer 1:

spark-submit does not automatically include the package that contains KafkaUtils. You need to have that JAR in your project jar. For that, you have to build an all-inclusive uber-jar with sbt assembly. Here is an example build.sbt:
https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt
You will obviously also need to add the assembly plugin to SBT:
https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project
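For reference, here is a minimal sketch of what such a setup could look like; the versions and the provided scoping below are assumptions matching the question's build, not taken verbatim from the linked repository:

// build.sbt (bare style; newer sbt-assembly versions need no AssemblyKeys import, as the comments below note)
name := "Simple Project"

version := "1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.1.1" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.1.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1"
)

// project/plugins.sbt: the assembly plugin itself
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

Running sbt assembly then produces a single fat jar under target/scala-2.10/ that already bundles the Kafka connector, and that jar is what gets passed to spark-submit. Marking spark-core and spark-streaming as provided keeps classes that the Spark runtime already ships out of the fat jar.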
Comments:

I had the same problem when using Maven. I then included "org.apache.maven.plugins" in pom.xml, but the problem was not solved. Are there any other parameters I need to check?

With those changes, when I run sbt package I get this error: error: not found: object AssemblyKeys import AssemblyKeys._ ^ [error] Type error in expression

@johnsam remove the first import line and the "assemblySettings" line, it works for me.

Answer 2:

Please try including all the dependency jars when submitting the application:
./spark-submit --name "SampleApp" --deploy-mode client --master spark://host:7077 --class com.stackexchange.SampleApp --jars $SPARK_INSTALL_DIR/spark-streaming-kafka_2.10-1.3.0.jar,$KAFKA_INSTALL_DIR/libs/kafka_2.10-0.8.2.0.jar,$KAFKA_INSTALL_DIR/libs/metrics-core-2.2.0.jar,$KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar spark-example-1.0-SNAPSHOT.jar
Answer 3:

The following build.sbt worked for me. It also requires you to put the sbt-assembly plugin in a file under the project/ directory.
build.sbt
name := "NetworkStreaming" // https://github.com/sbt/sbt-assembly/blob/master/Migration.md#upgrading-with-bare-buildsbt
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-streaming_2.10" % "1.4.1",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1", // kafka
"org.apache.hbase" % "hbase" % "0.92.1",
"org.apache.hadoop" % "hadoop-core" % "1.0.2",
"org.apache.spark" % "spark-mllib_2.10" % "1.3.0"
)
mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "log4j.properties" => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}
project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
Answer 4:

I ran into the same problem and solved it by building the jar with dependencies.

Add the following to pom.xml:
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<!--
Bind the maven-assembly-plugin to the package phase
this will create a jar file without the storm dependencies
suitable for deployment to a cluster.
-->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Then run mvn package and submit the resulting "example-jar-with-dependencies.jar".
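A rough sketch of that last step; the class name, master URL and jar path below are placeholders, not taken from the answer:

# Hypothetical: build the fat jar, then submit it; adjust class name and paths to your project
mvn package
spark-submit \
  --class com.example.KafkaApp \
  --master local[4] \
  target/example-jar-with-dependencies.jar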
Answer 5:

I added the dependency externally: project --> Properties --> Java Build Path --> Libraries --> Add External JARs, and added the required jar.

This solved my problem.
Answer 6:

Using Spark 1.6 did the job for me, without the hassle of handling so many external jars... which can get quite complicated to manage...
Answer 7:

Instead of desperately trying to get the SBT build.sbt to work, you can also download the jar file and put it in the Spark lib folder, since it is not shipped with Spark:
http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.10/2.1.1/spark-streaming-kafka-0-10_2.10-2.1.1.jar
Copy it to:
/usr/local/spark/spark-2.1.0-bin-hadoop2.6/jars/
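A rough shell sketch of that manual step, assuming wget is available and Spark is installed at the path above:

# Sketch: fetch the Kafka connector jar and drop it into Spark's jars directory
wget http://central.maven.org/maven2/org/apache/spark/spark-streaming-kafka-0-10_2.10/2.1.1/spark-streaming-kafka-0-10_2.10-2.1.1.jar
cp spark-streaming-kafka-0-10_2.10-2.1.1.jar /usr/local/spark/spark-2.1.0-bin-hadoop2.6/jars/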
Answer 8:

Use the --packages argument on spark-submit; it takes Maven packages in the group:artifact:version,... format.
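For example, with a Spark 2.2.0 / Scala 2.11 build such as the one in Answer 9 below, the call might look roughly like this; the jar name and class are illustrative, and --packages requires a spark-submit recent enough to support it:

# Example sketch: let spark-submit resolve the Kafka connector (and its dependencies) from Maven Central
bin/spark-submit \
  --class "KafkaApp" \
  --master local[4] \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
  target/scala-2.11/kafka_2.11-0.1.jar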
Answer 9:

import org.apache.spark.streaming.kafka.KafkaUtils

and use the following in build.sbt:
name := "kafka"
version := "0.1"
scalaVersion := "2.11.12"
retrieveManaged := true
fork := true
//libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
//libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.0"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
//libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.2.0"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0" % "provided"
// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8-assembly
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8-assembly" % "2.2.0"
This will solve the problem.