pyspark - kafka整合:缺少lib

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pyspark - kafka整合:缺少lib相关的知识,希望对你有一定的参考价值。

我正在遵循Databricks在此地址中的指示,以便与Kafka开始一个项目:

Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher

代码:

# coding: utf-8
import sys
import os,time
sys.path.append("/usr/local/lib/python2.7/dist-packages")
from pyspark.sql import SparkSession,Row
from pyspark import SparkContext,SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
import pyspark.sql.functions
import json

spark = SparkSession.builder.appName("Kakfa-test").getOrCreate()
spark.sparkContext.setLogLevel('WARN')


trainingSchema = StructType([
  StructField("code",StringType(),True),
  StructField("ean",StringType(),True),
  StructField("name",StringType(),True),
  StructField("description",StringType(),True),
  StructField("category",StringType(),True),
  StructField("attributes",StringType(),True)
])
trainingDF = spark.createDataFrame(sc.emptyRDD(),trainingSchema)

broker, topic = 
['kafka.partner.stg.some.domain:9092','hybris.products']

df = spark 
.readStream 
.format("kafka") 
.option("kafka.bootstrap.servers", 
"kafka.partner.stg.some.domain:9092") 
.option("subscribe", "hybris.products") 
.option("startingOffsets", "earliest") 
.load()

我的Hadoop版本是2.6,Spark的版本是2.3.0

spark-submit的命令行是:

spark-submit --jars jars/spark-sql-kafka-0-10_2.11-2.3.0.jar kafka-test-002.py

错误消息:

Py4JJavaError:调用o48.load时发生错误。 :java.lang.NoClassDefFoundError:org / apache / kafka / common / serialization / ByteArrayDeserializer at org.apache.spark.sql.kafka010.KafkaSourceProvider $。(KafkaSourceProvider.scala:413)at org.apache.spark.sql.kafka010。 kafkaSourceProvider $。(KafkaSourceProvider.scala)org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:360)at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:64)在Org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:231)org.apache.spark.sql.execution.datasources.DataSource.sourceInfo $ lzycompute(DataSource.scala:94)at org位于org.apache的org.apache.spark.sql.execution.streaming.StreamingRelation $ .apply(StreamingRelation.scala:33)的.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)。 spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:170)at sun.refl.NativeMethodAccessorImpl.invoke0(Native Method)at sun.refl位于py.j.reflection.MethodInvoker的java.lang.reflect.Method.invoke(Method.java:498)的sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)的ect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) .invoke(MethodInvoker.java:244)py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)py4j.Gateway.invoke(Gateway.java:282)py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java: 132)at py4j.commands.CallCommand.execute(CallCommand.java:79)at py4j.GatewayConnection.run(GatewayConnection.java:214)at java.lang.Thread.run(Thread.java:745)引起:java。 java.Rlass.ClassNotFoundException:org.apache.kafka.common.serialization.ByteArrayDeserializer,java.net.URLClassLoader.findClass(URLClassLoader.java:381),位于java.lang的java.lang.ClassLoader.loadClass(ClassLoader.java:424)。 ClassLoader.loadClass(ClassLoader.java:357)

正如您可以在我上面提到的网站上查看我正在导入的jar文件是完全相同的文件。所以,我不知道为什么会这样。也许没有提到另一个模块?我真的迷失在这里

答案

提到的JAR不包括与kafka客户端的所有依赖关系。您应该使用--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0(如部署:https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html#deploying部分中的文档中所述)

以上是关于pyspark - kafka整合:缺少lib的主要内容,如果未能解决你的问题,请参考以下文章

kafka 到 pyspark 结构化流,将 json 解析为数据帧

Kafka 和 Pyspark 集成

运行 pyspark kafka steam 时出错

从 Docker 容器将 PySpark 连接到 Kafka

在 PySpark 作业上打印 Kafka 调试消息

来自 Kafka 的 pySpark Structured Streaming 不会输出到控制台进行调试