Scala SQL query remote access error from GCP to on-premises
Posted: 2020-01-23 23:05:19

Question: I have the following code:
import org.jooq._
import org.jooq.impl._
import org.jooq.impl.DSL._
import java.sql.DriverManager
import org.apache.log4j.receivers.db.dialect.SQLDialect
import org.apache.spark.sql.SparkSession
val session = SparkSession.builder().getOrCreate()
var df1 = session.emptyDataFrame
var df2 = session.emptyDataFrame
val userName = "user"
val password = "pass"
val c = DriverManager.getConnection("jdbc:mysql://blah_blah.com", userName, password)
df1 = sql(s"select * from $db1_name.$tb1_name")
df2 = c.prepareStatement(s"select * from $db2_name.$tb2_name")
Then I get the following errors:
found : org.jooq.SQL
required: org.apache.spark.sql.DataFrame
(which expands to)
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
df1 = sql(s"select * from $db1_name.$tb1_name")
^
found : java.sql.PreparedStatement
required: org.apache.spark.sql.DataFrame
(which expands to)
org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
df2 = c.prepareStatement(s"select * from $db2_name.$tb2_name")
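Note: with import org.jooq.impl.DSL._ in scope, the unqualified call sql(...) resolves to jOOQ's DSL.sql, which only builds a SQL fragment object and never touches Spark; likewise, prepareStatement returns a plain JDBC PreparedStatement. Neither is a DataFrame, hence the two type mismatches. A minimal sketch of the distinction, using the session defined above (the table name is a placeholder):

import org.jooq.impl.DSL
// jOOQ: builds a plain SQL fragment, of type org.jooq.SQL -- nothing is executed
val fragment: org.jooq.SQL = DSL.sql("select * from some_db.some_table")
// Spark: runs the query against tables known to the session and returns a DataFrame
val df: org.apache.spark.sql.DataFrame = session.sql("select * from some_db.some_table")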
Then, following suggestions in the comments, I changed the code. I now have the following Scala code:
val userName = "user"
val password = "pass"
val session = SparkSession.builder().getOrCreate()
var df1 = session.emptyDataFrame
var df2 = session.emptyDataFrame
....
....
df1 = sql(s"select * from $db1_name.$tb1_name")
df2 = session.read.format("jdbc").
option("url", "jdbc:mysql://blah_blah.com").
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("continueBatchOnError","true").
option("useSSL", "false").
option("user", userName).
option("password", password).
option("dbtable",s"select * from $db2_name.$tb2_name").load()
I then get an error like the following:
The last packet sent successfully to the server was 0 milliseconds
ago. The driver has not received any packets from the server.
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989)
at com.mysql.jdbc.MysqlIO.readPacket(MysqlIO.java:632)
at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1016)
at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2194)
at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2225)
at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2024)
at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:779)
at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:389)
at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
... 78 elided
Caused by: java.io.EOFException: Can not read response from server.
Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3011)
at com.mysql.jdbc.MysqlIO.readPacket(MysqlIO.java:567)
... 100 more
Any solutions or suggestions for these two errors?
I have also tried the postgresql and h2 drivers => org.postgresql.Driver
but I got similar errors (I may not be remembering them exactly).
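Note: separately from the connection failure, the dbtable option in the snippet above is likely misused: Spark's JDBC source expects a table name, or a subquery wrapped in parentheses with an alias, not a bare SELECT statement. A minimal sketch of the corrected read, reusing the names from the question:

val df2 = session.read.format("jdbc").
  option("url", "jdbc:mysql://blah_blah.com").
  option("user", userName).
  option("password", password).
  // a subquery must be parenthesized and aliased; a plain table name also works
  option("dbtable", s"(select * from $db2_name.$tb2_name) t").
  load()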
Comments:

Maybe helpful? That solution queries the database directly with Spark, rather than pulling the data over JDBC and then trying to put it into a DataFrame.
I get this error: The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
Use spark.sql, where spark means the Spark session.

Answer 1:

Your problem is that the Scala compiler has already initialized the vars df1 and df2 as empty DataFrames. You have to try to read directly from Spark:
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("query", "select c1, c2 from t1")
.load()
For more information, see the Apache Spark documentation:
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
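As a usage sketch (jdbcUrl, the credentials, and the table are placeholders, and the "query" option is only available in Spark 2.4 and later), the result of load() can be assigned straight to a val, which avoids the empty-DataFrame var pattern entirely:

// spark is the active SparkSession
val df = spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("user", "user")
  .option("password", "pass")
  .option("query", "select c1, c2 from t1")
  .load()
df.printSchema()  // verify the columns came back with the expected types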
Comments:

I need to save them as DataFrames. I tried your solution, and the output of the query was not a DataFrame.
By the definition in the Apache Spark library, the code I wrote returns a sql.DataFrame. So are you sure you wrote it correctly? Post your code!
I can assign it to an empty DataFrame, but the connection is refused and I am not sure why: Caused by: java.net.ConnectException: Connection refused (Connection refused). So I do this: var df2 = SparkSession.builder().getOrCreate().emptyDataFrame and then df2 = session.read.format("jdbc").option("url", "jdbc:mysql://blah_blah.com").option("driver", "com.mysql.jdbc.Driver").option("useUnicode", "true").option("continueBatchOnError","true").option("useSSL", "false").option("user", userName).option("password", password).option("dbtable", s"select * from $db2_name.$tb2_name").load()
@Alan Can you post the full connection-refused stack trace and update the code you posted? Are you sure you are still using the correct username and password?
Yes, I am using the correct username and password. I can even SSH from GCP to the server with those credentials. I have updated the question with the errors I am seeing.

Answer 2:

You can get a DataFrame by reading as follows. Set up your connection details:
import java.util.Properties
val jdbcHostname = "some.host.name"
val jdbcDatabase = "some_db"
val driver = "com.mysql.cj.jdbc.Driver" // update the driver as needed; in your case it would be org.postgresql.Driver
// url to DB
val jdbcUrl = s"jdbc:mysql://$jdbcHostname:3306/$jdbcDatabase"
val username = "someUser"
val password = "somePass"
// create a properties map for your DB connection
val connectionProperties = new Properties()
connectionProperties.put("user", s"$username")
connectionProperties.put("password", s"$password")
connectionProperties.setProperty("driver", driver)
Then read from JDBC as follows:
// use above created url and connection properties to fetch data
val tableName = "some-table"
val mytable = spark.read.jdbc(jdbcUrl, tableName, connectionProperties)
Spark automatically reads the schema from the database table and maps its types back to Spark SQL types.
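For example (a small sketch using the mytable DataFrame built above), the inferred schema can be inspected directly:

mytable.printSchema()  // prints each column name with the Spark SQL type mapped from the database type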
You can use the mytable DataFrame above to run your queries or save the data.
Suppose you want to select the columns you need and then save the result:
// your select query
val selectedDF = mytable.select("c1", "c2")
// now you can save the above DataFrame
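A possible way to persist the result (a sketch; the output path is a placeholder):

// write the selected columns out as Parquet, overwriting any previous run
selectedDF.write.mode("overwrite").parquet("/tmp/selected_data")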
Comments:
Thanks for the detailed answer. I got this error: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
@Alan You can download the MySQL connector and add it to the classpath. You can even add the dependency to your project, with org.postgresql.Driver as your driver.
Thanks for the comment. Actually, as I mentioned in the main post, I have also tried org.postgresql.Driver.
@Alan You are facing a connection problem with the server. You can add ?useSSL=false&autoReconnect=true to your JDBC URL. If SSL false does not work, you can remove it and try again.
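A sketch of the suggested URL change (the port and database name are placeholders here):

val jdbcUrl = "jdbc:mysql://blah_blah.com:3306/some_db?useSSL=false&autoReconnect=true"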