减少 pyspark 数据框中的依赖项数量

Posted

技术标签:

【中文标题】减少 pyspark 数据框中的依赖项数量【英文标题】:Reduce number of dependiencies in pyspark dataframe 【发布时间】:2020-04-28 07:34:32 【问题描述】:

我正在运行 pyspark 流式传输作业。对于每个 rdd,我都会使用一些我想要缓存的新数据更新一个临时表,如下所示:

def forach_rdd(rdd):
    sqlContext = SQLContext(rdd.context)
    cached_data_df = sqlContext.sql("SELECT * FROM temp_table WHERE UPDATED_ON >= NOW() - INTERVAL 24 HOUR")

    external_df = sqlContext.read.format("jdbc").options(
        url=config.value.get('host'),
        driver="com.mysql.jdbc.Driver",
        user=config.value.get('username'),
        password=config.value.get('password'),
        fetchsize=25000,
        query="SELECT * FROM temp_table WHERE /*SOME THRESHOLD FOR NEW VALUES*/"
    ).load()

    union_df = cached_data_df.union(external_df).coalesce(3).cache()
    union_df.createOrReplaceTempView('temp_table')

    # operate on union_df 

DStream.foreachRDD(forach_rdd)

几个小时后,spark 作业由于堆栈溢出而崩溃;) 原因很可能与数据框下不断增长的 rdd 依赖树有关。

我的问题是:如何强制 spark 使用更新的数据创建新的数据框,但没有依赖历史记录。

我想像下面这样的东西会起作用,但它似乎效率不高:

sc.parallelize(union_df.collect()).toDF(union_df.schema)

有没有更好的方法来做到这一点?我欢迎任何提示。

[编辑] 我将异常堆栈跟踪上传到 pastebin,因为它有点长: https://pastebin.com/raw/3sPNdyUa

【问题讨论】:

能否也发布***异常的堆栈跟踪 【参考方案1】:

首先用一个急切的检查点替换你的缓存:

union_df = cached_data_df.union(external_df).coalesce(3).checkpoint(True)

这将暂时缓解您的问题,但您应该为流式传输设置更强大的检查点。 Take a look at the docs.

【讨论】:

以上是关于减少 pyspark 数据框中的依赖项数量的主要内容,如果未能解决你的问题,请参考以下文章

从 PySpark 中的数据框中删除重复项

PySpark数据框显示错误的值

读取 pyspark 数据框中的 jsonb 类型字段? [复制]

计算 pyspark 数据框中的聚类成本

Pyspark - 从数据框中删除重复项,保持最后一次出现

在 pyspark 数据框中使用 write.partitionBy 时如何删除重复项?