即使使用较旧的 spark 版本,也没有名为“pyspark.streaming.kafka”的模块

Posted

技术标签:

【中文标题】即使使用较旧的 spark 版本,也没有名为“pyspark.streaming.kafka”的模块【英文标题】:No module named 'pyspark.streaming.kafka' even with older spark version 【发布时间】:2020-11-13 03:10:54 【问题描述】:

在另一个类似的 question 中,他们暗示“安装较旧的 spark 2.4.5”。

编辑:上面链接中的解决方案说“安装 spark 2.4.5,它确实有 kafkautils。但问题是我无法下载 spark2.4.5 - 即使在存档中也不可用。

我听从了建议,安装了旧版本的 spark - spark2.4.6(唯一可用的旧版本)并且还有 python37、kafka-python、pyspark 库。

我的 spark_job.py 文件需要使用 kafka

from pyspark.streaming.kafka import KafkaUtils

点击“python spark_job.py”时

ModuleNotFoundError: No module named 'pyspark.streaming.kafka'

错误仍然存​​在!

spark_job.py:

from __future__ import print_function
import sys
import os
import shutil

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils # this is the problem
import json


outputPath = 'C:/Users/Admin/Desktop/kafka_project/checkpoint_01'


def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

#-------------------------------------------------
# What I want to do per each RDD...
#-------------------------------------------------
def process(time, rdd):

    print("===========-----> %s <-----===========" % str(time))

    try:
        spark = getSparkSessionInstance(rdd.context.getConf())

        rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))
                                       
        testDataFrame = spark.createDataFrame(rowRdd)

        testDataFrame.createOrReplaceTempView("treasury_stream")

        sql_query = get_sql_query()
        testResultDataFrame = spark.sql(sql_query)
        testResultDataFrame.show(n=5)

        # Insert into DB
        try:
            testResultDataFrame.write \
                .format("jdbc") \
                .mode("append") \
                .option("driver", 'org.postgresql.Driver') \
                .option("url", "jdbc:postgresql://myhabrtest.cuyficqfa1h0.ap-south-1.rds.amazonaws.com:5432/habrDB") \
                .option("dbtable", "transaction_flow") \
                .option("user", "habr") \
                .option("password", "habr12345") \
                .save()
        except Exception as e:
            print("--> Opps! It seems an Errrorrr with DB working!", e)

    except Exception as e:
        print("--> Opps! Is seems an Error!!!", e)

#-------------------------------------------------
# General function
#-------------------------------------------------
def createContext():

    sc = SparkContext(appName="PythonStreamingKafkaTransaction")
    sc.setLogLevel("ERROR")
    
    ssc = StreamingContext(sc, 2)

    broker_list, topic = sys.argv[1:]

    try:
        directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                        [topic],
                                        "metadata.broker.list": broker_list)
    except:
        raise ConnectionError("Kafka error: Connection refused: \
                            broker_list= topic=".format(broker_list, topic))

    parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))

    # RDD handling
    parsed_lines.foreachRDD(process)

    return ssc


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: spark_job.py <zk> <topic>", file=sys.stderr)
        exit(-1)
        
    print("--> Creating new context")
    if os.path.exists(outputPath):
        shutil.rmtree('outputPath')

    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
    ssc.start()
    ssc.awaitTermination()

【问题讨论】:

您能否运行print(spark.version) 调用来验证正在使用的 Spark 版本?查看PySpark 2.4.5 docs,它们确实包含pyspark.streaming.kafka.KafkaUtils 类。 @Powers 这就是问题所在——我使用的是 2.4.6,而不是 2.4.5!我尝试安装 2.4.5 它在下载页面上不存在,甚至在存档中也不存在! PySpark 2.4.6 在 PyPi 中。 KafkaUtils is also in 2.4.6。您可能想尝试使用像 Poetry with PySpark 这样的 Python 依赖项管理系统,以确保您使用的是具有所需依赖项的虚拟环境。 @Powers,谢谢我刚刚使用 pip 降级了 【参考方案1】:

我只是使用 pip 降级了它:

pip install --force-reinstall pyspark==2.4.6

我没有使用任何诗歌。重新安装后,kafkaUtils pkg 被识别。

【讨论】:

以上是关于即使使用较旧的 spark 版本,也没有名为“pyspark.streaming.kafka”的模块的主要内容,如果未能解决你的问题,请参考以下文章

如何安装较旧的 Goclipse 版本?无法在 Eclipse 4.5.2 (Mars.2) 上安装 Goclipse 16.1

如何在较旧的 GPU 上测试 OpenGL 应用程序?

将较新的 Visual Studio 版本与较旧的 MFC 版本一起使用?

如何使用 2010 \ 2011 sdk 编译较旧的 3dsmax 版本?

将最新版本的 TypeScript 定义与较旧的 JavaScript 库一起使用

使用较旧的 generator-angular-fullstack 版本