如何保证 Spark Dataframe 中的重新分区

Posted

技术标签:

【中文标题】如何保证 Spark Dataframe 中的重新分区【英文标题】:How to guarantee repartitioning in Spark Dataframe 【发布时间】:2016-08-16 15:38:51 【问题描述】:

我对 Apache Spark 很陌生,我正在尝试按美国州重新分区数据框。然后我想将每个分区分成自己的 RDD 并保存到特定位置:

schema = types.StructType([
  types.StructField("details", types.StructType([
      types.StructField("state", types.StringType(), True)
  ]), True)
])

raw_rdd = spark_context.parallelize([
  '"details": "state": "AL"',
  '"details": "state": "AK"',
  '"details": "state": "AZ"',
  '"details": "state": "AR"',
  '"details": "state": "CA"',
  '"details": "state": "CO"',
  '"details": "state": "CT"',
  '"details": "state": "DE"',
  '"details": "state": "FL"',
  '"details": "state": "GA"'
]).map(
    lambda row: json.loads(row)
)

rdd = sql_context.createDataFrame(raw_rdd).repartition(10, "details.state").rdd

for index in range(0, rdd.getNumPartitions()):
    partition = rdd.mapPartitionsWithIndex(
        lambda partition_index, partition: partition if partition_index == index else []
    ).coalesce(1)

    if partition.count() > 0:
        df = sql_context.createDataFrame(partition, schema=schema)

        for event in df.collect():
            print "Partition 0: 1".format(index, str(event))
    else:
        print "Partition 0: No rows".format(index)

为了进行测试,我从 S3 加载了一个包含 50 行(示例中为 10 行)的文件,每行在 details.state 列中具有不同的状态。为了模仿我在上面的示例中并行化数据的行为,但行为是相同的。我得到了我要求的 50 个分区,但有些没有被使用,有些带有多个状态的条目。这是 10 个样本集的输出:

Partition 0: Row(details=Row(state=u'AK'))
Partition 1: Row(details=Row(state=u'AL'))
Partition 1: Row(details=Row(state=u'CT'))
Partition 2: Row(details=Row(state=u'CA'))
Partition 3: No rows
Partition 4: No rows
Partition 5: Row(details=Row(state=u'AZ'))
Partition 6: Row(details=Row(state=u'CO'))
Partition 6: Row(details=Row(state=u'FL'))
Partition 6: Row(details=Row(state=u'GA'))
Partition 7: Row(details=Row(state=u'AR'))
Partition 7: Row(details=Row(state=u'DE'))
Partition 8: No rows
Partition 9: No rows

我的问题:重新分区策略只是对 Spark 的建议还是我的代码存在根本性问题?

【问题讨论】:

【参考方案1】:

这里没有什么意外的事情发生。 Spark 使用分区键(正)的哈希模数分区在分区之间分配行,如果有 50 个分区,您将获得大量重复:

from pyspark.sql.functions import expr

states = sc.parallelize([
    "AL", "AK", "AZ", "AR", "CA", "CO", "CT", "DC", "DE", "FL", "GA", 
    "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MD", 
    "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", 
    "NM", "NY", "NC", "ND", "OH", "OK", "OR", "PA", "RI", "SC", 
    "SD", "TN", "TX", "UT", "VT", "VA", "WA", "WV", "WI", "WY"
])

states_df = states.map(lambda x: (x, )).toDF(["state"])

states_df.select(expr("pmod(hash(state), 50)")).distinct().count()
# 26

如果您想在写入时分隔文件,最好将partitionBy 子句用于DataFrameWriter。它将为每个级别创建单独的输出,并且不需要改组。

如果你真的想要完全重新分区,你可以使用 RDD API,它允许你使用自定义分区器。

【讨论】:

以上是关于如何保证 Spark Dataframe 中的重新分区的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区

Spark中的DataFrame是什么?以及如何构建DataFrame?(附案例)

如何从 Spark 2.0 中的 DataFrame 列创建数据集?

Apache Spark:如何使用 Java 在 dataFrame 中的空值列中插入数据

如何从 Scala 中的 DataFrame 在 Spark 中创建分布式稀疏矩阵

Spark中的最佳重新分区方式