Spark 使用 Data Frame 读取 CSV 文件并从 PostgreSQL DB 查询
Posted
技术标签:
【中文标题】Spark 使用 Data Frame 读取 CSV 文件并从 PostgreSQL DB 查询【英文标题】:Spark read CSV file using Data Frame and query from PostgreSQL DB 【发布时间】:2021-01-08 15:40:23 【问题描述】:我是 Spark 的新手,我正在使用下面给出的数据框代码加载一个巨大的 CSV 文件
Dataset<Row> df = sqlContext.read().format("com.databricks.spark.csv").schema(customSchema)
.option("delimiter", "|").option("header", true).load(inputDataPath);
现在在数据框中加载 CSV 数据后,现在我想遍历每一行并基于一些列想要从 PostgreSQL DB 查询(执行一些几何操作)。稍后想将从数据库返回的一些字段与数据框记录合并。最好的方法是什么,考虑大量的记录。 任何帮助表示赞赏。我正在使用 Java。
【问题讨论】:
从不迭代行。对另一个数据库使用join
。
【参考方案1】:
就像@mck 也指出:最好的方法是使用join。 使用 Spark,您可以使用 DataRame Api 读取外部 jdbc 表 例如
val props = Map(....)
spark.read.format("jdbc").options(props).load()
请参阅DataFrameReader scaladoc 了解更多选项以及您需要设置的属性和值。
然后使用join来合并字段
【讨论】:
考虑到我不能加入,因为 PostgreSQL 中的一些几何操作 对不起,我不明白:如果您可以对 postgres 进行查询,那么您也可以使用该查询将结果集作为数据框。如果你必须在查询中做几何操作,spark 会将该查询下推到 postgresql db 可以发一个查询的例子吗? 我的意思是我们如何加入,我们没有任何主要字段要加入,我的要求是在数据框中加载数据,然后对于每条记录我想从 PostgreSQL 数据库中获取一些附加信息(考虑一些几何操作(如 ST_Distance)然后将来自 Postgres 的附加数据(两个几何之间的距离)与数据框行结合起来。以上是关于Spark 使用 Data Frame 读取 CSV 文件并从 PostgreSQL DB 查询的主要内容,如果未能解决你的问题,请参考以下文章
使用 Java 在 Spark Data Frame 中添加空值列
Spark:如何从具有属性的多个嵌套 XML 文件转换为 Data Frame 数据
read.csv() 读取 data.frame OK readr::read_csv() for the same data.frame 失败,为啥?