Spark 从 PostgreSQL 表中读取单列

Posted

技术标签:

【中文标题】Spark 从 PostgreSQL 表中读取单列【英文标题】:Spark read single column from PostgreSQL table 【发布时间】:2018-09-04 19:57:21 【问题描述】:

问题

有没有办法将 (PostreSQL) 数据库表中的特定列作为 Spark DataFrame 加载?

以下是我尝试过的。

预期行为:

下面的代码应该将指定的列存储在内存中,而不是整个表(表对于我的集群来说太大了)。

# make connection in order to get column names
conn = p2.connect(database=database, user=user, password=password, host=host, port="5432")
cursor = conn.cursor()
cursor.execute("SELECT column_name FROM information_schema.columns WHERE table_name = '%s'" % table)

for header in cursor:
    header = header[0]
    df = spark.read.jdbc('jdbc:postgresql://%s:5432/%s' % (host, database), table=table, properties=properties).select(str(header)).limit(10)
    # doing stuff with Dataframe containing this column's contents here before continuing to next column and loading that into memory
    df.show()

实际行为:

发生内存不足异常。我假设这是因为 Spark 尝试加载整个表然后选择一列,而不是仅仅加载选定的列?或者它实际上只是加载了列,但该列太大了;我将列限制为只有 10 个值,所以不应该是这种情况?

2018-09-04 19:42:11 ERROR Utils:91 - uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
java.lang.OutOfMemoryError: GC overhead limit exceeded

【问题讨论】:

【参考方案1】:

只有一列的SQL查询可以在jdbc中使用,而不是“table”参数,请在此处找到一些详细信息:

spark, scala & jdbc - how to limit number of records

【讨论】:

我用查询字符串替换了 table 参数:'SELECT %s FROM %s' % (header, table)。但是,我收到 postgresql 驱动程序中的 SELECT 关键字错误。我注意到我可以将读取重写为 spark.read.format('jdbc') 并通过 dbtable 选项传入查询字符串,但是有没有办法使用 spark.read.jdbc 函数来做到这一点? 括号在此类查询中很重要,请查看:docs.databricks.com/spark/latest/data-sources/… 谢谢!查询字符串 '(SELECT %s FROM %s) %s' % (header, table, header) 起到了作用。

以上是关于Spark 从 PostgreSQL 表中读取单列的主要内容,如果未能解决你的问题,请参考以下文章

Spark JDBC 读取仅在一个分区中结束

根据某些条件从多个表中读取列值 - postgreSQL

使用 psycopg2 copy_expert 从 postgreSQL 表中更快地读取数据

如何从大表中读取所有行?

Spark scala 从列表中选择多列和单列

从Greenplum上的表中读取数据时,如何在Spark-jdbc应用程序的选项“dbtable”中指定子查询? [复制]