在 pyspark 中从另一个数据库加载表

Posted

技术标签:

【中文标题】在 pyspark 中从另一个数据库加载表【英文标题】:Load a table from another database in pyspark 【发布时间】:2021-07-29 08:58:34 【问题描述】:

我目前正在使用 AWS 和 PySpark。我的表存储在 S3 中,可从 Athena 查询。

在我的 Glue 工作中,我习惯于将表格加载为:

my_table_df = sparkSession.table("myTable")

但是,这一次,我想访问另一个数据库中的表,位于同一数据源 (AwsDataCatalog) 中。所以我做了一些效果很好的事情:

my_other_table_df = sparkSession.sql("SELECT * FROM anotherDatabase.myOtherTable")

我只是在寻找一种更好的方法来编写相同的东西,而无需使用 SQL 查询,只需为该操作指定数据库即可。应该是这样的

sparkSession.database("anotherDatabase").table("myOtherTable")

欢迎提出建议

【问题讨论】:

【参考方案1】:

您可以为此使用DynamicFrameReader。这将返回一个 DynamicFrame。不过,您只需在该 DynamicFrame 上调用 .toDF() 即可将其转换为本机 Spark DataFrame。

sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)

data_source = glue_context.create_dynamic_frame.from_catalog(
            database="database",
            table_name="table_name"
).toDF()

【讨论】:

几乎完美:AttributeError: 'DynamicFrame' object has no attribute 'toDf' 它是一个函数,而不是一个属性。我已经更新了答案 这就是我所做的。它不起作用。 data_source.toDf() -> `没有属性'toDf'` 它是DF中的大写F而不是小写。 我想补充一点,您还可以指定 catalogue: 在 from_catalog 函数中使用参数 catalog_id="XXX"。 XXX 对应您的帐号 ID。

以上是关于在 pyspark 中从另一个数据库加载表的主要内容,如果未能解决你的问题,请参考以下文章

在 ms Access 数据宏中从另一个表中获取数据

如何在 PySpark 中从表中导入数据时排除 Header

如何在 IBM 的数据科学体验中从 pyspark 访问 postgres 表?

在 Oracle 中从另一个存储过程调用一个存储过程

在 Pyspark 中从 Rest Api 创建数据框时出错

在 SQL 中从另一个表更新一个表的最佳方法是啥?