Spark 使用 Data Frame 读取 CSV 文件并从 PostgreSQL DB 查询

Posted

技术标签:

【中文标题】Spark 使用 Data Frame 读取 CSV 文件并从 PostgreSQL DB 查询【英文标题】:Spark read CSV file using Data Frame and query from PostgreSQL DB 【发布时间】:2021-01-08 15:40:23 【问题描述】:

我是 Spark 的新手,我正在使用下面给出的数据框代码加载一个巨大的 CSV 文件

Dataset<Row> df = sqlContext.read().format("com.databricks.spark.csv").schema(customSchema)
                .option("delimiter", "|").option("header", true).load(inputDataPath);

现在在数据框中加载 CSV 数据后,现在我想遍历每一行并基于一些列想要从 PostgreSQL DB 查询(执行一些几何操作)。稍后想将从数据库返回的一些字段与数据框记录合并。最好的方法是什么,考虑大量的记录。 任何帮助表示赞赏。我正在使用 Java。

【问题讨论】:

从不迭代行。对另一个数据库使用join 【参考方案1】:

就像@mck 也指出:最好的方法是使用join。 使用 Spark,您可以使用 DataRame Api 读取外部 jdbc 表 例如

val props = Map(....)
spark.read.format("jdbc").options(props).load()

请参阅DataFrameReader scaladoc 了解更多选项以及您需要设置的属性和值。

然后使用join来合并字段

【讨论】:

考虑到我不能加入,因为 PostgreSQL 中的一些几何操作 对不起,我不明白:如果您可以对 postgres 进行查询,那么您也可以使用该查询将结果集作为数据框。如果你必须在查询中做几何操作,spark 会将该查询下推到 postgresql db 可以发一个查询的例子吗? 我的意思是我们如何加入,我们没有任何主要字段要加入,我的要求是在数据框中加载数据,然后对于每条记录我想从 PostgreSQL 数据库中获取一些附加信息(考虑一些几何操作(如 ST_Distance)然后将来自 Postgres 的附加数据(两个几何之间的距离)与数据框行结合起来。

以上是关于Spark 使用 Data Frame 读取 CSV 文件并从 PostgreSQL DB 查询的主要内容,如果未能解决你的问题,请参考以下文章

使用 Java 在 Spark Data Frame 中添加空值列

Spark:如何从具有属性的多个嵌套 XML 文件转换为 Data Frame 数据

read.csv() 读取 data.frame OK readr::read_csv() for the same data.frame 失败,为啥?

redshift data.frame 没有被写入

R vs Python:构建data.frame读取csv与统计描述

如何使我的 Spark Accumulator 统计信息在 Azure Databricks 中可靠?