从 Postgres 加载之前的 Pyspark 过滤结果(不要先加载整个表)
Posted
技术标签:
【中文标题】从 Postgres 加载之前的 Pyspark 过滤结果(不要先加载整个表)【英文标题】:Pyspark filter results before loading from Postgres (do not load entire table first) 【发布时间】:2020-12-14 14:13:28 【问题描述】:我正在尝试将大量数据从 VPC 中的 RDS Postgres 实例迁移到同一 VPC 中的 redshift 集群。我正在尝试使用 PySpark 和 AWS Glue 来做到这一点。我只想迁移最近 6 个月的数据,但是我的查询似乎正在执行整个有问题的表的加载,然后对其进行过滤,这会导致内存故障。这是我到目前为止的代码:
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
sc = SparkContext()
sc.setLogLevel('WARN')
glueContext = GlueContext(sc)
spark = glueContext.spark_session
datasource0 = glueContext.create_dynamic_frame.from_catalog(database="db", table_name="table")
datasource0.printSchema()
filtered_dyF = Filter.apply(frame = datasource0, f = lambda x: x["scandate"] > "2020-05-31")
print(filtered_dyF.count())
有什么方法可以在负载查询上应用该过滤器吗?此路径当前尝试select * from table
,我希望它改为select * from table where scandate > "2020-05-31"
【问题讨论】:
试试这个答案? ***.com/a/50294863/14165730 @mck 不幸的是,我在 Postgres 中的数据没有分区,所以使用下推谓词将不起作用 或者你可以使用pyspark的jdbc阅读器吗?您可以使用 jdbc 阅读器将查询读入数据帧 我必须使用 AWS 粘合连接,因为我的数据库位于 VPC 中 【参考方案1】:我最终只使用了 AWS Database Migration Service。实际上很无痛
【讨论】:
以上是关于从 Postgres 加载之前的 Pyspark 过滤结果(不要先加载整个表)的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark 连接到 ipython 笔记本中的 Postgres 数据库
批量从Dataframe插入到DB,忽略Pyspark中的失败行