从 Dataflow 作业连接到 Cloud SQL
Posted
技术标签:
【中文标题】从 Dataflow 作业连接到 Cloud SQL【英文标题】:Connecting to Cloud SQL from Dataflow Job 【发布时间】:2017-11-25 17:56:27 【问题描述】:我正在努力将 JdbcIO 与 Apache Beam 2.0 (Java) 一起使用,以从同一项目中的 Dataflow 连接到 Cloud SQL 实例。
我收到以下错误:
java.sql.SQLException: Cannot create PoolableConnectionFactory (Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.)
根据文档,如果数据流服务帐户 *@dataflow-service-producer-prod.iam.gserviceaccount.com 拥有“编辑”权限,他应该有权访问同一项目中的所有资源。
当我使用 DirectRunner 运行相同的 Dataflow 作业时,一切正常。
这是我正在使用的代码:
private static String JDBC_URL = "jdbc:mysql://myip:3306/mydb?verifyServerCertificate=false&useSSL=true";
PCollection < KV < String, Double >> exchangeRates = p.apply(JdbcIO. < KV < String, Double >> read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", JDBC_URL)
.withUsername(JDBC_USER).withPassword(JDBC_PW))
.withQuery(
"SELECT CurrencyCode, ExchangeRate FROM mydb.mytable")
.withCoder(KvCoder.of(StringUtf8Coder.of(), DoubleCoder.of()))
.withRowMapper(new JdbcIO.RowMapper < KV < String, Double >> ()
public KV < String, Double > mapRow(ResultSet resultSet) throws Exception
return KV.of(resultSet.getString(1), resultSet.getDouble(2));
));
编辑:
在另一个数据流作业中在梁之外使用以下方法似乎可以与 DataflowRunner 一起正常工作,这告诉我数据库可能不是问题。
java.sql.Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PW);
【问题讨论】:
【参考方案1】:按照这些说明如何从 Java 连接到 Cloud SQL:
https://cloud.google.com/sql/docs/mysql/connect-external-app#java
我设法让它工作。
这就是代码的样子(您必须将 MYDBNAME、MYSQLINSTANCE、USER 和 PASSWORD 替换为您的值。
注意:MYSQLINSTANCE 格式为 project:zone:instancename。
我使用自定义类 (Customer) 来存储每一行的值,而不是键值对。
p.apply(JdbcIO. <Customer> read()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver",
"jdbc:mysql://google/MYDBNAME?cloudSqlInstance=MYSQLINSTANCE&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=USER&password=PASSWORD&useUnicode=true&characterEncoding=UTF-8"
)
)
.withQuery( "SELECT CustomerId, Name, Location, Email FROM Customers" )
.withCoder( AvroCoder.of(Customer.class) )
.withRowMapper(
new JdbcIO.RowMapper < Customer > ()
@Override
public Customer mapRow(java.sql.ResultSet resultSet) throws Exception
final Logger LOG = LoggerFactory.getLogger(CloudSqlToBq.class);
LOG.info(resultSet.getString(2));
Customer customer = new Customer(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(3));
return customer;
)
);
我希望这会有所帮助。
【讨论】:
只是好奇,Customer
类会是什么样子?会是类似于JavaBean
的类吗?【参考方案2】:
您好,它以您的方式为我工作。另外,我从 db 配置方法中删除了用户名和密码方法,我的管道配置如下所示
PCollection < KV < Double, Double >> exchangeRates = p.apply(JdbcIO. < KV < Double, Double >> read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8")
)
.withQuery(
"SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE")
.withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of()))
.withRowMapper(new JdbcIO.RowMapper < KV < Double, Double >> ()
@Override
public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception
LOG.info(resultSet.getDouble(1)+ "Came");
return KV.of(resultSet.getDouble(1), resultSet.getDouble(2));
));
希望对你有帮助
【讨论】:
【参考方案3】:我认为这种方法可能效果更好,请尝试 com.mysql.jdbc.GoogleDriver,并使用此处列出的 maven 依赖项。
https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database
相关问题: Where i find and download this jar file com.mysql.jdbc.GoogleDriver?
【讨论】:
嘿@alex-amato,不幸的是,这似乎不适用于gcp数据流,因为我得到“java.sql.SQLException:无法加载JDBC驱动程序类'com.mysql.jdbc.GoogleDriver' " 即使添加了两个 Maven 依赖项也会出错。以上是关于从 Dataflow 作业连接到 Cloud SQL的主要内容,如果未能解决你的问题,请参考以下文章
如何从 Python SDK 中的 Dataflow 作业连接到 CloudSQL?
Google Cloud Dataflow 服务帐户未传播给工作人员?
Google Cloud DataFlow 作业尚不可用.. 在 Airflow
当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub
Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象