如何在foreach中序列化jdbc连接以进行火花节点分布

Posted

技术标签:

【中文标题】如何在foreach中序列化jdbc连接以进行火花节点分布【英文标题】:How to serialize jdbc connection for spark node distrobution in a foreach 【发布时间】:2014-09-10 05:11:11 【问题描述】:

我的最终目标是让 apache spark 使用 jdbc 连接到 mysql 数据库,以便在 scala 中传输映射的 RDD 数据。这样做会导致一个错误,解释我正在使用的简单 jdbc 代码无法序列化。如何让 jdbc 类被序列化?

【问题讨论】:

而不是序列化它只是为每个映射器创建一个新的 【参考方案1】:

JDBC 连接对象与特定的 TCP 连接和套接字端口相关联,因此无法序列化。因此,您应该在远程执行程序 JVM 进程中创建 JDBC 连接,而不是在驱动程序 JVM 进程中。

实现此目的的一种方法是将连接对象作为 Scala 中的单例对象中的字段(或 Java 中的静态字段),如下所示。在下面的 sn-p 语句中,val session = ExecutorSingleton.session 语句没有在驱动程序中执行,但该语句被发送到执行它的执行器。

case class ConnectionProfile(host: String, username: String, password: String)

object ExecutorSingleton 
  var profile: ConnectionProfile = _
  lazy val session = createConnection(profile)
  def createJDBCSession(profile: ConnectionProfile) =  ... 


rdd.foreachPartition 
    msgIterator => 
      ExecutorSingleton.profile = ConnectionProfile("host", "username", "password")
      msgIterator.foreach msg =>
        val session = ExecutorSingleton.session
        session.execute(msg)
      
    

【讨论】:

【参考方案2】:

通常,驱动程序中的 DB 会话不能被序列化,因为它涉及线程并打开与底层 DB 的 TCP 连接。

正如@aaronman 所提到的,目前最简单的方法是将驱动程序连接的创建包含在分区 foreach 的闭包中。这样您就不会遇到驱动程序的序列化问题。

这是如何做到这一点的框架代码:

rdd.foreachPartition 
    msgIterator => 
      val cluster = Cluster.builder.addContactPoint(host).build()
      val session  = cluster.connect(db)
      msgIterator.foreach msg =>
        ...
        session.execute(statement)
      
      session.close
    

随着 SparkSQL 的不断发展,我预计未来会改进对数据库连接的支持。例如,DataStax 创建了一个 Cassandra-Spark 驱动程序,该驱动程序以一种有效的方式抽象出每个工作人员的连接创建,从而提高了资源使用率。

还可以查看JdbcRDD,它将连接处理添加为函数(在工作人员上执行)

【讨论】:

以上是关于如何在foreach中序列化jdbc连接以进行火花节点分布的主要内容,如果未能解决你的问题,请参考以下文章

如何在火花中使用 foreach 访问数组?

如何在火花中使用地图而不实现可序列化?

火花流foreach多个作家

更改 jdbc 代码以保持连接

如何使用 JDBC 从 Oracle 读取数据集?

如何在火花中连接多个列,同时从另一个表中连接列名(每行不同)