如何在foreach中序列化jdbc连接以进行火花节点分布
Posted
技术标签:
【中文标题】如何在foreach中序列化jdbc连接以进行火花节点分布【英文标题】:How to serialize jdbc connection for spark node distrobution in a foreach 【发布时间】:2014-09-10 05:11:11 【问题描述】:我的最终目标是让 apache spark 使用 jdbc 连接到 mysql 数据库,以便在 scala 中传输映射的 RDD 数据。这样做会导致一个错误,解释我正在使用的简单 jdbc 代码无法序列化。如何让 jdbc 类被序列化?
【问题讨论】:
而不是序列化它只是为每个映射器创建一个新的 【参考方案1】:JDBC 连接对象与特定的 TCP 连接和套接字端口相关联,因此无法序列化。因此,您应该在远程执行程序 JVM 进程中创建 JDBC 连接,而不是在驱动程序 JVM 进程中。
实现此目的的一种方法是将连接对象作为 Scala 中的单例对象中的字段(或 Java 中的静态字段),如下所示。在下面的 sn-p 语句中,val session = ExecutorSingleton.session
语句没有在驱动程序中执行,但该语句被发送到执行它的执行器。
case class ConnectionProfile(host: String, username: String, password: String)
object ExecutorSingleton
var profile: ConnectionProfile = _
lazy val session = createConnection(profile)
def createJDBCSession(profile: ConnectionProfile) = ...
rdd.foreachPartition
msgIterator =>
ExecutorSingleton.profile = ConnectionProfile("host", "username", "password")
msgIterator.foreach msg =>
val session = ExecutorSingleton.session
session.execute(msg)
【讨论】:
【参考方案2】:通常,驱动程序中的 DB 会话不能被序列化,因为它涉及线程并打开与底层 DB 的 TCP 连接。
正如@aaronman 所提到的,目前最简单的方法是将驱动程序连接的创建包含在分区 foreach 的闭包中。这样您就不会遇到驱动程序的序列化问题。
这是如何做到这一点的框架代码:
rdd.foreachPartition
msgIterator =>
val cluster = Cluster.builder.addContactPoint(host).build()
val session = cluster.connect(db)
msgIterator.foreach msg =>
...
session.execute(statement)
session.close
随着 SparkSQL 的不断发展,我预计未来会改进对数据库连接的支持。例如,DataStax 创建了一个 Cassandra-Spark 驱动程序,该驱动程序以一种有效的方式抽象出每个工作人员的连接创建,从而提高了资源使用率。
还可以查看JdbcRDD,它将连接处理添加为函数(在工作人员上执行)
【讨论】:
以上是关于如何在foreach中序列化jdbc连接以进行火花节点分布的主要内容,如果未能解决你的问题,请参考以下文章