pyspark 负载分布不均匀,零件尺寸增加一倍

Posted

技术标签:

【中文标题】pyspark 负载分布不均匀,零件尺寸增加一倍【英文标题】:pyspark distributes load unevenly with parts increasing in double size 【发布时间】:2016-07-25 12:46:28 【问题描述】:

我的 pyspark 进程的输出部分大小不均,但可以预见的是具有 n**2 模式(0、1、2、4、8、16 等)。这是我的过程:

我像这样从 Google BigQuery 加载数据:

dConf = 
    "mapred.bq.project.id": project_id,
    "mapred.bq.gcs.bucket": bucket,
    "mapred.bq.input.project.id": project_id,
    "mapred.bq.input.dataset.id":dataset_id,
    "mapred.bq.input.table.id": table_id


rdd_dataset_raw = sc.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "com.google.gson.JsonObject",
    conf=dConf
)

谁的输出看起来像这样(rdd_dataset_raw.take(2)):

[(0, u'"group_id":"1","pertubations":"Current Affairs,Sport,Technology"'), 
(67, u'"group_id":"2","pertubations":"Current Affairs,Sport,Celeb Gossip"')]

一些琐碎的处理,重新分区:

rdd_dataset = (
    rdd_dataset_raw
    .repartition(nr_partitions)
    .map(lambda t, json=json: json.loads(t[1]))
)

看起来像这样:

[u'group_id': u'1', u'pertubations': u'Current Affairs,Sport,Technology', 
u'group_id': u'2', u'pertubations': u'Current Affairs,Sport,Celeb Gossip']

当我将 RDD 保存到 Google 存储时:

rdd_dataset_raw.saveAsTextFile("gs://bucket/directory")

这将创建nr_partitions 零件文件。

但是,这些零件文件的大小并不均匀。它们在n**2 中增加,其中 n 是零件文件编号。换句话说,

part-00000 包含 0 行part-00001 包含 1 行part-00002 包含 2 行part-00003 包含 4 行part-00004 包含 8 行 等等

其中大部分也几乎立即完成,后面的部分内存不足。

这是怎么回事!?如何让分区负载均衡?

【问题讨论】:

【参考方案1】:

就像用partitionBy替换repartition一样简单:

rdd_dataset = (
    rdd_dataset_raw
    .partitionBy(nr_partitions)
    .map(lambda t, json=json: json.loads(t[1]))
)

请注意,这需要尽早完成。传递未分区的 rdd,然后分区失败。

Docs

【讨论】:

以上是关于pyspark 负载分布不均匀,零件尺寸增加一倍的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 将负载均匀地分配给所有执行程序

均匀分布的标准偏差是区间半宽的多少倍

根据间隔pyspark中的记录数增加一列

在动态空间中均匀分布多个项目

R - 为啥向数据表添加 1 列几乎会使使用的峰值内存增加一倍?

使用“H * * * *”而不是“5 * * * *”来均匀分布负载