通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?
Posted
技术标签:
【中文标题】通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?【英文标题】:Parallelization via JDBC - Pyspark - How does parallelization work using JDBC? 【发布时间】:2018-03-13 09:46:51 【问题描述】:如何使用JDBC
进行并行化?
这是我的代码:
spark = SparkSession.builder.getOrCreate()
DF = spark.read.jdbc( url = ...,
table = '...',
column = 'XXXX',
lowerBound = Z,
upperBound = Y,
numPartitions = K
)
我想知道以下参数之间的关系以及是否有办法正确选择它们:
column
-> 它应该是为分区选择的列(它需要是数字列吗?)
lowerBound
-> 选择它有经验法则吗?
upperBound
-> 选择它有经验法则吗?
numPartitions
-> 选择它有经验法则吗?
我明白了
stride = ( upperBound / numPartitions ) - ( lowerBound / numPartitions )
每个分区中是否有很多“步幅”?
换句话说,在所有观察完成之前,分区是否充满了一大堆步幅?
请看this picture 了解问题的意义,考虑以下参数:
lowerBound 80.000
upperBound 180.000
numPartitions 8
Stride 12.500
注意:
min('XXXX') = 0
max('XXXX') = 350.000
('XXXX').count() = 500.000.000
附:我看了documentation和this的答案,但不是很明白。
【问题讨论】:
【参考方案1】:是的,根据文档,它 列 必须是数字列。为什么?因为否则您无法计算 stride 即 (upperBound - lowerBound) / numPartitions = 12.500(items per partition)
我认为如果列已经是数据库中的索引列,那将是理想的,因为您需要尽快检索这些记录。那么upperBound 和lowerBound 应该是您要检索到spark 中的数据的边界(例如,考虑column=id 那么您需要的数据可能是id between 1 and max(id)
)。
对于所有情况,正确的 numPartitions 是一个难以准确处理的主题。不过要注意的一个经典问题是连接池的大小。例如,您应该避免创建比池可以处理的更多的并行连接。当然并行连接的数量与分区的数量直接相关。例如,如果您有 8 个最大分区,则确保最大并行连接数也是 8。有关如何为 numPartitions 选择正确值的更多信息,您可以查看this
祝你好运
【讨论】:
您好 Alexandros,感谢您提供的答案!查看第 2 点,您说“[...] upperBound 和 lowerBound 应该是要检索到 spark [...] 中的数据的边界”。那么,如果我设置id between 1 and max(id) / 2
我将有一个 .count()
是整个数据库的 1/2,这是否正确?非常感谢!
这取决于你的 ids @Franco 的分布。如果每次插入时 id 增加一,那么 yes 是安全的,尽管许多数据库不确保在下一次插入期间 id 将等于 new_id = max(id) + 1 因为他们希望避免这种额外的开销。所以我想说你需要检查一下才能确定。例如,您可以检查 max(id) - min(id) ~ count(*) if 是否完全相同,那么是的,您可以安全地做出假设。如果足够接近,您可以根据需要的准确度自行决定
所以,考虑以下场景:min('XXXX') = 0
max('XXXX') = 350.000
('XXXX').count() = 500.000.000
你会根据值(0 ~ 350.000)还是根据我有 500.000 的事实来选择 lowerBound 和 upperBound .000 个索引?
在这种情况下你不能做出这个假设。让我们再举一个例子,假设您有桌子公司。该表有一个名为 id 的 PK,那么如果该表上有 100.000 条记录,则 id 必须至少为 100.000。所以在你发给我的例子中,做这个假设是没有意义的。换句话说,在您的最后一条评论中,计数和列之间没有线性关系。如果可能,正在使用的列必须是键,否则您必须重新考虑使用 numPartitions
在我的数据库中,我没有键(索引)列。我只有 3 个数字列。考虑到以下情况,您将如何进行并行化操作min('XXXX') = 0
max('XXXX') = 350.000
('XXXX').count() = 500.000.000?
以上是关于通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?的主要内容,如果未能解决你的问题,请参考以下文章
SqlContext 导入和并行化 Pyspark 中的错误
Pyspark:如何在 HDFS 中并行化多 gz 文件处理
如何使 pyspark 作业在多个节点上正确并行化并避免内存问题?
PySpark:当通过 JDBC 在 Oracle 中创建表时,为啥我会得到“没有为类 oracle.jdbc.driver.T4CRowidAccessor 实现 getLong”?