从 Dataflow 作业连接到 Cloud SQL

Posted

技术标签:

【中文标题】从 Dataflow 作业连接到 Cloud SQL【英文标题】:Connecting to Cloud SQL from Dataflow Job 【发布时间】:2017-11-25 17:56:27 【问题描述】:

我正在努力将 JdbcIO 与 Apache Beam 2.0 (Java) 一起使用,以从同一项目中的 Dataflow 连接到 Cloud SQL 实例。

我收到以下错误:

java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)

根据文档,如果数据流服务帐户 *@dataflow-service-producer-prod.iam.gserviceaccount.com 拥有“编辑”权限,他应该有权访问同一项目中的所有资源。

当我使用 DirectRunner 运行相同的 Dataflow 作业时,一切正常。

这是我正在使用的代码:

private static String JDBC_URL = "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true";

PCollection < KV < String, Double >> exchangeRates = p.apply(JdbcIO. < KV < String, Double >> read()
 .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL)
  .withUsername(JDBC_USER).withPassword(JDBC_PW))
 .withQuery(
  "SELECT CurrencyCode, ExchangeRate FROM mydb.mytable")
 .withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()))
 .withRowMapper(new JdbcIO.RowMapper < KV < String, Double >> () 
  public KV < String, Double > mapRow(ResultSet resultSet) throws Exception 
   return KV.of(resultSet.getString(1), resultSet.getDouble(2));
  
 ));

编辑:

在另一个数据流作业中在梁之外使用以下方法似乎可以与 DataflowRunner 一起正常工作,这告诉我数据库可能不是问题。

java.sql.Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PW);

【问题讨论】:

【参考方案1】:

按照这些说明如何从 Java 连接到 Cloud SQL:

https://cloud.google.com/sql/docs/mysql/connect-external-app#java

我设法让它工作。

这就是代码的样子(您必须将 MYDBNAME、MYSQLINSTANCE、USER 和 PASSWORD 替换为您的值。

注意:MYSQLINSTANCE 格式为 project:zone:instancename。

我使用自定义类 (Customer) 来存储每一行​​的值,而不是键值对。

p.apply(JdbcIO. <Customer> read()
    .withDataSourceConfiguration(
        JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", 
            "jdbc:mysql://google/MYDBNAME?cloudSqlInstance=MYSQLINSTANCE&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=USER&password=PASSWORD&useUnicode=true&characterEncoding=UTF-8"
        )
    )
    .withQuery( "SELECT CustomerId, Name, Location, Email FROM Customers" )
    .withCoder( AvroCoder.of(Customer.class) )
    .withRowMapper(
        new JdbcIO.RowMapper < Customer > ()
        
            @Override
            public Customer mapRow(java.sql.ResultSet resultSet) throws Exception
            
                final Logger LOG = LoggerFactory.getLogger(CloudSqlToBq.class);
                LOG.info(resultSet.getString(2));
                Customer customer = new Customer(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(3));
                return customer;
            
        
    )
);

我希望这会有所帮助。

【讨论】:

只是好奇,Customer 类会是什么样子?会是类似于JavaBean 的类吗?【参考方案2】:

您好,它以您的方式为我工作。另外,我从 db 配置方法中删除了用户名和密码方法,我的管道配置如下所示

PCollection < KV <  Double, Double >> exchangeRates = p.apply(JdbcIO. < KV <  Double, Double >> read()
     .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8")
             )
     .withQuery(
      "SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE")
     .withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of()))
     .withRowMapper(new JdbcIO.RowMapper < KV < Double, Double >> () 
      @Override
       public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception 
         LOG.info(resultSet.getDouble(1)+ "Came");
          return KV.of(resultSet.getDouble(1), resultSet.getDouble(2));
      
     ));

希望对你有帮助

【讨论】:

【参考方案3】:

我认为这种方法可能效果更好,请尝试 com.mysql.jdbc.GoogleDriver,并使用此处列出的 maven 依赖项。

https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database

相关问题: Where i find and download this jar file com.mysql.jdbc.GoogleDriver?

【讨论】:

嘿@alex-amato,不幸的是,这似乎不适用于gcp数据流,因为我得到“java.sql.SQLException:无法加载JDBC驱动程序类'com.mysql.jdbc.GoogleDriver' " 即使添加了两个 Maven 依赖项也会出错。

以上是关于从 Dataflow 作业连接到 Cloud SQL的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Python SDK 中的 Dataflow 作业连接到 CloudSQL?

通过Dataflow管道写入Cloud SQL非常慢

Google Cloud Dataflow 服务帐户未传播给工作人员?

Google Cloud DataFlow 作业尚不可用.. 在 Airflow

当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub

Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象