通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?

Posted

技术标签:

【中文标题】通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?【英文标题】:Parallelization via JDBC - Pyspark - How does parallelization work using JDBC? 【发布时间】:2018-03-13 09:46:51 【问题描述】:

如何使用JDBC 进行并行化?

这是我的代码:

spark = SparkSession.builder.getOrCreate()
DF    = spark.read.jdbc( url           =  ...,
                         table         = '...',
                         column        = 'XXXX',
                         lowerBound    =  Z,
                         upperBound    =  Y,
                         numPartitions = K
                         )

我想知道以下参数之间的关系以及是否有办法正确选择它们:

    column -> 它应该是为分区选择的列(它需要是数字列吗?) lowerBound -> 选择它有经验法则吗? upperBound -> 选择它有经验法则吗? numPartitions -> 选择它有经验法则吗?

我明白了

stride = ( upperBound / numPartitions ) - ( lowerBound / numPartitions )

每个分区中是否有很多“步幅”?

换句话说,在所有观察完成之前,分区是否充满了一大堆步幅?

请看this picture 了解问题的意义,考虑以下参数:

 lowerBound     80.000
 upperBound    180.000
 numPartitions       8
 Stride         12.500

注意:

 min('XXXX')      =           0
 max('XXXX')      =     350.000
 ('XXXX').count() = 500.000.000

附:我看了documentation和this的答案,但不是很明白。

【问题讨论】:

【参考方案1】:

    是的,根据文档,它 必须是数字列。为什么?因为否则您无法计算 stride 即 (upperBound - lowerBound) / numPartitions = 12.500(items per partition)

    我认为如果列已经是数据库中的索引列,那将是理想的,因为您需要尽快检索这些记录。那么upperBound 和lowerBound 应该是您要检索到spark 中的数据的边界(例如,考虑column=id 那么您需要的数据可能是id between 1 and max(id))。

    对于所有情况,正确的 numPartitions 是一个难以准确处理的主题。不过要注意的一个经典问题是连接池的大小。例如,您应该避免创建比池可以处理的更多的并行连接。当然并行连接的数量与分区的数量直接相关。例如,如果您有 8 个最大分区,则确保最大并行连接数也是 8。有关如何为 numPartitions 选择正确值的更多信息,您可以查看this

祝你好运

【讨论】:

您好 Alexandros,感谢您提供的答案!查看第 2 点,您说“[...] upperBound 和 lowerBound 应该是要检索到 spark [...] 中的数据的边界”。那么,如果我设置 id between 1 and max(id) / 2 我将有一个 .count() 是整个数据库的 1/2,这是否正确?非常感谢! 这取决于你的 ids @Franco 的分布。如果每次插入时 id 增加一,那么 yes 是安全的,尽管许多数据库不确保在下一次插入期间 id 将等于 new_id = max(id) + 1 因为他们希望避免这种额外的开销。所以我想说你需要检查一下才能确定。例如,您可以检查 max(id) - min(id) ~ count(*) if 是否完全相同,那么是的,您可以安全地做出假设。如果足够接近,您可以根据需要的准确度自行决定 所以,考虑以下场景:min('XXXX') = 0max('XXXX') = 350.000('XXXX').count() = 500.000.000 你会根据值(0 ~ 350.000)还是根据我有 500.000 的事实来选择 lowerBound 和 upperBound .000 个索引? 在这种情况下你不能做出这个假设。让我们再举一个例子,假设您有桌子公司。该表有一个名为 id 的 PK,那么如果该表上有 100.000 条记录,则 id 必须至少为 100.000。所以在你发给我的例子中,做这个假设是没有意义的。换句话说,在您的最后一条评论中,计数和列之间没有线性关系。如果可能,正在使用的列必须是键,否则您必须重新考虑使用 numPartitions 在我的数据库中,我没有键(索引)列。我只有 3 个数字列。考虑到以下情况,您将如何进行并行化操作min('XXXX') = 0max('XXXX') = 350.000('XXXX').count() = 500.000.000?

以上是关于通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?的主要内容,如果未能解决你的问题,请参考以下文章

SqlContext 导入和并行化 Pyspark 中的错误

Pyspark:如何在 HDFS 中并行化多 gz 文件处理

如何使 pyspark 作业在多个节点上正确并行化并避免内存问题?

如何使用 PySpark 并行化我的文件处理程序

PySpark:当通过 JDBC 在 Oracle 中创建表时,为啥我会得到“没有为类 oracle.jdbc.driver.T4CRowidAccessor 实现 getLong”?

熊猫平行适用于考拉(pyspark)