减少 pyspark 数据框中的依赖项数量
Posted
技术标签:
【中文标题】减少 pyspark 数据框中的依赖项数量【英文标题】:Reduce number of dependiencies in pyspark dataframe 【发布时间】:2020-04-28 07:34:32 【问题描述】:我正在运行 pyspark 流式传输作业。对于每个 rdd,我都会使用一些我想要缓存的新数据更新一个临时表,如下所示:
def forach_rdd(rdd):
sqlContext = SQLContext(rdd.context)
cached_data_df = sqlContext.sql("SELECT * FROM temp_table WHERE UPDATED_ON >= NOW() - INTERVAL 24 HOUR")
external_df = sqlContext.read.format("jdbc").options(
url=config.value.get('host'),
driver="com.mysql.jdbc.Driver",
user=config.value.get('username'),
password=config.value.get('password'),
fetchsize=25000,
query="SELECT * FROM temp_table WHERE /*SOME THRESHOLD FOR NEW VALUES*/"
).load()
union_df = cached_data_df.union(external_df).coalesce(3).cache()
union_df.createOrReplaceTempView('temp_table')
# operate on union_df
DStream.foreachRDD(forach_rdd)
几个小时后,spark 作业由于堆栈溢出而崩溃;) 原因很可能与数据框下不断增长的 rdd 依赖树有关。
我的问题是:如何强制 spark 使用更新的数据创建新的数据框,但没有依赖历史记录。
我想像下面这样的东西会起作用,但它似乎效率不高:
sc.parallelize(union_df.collect()).toDF(union_df.schema)
有没有更好的方法来做到这一点?我欢迎任何提示。
[编辑] 我将异常堆栈跟踪上传到 pastebin,因为它有点长: https://pastebin.com/raw/3sPNdyUa
【问题讨论】:
能否也发布***异常的堆栈跟踪 【参考方案1】:首先用一个急切的检查点替换你的缓存:
union_df = cached_data_df.union(external_df).coalesce(3).checkpoint(True)
这将暂时缓解您的问题,但您应该为流式传输设置更强大的检查点。 Take a look at the docs.
【讨论】:
以上是关于减少 pyspark 数据框中的依赖项数量的主要内容,如果未能解决你的问题,请参考以下文章