pyspark - kafka integration: missing lib
I am following the Databricks instructions at this address to get started on a project with Kafka:
Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
Code:
# coding: utf-8
import sys
import os, time
sys.path.append("/usr/local/lib/python2.7/dist-packages")
from pyspark.sql import SparkSession, Row
from pyspark import SparkContext, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
import pyspark.sql.functions
import json

spark = SparkSession.builder.appName("Kafka-test").getOrCreate()
spark.sparkContext.setLogLevel('WARN')

# Schema for the product records expected on the topic
trainingSchema = StructType([
    StructField("code", StringType(), True),
    StructField("ean", StringType(), True),
    StructField("name", StringType(), True),
    StructField("description", StringType(), True),
    StructField("category", StringType(), True),
    StructField("attributes", StringType(), True)
])

# Empty DataFrame with the schema above (sc is the session's SparkContext)
sc = spark.sparkContext
trainingDF = spark.createDataFrame(sc.emptyRDD(), trainingSchema)

broker, topic = ['kafka.partner.stg.some.domain:9092', 'hybris.products']

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", broker) \
    .option("subscribe", topic) \
    .option("startingOffsets", "earliest") \
    .load()
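Note that the schema above is declared but never applied to the stream: once load() succeeds, the Kafka value column still has to be decoded before trainingSchema is of any use. A minimal sketch of that step, assuming the topic carries JSON strings matching the schema (the console sink is purely illustrative):

from pyspark.sql.functions import from_json, col

# Kafka delivers value as binary; cast to string, then parse the JSON
products = df \
    .selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), trainingSchema).alias("product")) \
    .select("product.*")

# Illustrative sink only: print the parsed rows to the console
query = products.writeStream.format("console").start()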
My Hadoop version is 2.6 and my Spark version is 2.3.0.
The spark-submit command line is:
spark-submit --jars jars/spark-sql-kafka-0-10_2.11-2.3.0.jar kafka-test-002.py
Error message:
Py4JJavaError: An error occurred while calling o48.load.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:413)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.validateStreamOptions(KafkaSourceProvider.scala:360)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:64)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:231)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:94)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:94)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:33)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:170)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArrayDeserializer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
As you can check on the site I mentioned above, the jar file I am importing is exactly the one it names, so I have no idea why this happens. Is there another module that wasn't mentioned? I'm really lost here.
The jar you mention does not bundle all the dependencies of the kafka-clients library. You should instead use --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
(as described in the Deploying section of the documentation: https://spark.apache.org/docs/2.3.0/structured-streaming-kafka-integration.html#deploying)
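In practical terms, that means replacing --jars with --packages so Spark resolves the transitive kafka-clients dependency for you; a minimal sketch, reusing the script name from the question:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 kafka-test-002.py

If the cluster cannot reach a Maven repository, another option is to download the kafka-clients jar (the artifact that actually contains ByteArrayDeserializer) and pass it via --jars alongside the spark-sql-kafka jar.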