从 Postgres 加载之前的 Pyspark 过滤结果(不要先加载整个表)

Posted

技术标签:

【中文标题】从 Postgres 加载之前的 Pyspark 过滤结果(不要先加载整个表)【英文标题】:Pyspark filter results before loading from Postgres (do not load entire table first) 【发布时间】:2020-12-14 14:13:28 【问题描述】:

我正在尝试将大量数据从 VPC 中的 RDS Postgres 实例迁移到同一 VPC 中的 redshift 集群。我正在尝试使用 PySpark 和 AWS Glue 来做到这一点。我只想迁移最近 6 个月的数据,但是我的查询似乎正在执行整个有问题的表的加载,然后对其进行过滤,这会导致内存故障。这是我到目前为止的代码:

from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext

sc = SparkContext()
sc.setLogLevel('WARN')
glueContext = GlueContext(sc)
spark = glueContext.spark_session

datasource0 = glueContext.create_dynamic_frame.from_catalog(database="db", table_name="table")
datasource0.printSchema()

filtered_dyF = Filter.apply(frame = datasource0, f = lambda x: x["scandate"] > "2020-05-31")
print(filtered_dyF.count())

有什么方法可以在负载查询上应用该过滤器吗?此路径当前尝试select * from table,我希望它改为select * from table where scandate > "2020-05-31"

【问题讨论】:

试试这个答案? ***.com/a/50294863/14165730 @mck 不幸的是,我在 Postgres 中的数据没有分区,所以使用下推谓词将不起作用 或者你可以使用pyspark的jdbc阅读器吗?您可以使用 jdbc 阅读器将查询读入数据帧 我必须使用 AWS 粘合连接,因为我的数据库位于 VPC 中 【参考方案1】:

我最终只使用了 AWS Database Migration Service。实际上很无痛

【讨论】:

以上是关于从 Postgres 加载之前的 Pyspark 过滤结果(不要先加载整个表)的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 连接到 ipython 笔记本中的 Postgres 数据库

将 JSON 多行文件加载到 pyspark 数据框中

从数据框批量插入到数据库,忽略 Pyspark 中的失败行

批量从Dataframe插入到DB,忽略Pyspark中的失败行

(Pyspark 使用 JDBC 写入 postgres 失败并出现 NullPointerException

如何使用 PySpark 将 JSON 列类型写入 Postgres?