导入 Pyspark Delta Lake 模块时未找到模块错误

Posted

技术标签:

【中文标题】导入 Pyspark Delta Lake 模块时未找到模块错误【英文标题】:Module not found error when importing Pyspark Delta Lake module 【发布时间】:2020-06-11 14:12:48 【问题描述】:

我正在使用 delta Lake 运行 Pyspark,但是当我尝试导入 delta 模块时,我得到了一个 ModuleNotFoundError: No module named 'delta'。这是在没有互联网连接的机器上,所以我必须从Maven 手动下载 delta-core jar 并将其放入%SPARK_HOME%/jars 文件夹中。

我的程序运行没有任何问题,而且我可以从 delta Lake 读写,所以我很高兴我有正确的 jar。但是当我尝试导入 delta 模块 from delta.tables import * 时,我得到了错误。

有关信息,我的代码是:

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, FloatType, StructType, StructField
from pyspark.sql.functions import input_file_name
from Constants import Constants

if __name__ == "__main__":
    constants = Constants()
    spark = SparkSession.builder.master("local[*]")\
                                .appName("Delta Lake Testing")\
                                .getOrCreate()

    # have to start spark session before importing: https://docs.delta.io/latest/quick-start.html#python
    from delta.tables import *

    # set logging level to limit output
    spark.sparkContext.setLogLevel("ERROR")

    spark.conf.set("spark.sql.session.timeZone", "UTC")
    # push additional python files to the worker nodes
    base_path = os.path.abspath(os.path.dirname(__file__))
    spark.sparkContext.addPyFile(os.path.join(base_path, 'Constants.py'))

    # start pipeline
    schema = StructType([StructField("Timestamp", TimestampType(), False),\
                        StructField("ParamOne", FloatType(), False),\
                        StructField("ParamTwo", FloatType(), False),\
                        StructField("ParamThree", FloatType(), False)])

    df = spark.readStream\
               .option("header", "true")\
               .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")\
               .schema(schema)\
               .csv(constants.input_path)\
               .withColumn("input_file_name", input_file_name())

     df.writeStream\
       .format("delta")\
       .outputMode("append")\
       .option("checkpointLocation", constants.checkpoint_location)\
       .start("/tmp/bronze")

    # await on stream
    sqm = spark.streams
    sqm.awaitAnyTermination()

这是使用 Spark v2.4.4 和 Python v3.6.1 并且使用 spark-submit path/to/job.py 提交作业

【问题讨论】:

这可以帮助你***.com/questions/59170595/… 成功了,谢谢@TheDarkW3b。我不敢相信我在搜索过程中错过了这个问题!如果您愿意,请将其作为答案提交,我会将其标记为完成。 【参考方案1】:
%pyspark
sc.addPyFile("**LOCATION_OF_DELTA_LAKE_JAR_FILE**")
from delta.tables import *

【讨论】:

以上是关于导入 Pyspark Delta Lake 模块时未找到模块错误的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark Delta Lake 捕获表不是 delta 表异常

pyspark delta-lake 元存储

Delta Lake 学习

深入剖析 Delta Lake: MySQL CDC 实战

深入剖析 Delta Lake: MySQL CDC 实战

Delta Lake中CDC的实现