Spark通过JDBC读取RDBMS时是不是有参数分区?

Posted

技术标签:

【中文标题】Spark通过JDBC读取RDBMS时是不是有参数分区?【英文标题】:Is there any parameter partitioning when Spark reads RDBMS through JDBC?Spark通过JDBC读取RDBMS时是否有参数分区? 【发布时间】:2019-10-16 03:24:45 【问题描述】:

当我运行spark应用进行表同步时,报错如下:

19/10/16 01:37:40 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 51)
com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
    at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:590)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:57)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1606)
    at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:633)
    at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:347)
    at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:219)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

我认为这是表中数据量大造成的。我之前用过mongo分区相关的参数,如:spark.mongodb.input.partitioner,spark.mongodb.input.partitionerOptions.partitionSizeMB 我想知道Spark在通过JDBC读取RDBMS时是否有类似的分区参数?

【问题讨论】:

ORACLE、mysql 或 MOGODB 还是其他? @thebluephantom mysql 【参考方案1】:

以下是我们在使用 spark jdbc 读取 RDBMS 表时可以使用的参数及其描述。

partitionColumn、lowerBound、upperBound - 如果指定了其中任何一个选项,则必须全部指定这些选项。此外,必须指定 numPartitions。他们描述了在从多个工作人员并行读取时如何对表进行分区。 partitionColumn 必须是相关表中的数字、日期或时间戳列。请注意,lowerBound 和 upperBound 仅用于决定分区步长,而不是用于过滤表中的行。所以表中的所有行都会被分区并返回。此选项仅适用于阅读。

numPartitions——表读写中可用于并行的最大分区数。这也决定了并发 JDBC 连接的最大数量。如果要写入的分区数超过此限制,我们会在写入前通过调用 coalesce(numPartitions) 将其减少到此限制。

fetchsize - JDBC 提取大小,它确定每次往返要提取的行数。这有助于提高默认为低读取大小的 JDBC 驱动程序的性能(例如,具有 10 行的 Oracle)。此选项仅适用于阅读。

请注意,以上所有参数应一起使用。下面是一个例子:-

  spark.read.format("jdbc").
      option("driver", driver).
      option("url",url ).
      option("partitionColumn",column name).
      option("lowerBound", 10).
      option("upperBound", 10000).
      option("numPartitions", 10).
      option("fetchsize",1000).
      option("dbtable", query).
      option("user", user).
      option("password",password).load()

【讨论】:

以上是关于Spark通过JDBC读取RDBMS时是不是有参数分区?的主要内容,如果未能解决你的问题,请参考以下文章

将 jdbc 连接传递给 spark 读取

其他让 spark 分区读取 jdbc 的方法

Spark streaming jdbc 在数据到来时读取流 - 数据源 jdbc 不支持流式读取

通过 squirrel sql 连接到 hive/spark sql 时读取超时

从 JDBC 连接读取时如何使用谓词?

Spark上的Hive如何从jdbc读取数据?