即使使用较旧的 spark 版本,也没有名为“pyspark.streaming.kafka”的模块
Posted
技术标签:
【中文标题】即使使用较旧的 spark 版本,也没有名为“pyspark.streaming.kafka”的模块【英文标题】:No module named 'pyspark.streaming.kafka' even with older spark version 【发布时间】:2020-11-13 03:10:54 【问题描述】:在另一个类似的 question 中,他们暗示“安装较旧的 spark 2.4.5”。
编辑:上面链接中的解决方案说“安装 spark 2.4.5,它确实有 kafkautils。但问题是我无法下载 spark2.4.5 - 即使在存档中也不可用。
我听从了建议,安装了旧版本的 spark - spark2.4.6(唯一可用的旧版本)并且还有 python37、kafka-python、pyspark 库。
我的 spark_job.py 文件需要使用 kafka
from pyspark.streaming.kafka import KafkaUtils
点击“python spark_job.py”时
ModuleNotFoundError: No module named 'pyspark.streaming.kafka'
错误仍然存在!
spark_job.py:
from __future__ import print_function
import sys
import os
import shutil
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils # this is the problem
import json
outputPath = 'C:/Users/Admin/Desktop/kafka_project/checkpoint_01'
def getSparkSessionInstance(sparkConf):
if ('sparkSessionSingletonInstance' not in globals()):
globals()['sparkSessionSingletonInstance'] = SparkSession\
.builder\
.config(conf=sparkConf)\
.getOrCreate()
return globals()['sparkSessionSingletonInstance']
#-------------------------------------------------
# What I want to do per each RDD...
#-------------------------------------------------
def process(time, rdd):
print("===========-----> %s <-----===========" % str(time))
try:
spark = getSparkSessionInstance(rdd.context.getConf())
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
# Insert into DB
try:
testResultDataFrame.write \
.format("jdbc") \
.mode("append") \
.option("driver", 'org.postgresql.Driver') \
.option("url", "jdbc:postgresql://myhabrtest.cuyficqfa1h0.ap-south-1.rds.amazonaws.com:5432/habrDB") \
.option("dbtable", "transaction_flow") \
.option("user", "habr") \
.option("password", "habr12345") \
.save()
except Exception as e:
print("--> Opps! It seems an Errrorrr with DB working!", e)
except Exception as e:
print("--> Opps! Is seems an Error!!!", e)
#-------------------------------------------------
# General function
#-------------------------------------------------
def createContext():
sc = SparkContext(appName="PythonStreamingKafkaTransaction")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)
broker_list, topic = sys.argv[1:]
try:
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
"metadata.broker.list": broker_list)
except:
raise ConnectionError("Kafka error: Connection refused: \
broker_list= topic=".format(broker_list, topic))
parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))
# RDD handling
parsed_lines.foreachRDD(process)
return ssc
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: spark_job.py <zk> <topic>", file=sys.stderr)
exit(-1)
print("--> Creating new context")
if os.path.exists(outputPath):
shutil.rmtree('outputPath')
ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
ssc.start()
ssc.awaitTermination()
【问题讨论】:
您能否运行print(spark.version)
调用来验证正在使用的 Spark 版本?查看PySpark 2.4.5 docs,它们确实包含pyspark.streaming.kafka.KafkaUtils
类。
@Powers 这就是问题所在——我使用的是 2.4.6,而不是 2.4.5!我尝试安装 2.4.5 它在下载页面上不存在,甚至在存档中也不存在!
PySpark 2.4.6 在 PyPi 中。 KafkaUtils is also in 2.4.6。您可能想尝试使用像 Poetry with PySpark 这样的 Python 依赖项管理系统,以确保您使用的是具有所需依赖项的虚拟环境。
@Powers,谢谢我刚刚使用 pip 降级了
【参考方案1】:
我只是使用 pip 降级了它:
pip install --force-reinstall pyspark==2.4.6
我没有使用任何诗歌。重新安装后,kafkaUtils pkg 被识别。
【讨论】:
以上是关于即使使用较旧的 spark 版本,也没有名为“pyspark.streaming.kafka”的模块的主要内容,如果未能解决你的问题,请参考以下文章
如何安装较旧的 Goclipse 版本?无法在 Eclipse 4.5.2 (Mars.2) 上安装 Goclipse 16.1
将较新的 Visual Studio 版本与较旧的 MFC 版本一起使用?
如何使用 2010 \ 2011 sdk 编译较旧的 3dsmax 版本?