PySpark 中 JDBC 上的自定义分区

Posted

技术标签:

【中文标题】PySpark 中 JDBC 上的自定义分区【英文标题】:Custom partitioning on JDBC in PySpark 【发布时间】:2021-10-09 08:57:26 【问题描述】:

我想在 pyspark 中处理 oracle 数据库中有一个巨大的表。但我想使用自定义查询对其进行分区,例如假设表中有一个包含用户名的列,并且我想根据用户名的第一个字母对数据进行分区。或者假设每条记录都有一个日期,我想根据月份对其进行分区。而且由于表很大,我绝对需要每个分区的数据直接由其执行程序而不是由主服务器获取。那么我可以在 pyspark 中做到这一点吗?

P.S.:我需要控制分区的原因是我需要在每个分区上执行一些聚合(分区有意义,而不仅仅是分发数据),所以我希望它们在同一台机器上避免任何洗牌。这可能吗?还是我错了?

注意

我不关心 evenskewed 分区!我希望将所有相关记录(如用户的所有记录,或城市的所有记录等)分区在一起,以便它们驻留在同一台机器上,我可以将它们聚合在一起而无需任何洗牌。

【问题讨论】:

这能回答你的问题吗? JDBC to Spark Dataframe - How to ensure even partitioning? Master 没有获取数据。向执行者发出查询。 @thebluephantom 感谢您的建议。但我的问题不是 evenskewed 分区。我只想控制数据的分区方式,以便可以将相关记录分区在一起。例如,我可能希望将用户的所有记录分区在一起。或者将包含特定值的所有记录分区在一起。关键是我需要完全控制 在 df 读取后,您对生成的 df 进行重新分区。 @thebluephantom 这是否涉及读取然后随机播放?还是每个执行者根据我的分区只读取一次数据? 【参考方案1】:

事实证明,spark 有一种精确控制分区逻辑的方法。这就是spark.read.jdbc 中的predicates 选项。

我最终想出的如下:

(为了这个例子,假设我们有一个商店的购买记录,我们需要根据userIdproductId对它进行分区,以便将一个实体的所有记录一起保存在同一台机器,我们可以在这些实体上执行聚合而不用打乱)

首先,生成要分区的每一列的直方图(每个值的计数):
userId count
123456 1640
789012 932
345678 1849
901234 11
... ...
productId count
123456789 5435
523485447 254
363478326 2343
326484642 905
... ...
然后,使用multifit algorithm 将每列的值划分到n 平衡箱中(n 是您想要的分区数)。
userId bin
123456 1
789012 1
345678 1
901234 2
... ...
productId bin
123456789 1
523485447 2
363478326 2
326484642 3
... ...

然后,将这些存储在数据库中

然后在这些表上更新您的查询和 join 以获取每条记录的 bin 编号:

url = 'jdbc:oracle:thin:username/password@address:port:dbname'

query = ```
(SELECT
  MY_TABLE.*, 
  USER_PARTITION.BIN as USER_BIN, 
  PRODUCT_PARTITION.BIN AS PRODUCT_BIN 
FROM MY_TABLE 
LEFT JOIN USER_PARTITION 
  ON my_table.USER_ID = USER_PARTITION.USER_ID 
LEFT JOIN PRODUCT_PARTITION 
  ON my_table.PRODUCT_ID = PRODUCT_PARTITION.PRODUCT_ID) MY_QUERY```

df = spark.read\
     .option('driver', 'oracle.jdbc.driver.OracleDriver')\
     jdbc(url=url, table=query, predicates=predicates)
最后,生成谓词。每个分区一个,如下所示:
predicates = [
  'USER_BIN = 1 OR PRODUCT_BIN = 1',
  'USER_BIN = 2 OR PRODUCT_BIN = 2',
  'USER_BIN = 3 OR PRODUCT_BIN = 3',
  ...
  'USER_BIN = n OR PRODUCT_BIN = n',
]

谓词作为WHERE子句添加到查询中,这意味着分区1中用户的所有记录都到同一台机器上。此外,分区 1 中产品的所有记录也都在同一台机器上。

请注意,这里的用户和产品之间没有任何关系。我们不在乎哪些产品位于哪个分区或发送到哪台机器。 但是由于我们想要对用户和产品(分别)执行一些聚合,我们需要将一个实体(用户或产品)的所有记录保存在一起。使用这种方法,我们可以在没有任何洗牌的情况下实现这一目标。

另外,请注意,如果有一些用户或产品的记录不适合工人的记忆,那么您需要进行子分区。这意味着您应该首先向您的数据添加一个新的随机数字列(介于 0 和一些 chunk_size 之间,例如 10000 或其他东西),然后根据该数字和原始 ID(例如 userId)的组合进行分区。这会导致每个实体被分成固定大小的块(即 10000),以确保它适合工作人员的记忆。 并且在聚合之后,您需要根据原始 ID 对数据进行分组,以将所有块聚合在一起,并使每个实体再次成为一个整体。

由于我们的内存限制和数据的性质,最后的洗牌是不可避免的,但这是实现预期结果的最有效方式。

【讨论】:

以上是关于PySpark 中 JDBC 上的自定义分区的主要内容,如果未能解决你的问题,请参考以下文章

使用自定义分区器对 Pyspark 中的数据框进行分区

在 PySpark 中写入数据帧的自定义文件名

如何在 pyspark 操作中轻松使用我的自定义类方法?

Pyspark 计算 RDD 中所有向量之间的自定义距离

pyspark 数据框中的自定义排序

如何使用 PySpark 中的自定义函数在同一 ML 管道中传递分桶器?