Greenplum Spark Connector org.postgresql.util.PSQLException: ERROR: error when writing data to gpfdist

Posted: 2020-03-25 02:39:43

I have a Greenplum cluster on Azure, and I'm trying to connect to it with Spark from my local machine (using the Pivotal Greenplum Spark Connector).

In my Scala code I do something like this:

val options = Map(
  "url" -> url,
  "user" -> credential("user"),
  "password" -> credential("password"),
  "partitionColumn" -> partitionColumn
)

sqlContext.read.format("greenplum").options(options).load()

For testing purposes, I created a user:

DROP USER IF EXISTS user1;
CREATE USER user1 CREATEEXTTABLE (type='writable') PASSWORD 'p@ss0rd';

Then, using this user, I created a database/table as follows:

drop table if exists sample;
create table public.sample (id serial, big bigint, wee smallint, stuff text) distributed by (id) ;
insert into sample (big) values (generate_series(1,100));
update sample set wee = 0; 
update sample set wee = 1 where mod(id,7)=1;
update sample set stuff = substr('abcdefghijklmnopqrstuvwxyz',1,mod(wee,13));

Now, when I run my Spark code with the Greenplum credentials, it appears (when running in debug mode) that the code successfully reads the table metadata (it gets the columns and types), but reading the rows fails with SQLSTATE(08006), ErrorCode(0). Here is the stack trace:

2020-03-24 19:04:31,168 WARN [Executor task launch worker for task 0] com.zaxxer.hikari.pool.ProxyConnection - HikariPool-1 - Connection org.postgresql.jdbc.PgConnection@14fab679 marked as broken because of SQLSTATE(08006), ErrorCode(0)
org.postgresql.util.PSQLException: ERROR: error when writing data to gpfdist http://127.0.0.1:60352/spark_e0aa1f0c8646f023_fffec8bf08e0054d_driver_261, quit after 8 tries  (seg1 172.21.0.4:6001 pid=25909)
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2310)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2023)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:217)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
    at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:294)
    at com.zaxxer.hikari.pool.ProxyStatement.executeUpdate(ProxyStatement.java:120)
    at com.zaxxer.hikari.pool.HikariProxyStatement.executeUpdate(HikariProxyStatement.java)
    at io.pivotal.greenplum.spark.jdbc.Jdbc$$anonfun$2.apply(Jdbc.scala:81)
    at io.pivotal.greenplum.spark.jdbc.Jdbc$$anonfun$2.apply(Jdbc.scala:79)
    at resource.AbstractManagedResource$$anonfun$5.apply(AbstractManagedResource.scala:88)
    at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
    at scala.util.control.Exception$Catch$$anonfun$either$1.apply(Exception.scala:125)
    at scala.util.control.Exception$Catch.apply(Exception.scala:103)
    at scala.util.control.Exception$Catch.either(Exception.scala:125)
    at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88)
    at resource.ManagedResourceOperations$class.apply(ManagedResourceOperations.scala:26)
    at resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50)
    at resource.DeferredExtractableManagedResource$$anonfun$tried$1.apply(AbstractManagedResource.scala:33)
    at scala.util.Try$.apply(Try.scala:192)
    at resource.DeferredExtractableManagedResource.tried(AbstractManagedResource.scala:33)
    at io.pivotal.greenplum.spark.jdbc.Jdbc$.copyTable(Jdbc.scala:83)
    at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.liftedTree1$1(GreenplumRowIterator.scala:105)
    at io.pivotal.greenplum.spark.externaltable.GreenplumRowIterator.<init>(GreenplumRowIterator.scala:104)
    at io.pivotal.greenplum.spark.GreenplumRDD.compute(GreenplumRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
    at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1165)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I tried opening ports on the Greenplum cluster on Azure, but that didn't help.

Any clues?

Answer 1:

The Spark connector starts a gpfdist server on each Spark worker node, determines the address/hostname of the machine the worker is running on, and reports it to Greenplum so that Greenplum can send data to it. In your case, running on your local machine, this resolves to 127.0.0.1, and Greenplum on Azure cannot connect to the gpfdist server at http://127.0.0.1:60352.

For this to work, your Spark workers must be reachable from Azure via a routable IP address or a DNS-resolvable hostname. You can choose whether the connector advertises a hostname or an IP address (and which network interface the IP address is taken from) with the options described here: https://greenplum-spark.docs.pivotal.io/1-6/using_the_connector.html#addrcfg
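
For example, a minimal sketch of that address configuration (the server.* option names follow the linked 1.6 docs, so verify them against your connector version; the table, interface name, and port below are placeholders):

val options = Map(
  "url" -> url,
  "user" -> credential("user"),
  "password" -> credential("password"),
  "dbtable" -> "public.sample",
  "partitionColumn" -> "id",
  // Advertise a DNS-resolvable hostname instead of 127.0.0.1:
  "server.useHostname" -> "true",
  // ...or instead pin the network interface whose IP address is advertised:
  // "server.nic" -> "eth0",
  // Fix the gpfdist port so a firewall rule can target it:
  "server.port" -> "12900"
)

sqlContext.read.format("greenplum").options(options).load()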

Answer 2:

As @denalex mentioned in his answer, running Spark from a local machine may not be the simplest setup, since Greenplum does not know where to send the data.

An alternative is to launch a Spark cluster or host on Azure and make it visible to Greenplum on the same network.

Comments:

Thanks @oliverallbertini, but I'm afraid I can't have Greenplum on my network, since I don't own the database, and likewise I can't expose the Spark cluster. Do you have an alternative that works with Spark through a plain JDBC driver?

@bachr I would try to create a Spark cluster somewhere in the cloud (Azure, AWS, GCP, etc.) and use network rules to allow traffic from the Greenplum hosts.

I ended up using the Greenplum JDBC driver with Spark SQL. I ran into some issues because of pgBouncer, but so far everything works. I still need to look into the read/write performance for a few GB of data.
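
For reference, a minimal sketch of that plain-JDBC fallback using Spark's generic JDBC source (the host, database, and partition bounds are placeholders; Greenplum speaks the PostgreSQL wire protocol, so the stock PostgreSQL driver works):

val df = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://gp-master.example.com:5432/mydb")  // placeholder master host and database
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", "public.sample")
  .option("user", credential("user"))
  .option("password", credential("password"))
  // Parallelize the scan across executors on the serial id column:
  .option("partitionColumn", "id")
  .option("lowerBound", "1")
  .option("upperBound", "100")
  .option("numPartitions", "4")
  .load()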
