从Greenplum上的表中读取数据时,如何在Spark-jdbc应用程序的选项“dbtable”中指定子查询? [复制]
Posted
技术标签:
【中文标题】从Greenplum上的表中读取数据时,如何在Spark-jdbc应用程序的选项“dbtable”中指定子查询? [复制]【英文标题】:How to specify subquery in the option "dbtable" in Spark-jdbc application while reading data from a table on Greenplum? [duplicate] 【发布时间】:2018-12-22 10:39:57 【问题描述】:我正在尝试使用 Spark 将 Greenplum 上的表中的数据读入 HDFS。我在选项中提供了一个子查询来读取 greenplum 表,如下所示。
val execQuery = s"(select $allColumns, 0 as $flagCol from dbanscience.xx_lines where year=2017 and month=12) as xx_lines_periodYear"
println("ExecQuery: " + execQuery)
val dataDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", conUrl)
.option("dbtable", execQuery)
.option("user", devUsrName).option("password", devPwd)
.option("partitionColumn","id")
.option("lowerBound", 165512)
.option("upperBound", 11521481695656862L)
.option("numPartitions",300).load()
当我运行代码时,我看到以下异常:
Exec query: (select je_header_id,source_system_name,je_line_num,last_update_date,last_updated_by,last_updated_by_name,ledger_id,code_combination_id,balancing_segment,cost_center_segment,period_name,period_year,period_num,effective_date,status,creation_date,created_by,created_by_name,entered_dr,entered_cr,entered_amount,accounted_dr,accounted_cr,accounted_amount,description,sap_document_number,sap_fiscal_year,sap_document_date,sap_posting_date,sap_period,sap_reference,sap_document_header_text,sap_exchange_rate,sap_reference_key,sap_line_item,sap_account_number,sap_cost_element,sap_profit_center,sap_segment,sap_trading_partner,sap_co_order,sap_geography,sap_reference_code,sap_product_line,sap_sender_cost_center,usd_mor_activity_amount::character varying as usd_mor_activity_amount_text, 0 as del_flag from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear
Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
Position: 15
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at com.partition.source.chunk$.prepareDF$1(chunk.scala:153)
at com.partition.source.chunk$.main(chunk.scala:184)
at com.partition.source.chunk.main(chunk.scala)
异常显示:public
为 dbname,子查询 (execQuery) 为 tablename
我尝试将 exec 查询设为:
val execQuery = s"(select $allColumns, 0 as $flagCol from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear"
或
val execQuery = s"select $allColumns, 0 as $flagCol from analytics.xx_gl_je_lines where period_year=2017 and period_num=12 as xx_gl_je_lines_periodYear"
它们都没有工作。我正在使用 jar:greenplum-spark_2.11-1.4.0.jar 从 greenplum 读取数据。 以下是我尝试使用的 spark-submit:
SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.chunk --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --conf spark.jars=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --executor-cores 3 --executor-memory 13G --keytab /home/ibusr/ibusr.keytab --principal ibusr@dev.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,connections.properties --name Splinter --conf spark.executor.extraClassPath=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar splinter_2.11-0.1.jar
我通过参考 greenplumn 文档中的说明编写代码:https://greenplum-spark.docs.pivotal.io/100/read_from_gpdb.html
我无法确定我在这里犯的错误。谁能告诉我如何解决这个问题?
【问题讨论】:
【参考方案1】:将dbtable
替换为子查询的选项是内置JDBC 数据源的一项功能。但是 Greenplum Spark 连接器似乎没有提供这样的功能。
具体来源由dbschema
和dbtable
标识,其中the latter one should be(强调我的):
Greenplum 数据库表的名称。从 Greenplum 数据库读取时,此表必须驻留在
dbschema
选项值中标识的 Greenplum 数据库模式中。
这解释了你得到的异常。
同时,您共享的代码中没有任何内容表明您确实需要此类功能。由于您不应用任何特定于数据库的逻辑,因此该过程可能会简单地重写为
import org.apache.spark.sql.functions.col, lit
val allColumns: Seq[String] = ???
val dataDF = spark.read.format("greenplum")
.option("url", conUrl)
.option("dbtable", "xx_lines")
.option("dbschema", "dbanscience")
.option("partitionColumn", "id")
.option("user", devUsrName)
.option("password", devPwd)
.load()
.where("year = 2017 and month=12")
.select(allColumns map col:_*)
.withColumn(flagCol, lit(0))
请注意您使用的其他选项(upperBound
、lowerBound
、numPartitions
)are neither supported 也不是必需的。
根据官方文档:
Greenplum 数据库跨段存储表数据。使用 Greenplum-Spark 连接器加载 Greenplum 数据库表的 Spark 应用程序将特定表列标识为分区列。连接器使用此列中的数据值将每个 Greenplum 数据库段上的特定表数据行分配给一个或多个 Spark 分区。
所以如你所见,分发机制与内置的 JDBC 源完全不同。
连接器还提供了一个额外的partitionsPerSegment
option
which sets:
每个 Greenplum 数据库段的 Spark 分区数。可选,默认值为1个分区。
【讨论】:
以上是关于从Greenplum上的表中读取数据时,如何在Spark-jdbc应用程序的选项“dbtable”中指定子查询? [复制]的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Greenplum/Postgres 中使用 PL/R 反序列化模型对象?