Apache Beam - org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot create PoolableConnectionFactory (Method not supported)

Posted: 2017-12-07 15:43:25

Question:

I am trying to connect from Apache Beam (Dataflow runner) to a Hive instance installed on a cloud compute instance. When I run the pipeline I get the exception below; it happens as soon as Apache Beam tries to access the database. I have seen many related questions, but none of them involved Apache Beam or Google Dataflow.

(c9ec8fdbe9d1719a): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot create PoolableConnectionFactory (Method not supported)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:289)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261)
at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)
at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)
at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:272)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot create PoolableConnectionFactory (Method not supported)
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn$auxiliary$8CR0LcYI.invokeSetup(Unknown Source)
at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:65)
at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:47)
at com.google.cloud.dataflow.worker.runners.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
at com.google.cloud.dataflow.worker.runners.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:70)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:365)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:278)
... 14 more
Caused by: java.sql.SQLException: Cannot create PoolableConnectionFactory (Method not supported)
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2294)
at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:2039)
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1533)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn.setup(JdbcIO.java:377)
Caused by: java.sql.SQLException: Method not supported
at org.apache.hive.jdbc.HiveConnection.isValid(HiveConnection.java:898)
at org.apache.commons.dbcp2.DelegatingConnection.isValid(DelegatingConnection.java:918)
at org.apache.commons.dbcp2.PoolableConnection.validate(PoolableConnection.java:283)
at org.apache.commons.dbcp2.PoolableConnectionFactory.validateConnection(PoolableConnectionFactory.java:357)
at org.apache.commons.dbcp2.BasicDataSource.validateConnectionFactory(BasicDataSource.java:2307)
at org.apache.commons.dbcp2.BasicDataSource.createPoolableConnectionFactory(BasicDataSource.java:2290)
at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:2039)
at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:1533)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn.setup(JdbcIO.java:377)
at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn$auxiliary$8CR0LcYI.invokeSetup(Unknown Source)
at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:65)
at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:47)
at com.google.cloud.dataflow.worker.runners.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
at com.google.cloud.dataflow.worker.runners.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:70)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:365)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:278)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261)
at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)
at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)
at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)
at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:272)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Using the same connection string and driver files, I can connect to this instance from a plain Java JDBC program.
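For reference, a minimal sketch of that kind of standalone JDBC check (the class name, URL, and credentials are placeholders mirroring the pipeline configuration, assuming the same hive-jdbc driver jar is on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcCheck {
    public static void main(String[] args) throws Exception {
        // Register the Hive JDBC driver explicitly.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://<external IP of compute instance>:10000/dbtest", "username", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select c_customer_id from dbtest.customer limit 1")) {
            while (rs.next()) {
                System.out.println(rs.getString("c_customer_id"));
            }
        }
    }
}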

This issue has been around for a while and I have not been able to find a solution. Can anyone share any ideas on it?

Please see the code snippet below that connects to Hive:

import java.sql.ResultSet;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.PCollection;

PCollection<Customer> collection = dataflowPipeline.apply(JdbcIO.<Customer>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("org.apache.hive.jdbc.HiveDriver", "jdbc:hive2://<external IP of compute instance>:10000/dbtest")
                .withUsername("username").withPassword("password"))
        .withQuery(
                "select c_customer_id,c_first_name,c_last_name,c_preferred_cust_flag,c_birth_day from dbtest.customer")
        .withRowMapper(new JdbcIO.RowMapper<Customer>() {
            @Override
            public Customer mapRow(ResultSet resultSet) throws Exception {
                // Copy each column of the current row into a Customer POJO.
                Customer customer = new Customer();
                customer.setC_customer_id(resultSet.getString("c_customer_id"));
                customer.setC_first_name(resultSet.getString("c_first_name"));
                customer.setC_last_name(resultSet.getString("c_last_name"));
                customer.setC_preferred_cust_flag(resultSet.getString("c_preferred_cust_flag"));
                customer.setC_birth_day(resultSet.getInt("c_birth_day"));
                return customer;
            }
        })
        .withCoder(AvroCoder.of(Customer.class)));

Answer 1:

Apache Commons DBCP's BasicDataSource uses the isValid method to validate connections, and that method is not implemented by older versions of the Hive JDBC driver - see JDBC to hive connection fails on invalid operation isValid().
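To confirm this failure mode outside of Beam and DBCP, here is a short sketch (placeholder host and credentials) that invokes the same isValid check DBCP performs; on an old Hive JDBC driver the marked line throws java.sql.SQLException: Method not supported:

import java.sql.Connection;
import java.sql.DriverManager;

public class IsValidCheck {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://<host>:10000/dbtest", "username", "password")) {
            // DBCP's BasicDataSource calls Connection.isValid(timeout) to validate
            // pooled connections; old HiveConnection versions throw
            // java.sql.SQLException: Method not supported here.
            System.out.println("isValid: " + conn.isValid(5));
        }
    }
}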

However, the method is implemented in Hive 2.1.0 and later: https://github.com/apache/hive/commit/2d2ab0942482a6ce1523dd9dd0f4094865e93b28

Could you use a newer version of the Hive JDBC driver?
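As a hedged sketch (placeholder connection details), one way to verify which driver version actually ends up on the classpath is to log the connection metadata:

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;

public class DriverVersionCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://<host>:10000/dbtest", "username", "password")) {
            DatabaseMetaData md = conn.getMetaData();
            // Hive 2.1.0+ implements Connection.isValid, which DBCP relies on.
            System.out.println("Driver: " + md.getDriverName() + " " + md.getDriverVersion());
        }
    }
}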

Discussion:

- Hi @jkff. I forgot to update the question here. After seeing this issue I did some analysis and tried version 2.1.1 of hive-jdbc. Now I am hitting this instead: util.UserCodeException: java.sql.SQLException: Cannot create PoolableConnectionFactory (Could not open client transport with JDBC Uri: jdbc:hive2://:3306/db/: java.net.ConnectException: Connection timed out) at dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:289). It looks like it is not connecting at all.
- This looks like a regular network issue rather than a Dataflow issue. Are you able to connect to the same database with the same parameters from a regular Java program rather than a Dataflow pipeline?
- Yes, I can pull data successfully with a plain Java program. I would like to know how many threads Dataflow may create when it starts reading from the database. Initially connection pooling was not enabled, because Derby was configured for Hive; later we changed it to MySQL. However, the low-spec VM where Cloudera is installed is really slow and cannot handle pooling with its limited resources. Do you think my assumption is right?
- Do you have a Dataflow job ID I could look at?
- How would you look at the job hosted in my cloud instance? Should I add you to the sample project? I am unable to finish this POC task because of this issue. Let me know how I should share that job ID. @jkff
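As a side note on the Connection timed out error discussed above: a plain TCP probe run from the same network as the Dataflow workers can separate a network-level problem from a driver problem. A minimal sketch with a placeholder host and the default HiveServer2 port:

import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    public static void main(String[] args) throws Exception {
        // Replace host and port with the values from the JDBC URI.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("<host>", 10000), 5000);
            System.out.println("TCP connection succeeded");
        }
    }
}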
