将 Dataframe 转换为 RDD 减少了分区
Posted
技术标签:
【中文标题】将 Dataframe 转换为 RDD 减少了分区【英文标题】:Converting Dataframe to RDD reduces partitions 【发布时间】:2016-12-23 12:04:17 【问题描述】:在我们的代码中,Dataframe 被创建为:
DataFrame DF = hiveContext.sql("select * from table_instance");
当我将数据帧转换为 rdd 并尝试获取其分区数时
RDD<Row> newRDD = Df.rdd();
System.out.println(newRDD.getNumPartitions());
它将分区的数量减少到 1(1 打印在控制台中)。最初我的数据框有 102 个分区。
更新:
在阅读时,我重新分配了数据框:
DataFrame DF = hiveContext.sql("select * from table_instance").repartition(200);
然后转换为 rdd ,所以它只给了我 200 个分区。 有没有
JavaSparkContext
在这方面有什么作用吗?当我们将数据帧转换为 rdd 时,默认最小分区标志是否也在 spark 上下文级别考虑?
更新:
我制作了一个单独的示例程序,其中我将完全相同的表读入数据帧并转换为 rdd。没有为 RDD 转换创建额外的阶段,并且分区计数也是正确的。我现在想知道我在主程序中做了什么不同。
如果我的理解有误,请告诉我。
【问题讨论】:
【参考方案1】:它基本上取决于hiveContext.sql()
的实现。由于我是 Hive 新手,我的猜测是 hiveContext.sql
不知道 OR 无法拆分表中存在的数据。
例如,当您从 HDFS 读取文本文件时,spark 上下文会考虑该文件使用的块数来确定分区。
您对repartition
所做的操作是解决此类问题的明显方法。(注意:如果不使用适当的分区器,重新分区可能会导致随机操作,默认使用哈希分区器)
引起您的怀疑,hiveContext 可能会考虑默认的最小分区属性。但是,依赖默认属性不会 解决你所有的问题。例如,如果您的 hive 表的大小增加,您的程序仍然使用默认的分区数。
更新:重新分区期间避免随机播放
定义您的自定义分区器:
public class MyPartitioner extends HashPartitioner
private final int partitions;
public MyPartitioner(int partitions)
super();
this.partitions = partitions;
@Override
public int numPartitions()
return this.partitions;
@Override
public int getPartition(Object key)
if (key instanceof String)
return super.getPartition(key);
else if (key instanceof Integer)
return (Integer.valueOf(key.toString()) % this.partitions);
else if (key instanceof Long)
return (int)(Long.valueOf(key.toString()) % this.partitions);
//TOD ... add more types
使用您的自定义分区器:
JavaPairRDD<Long, SparkDatoinDoc> pairRdd = hiveContext.sql("select * from table_instance")
.mapToPair( //TODO ... expose the column as key)
rdd = rdd.partitionBy(new MyPartitioner(200));
//... rest of processing
【讨论】:
感谢@code 的回复。我一直在这个问题上停留了一段时间。来自 hivecontext.sql() 的分区被读取为 102。我对数据帧启动了计数操作,并了解到有 102 个任务正在启动,因此有 102 个分区。现在,当我进行重新分区时,会引起很多洗牌。我想根据某些列进行重新分区,请提出一种可以进行最小洗牌的重新分区技术 您能解释一下它是什么类型的列以及它的值范围吗? 这基本上是由用户决定的。可以是 string , long , bigint 任何数据类型的列都可以存在 不能像这样重新分区数据框。其次 hivecontext.sql 返回一个数据框而不是一对 rdd 。为了使用它,我必须将数据帧转换为 rdd,然后在这个 rdd 上调用这个自定义分区器,并使用一个分区,从而启动一个任务。它会再次给我带来内存问题。 由于我是hive的新手,所以我评论了将dataframe转换为rdd的部分。此外,一旦将数据框转换为 rdd,您需要在其上运行mapToPair()
并选择任何列作为键,整行作为值。根据我的理解,上面的代码不会出现内存问题,因为 spark 可以管道分区过程。请运行代码并确认。以上是关于将 Dataframe 转换为 RDD 减少了分区的主要内容,如果未能解决你的问题,请参考以下文章
在 RDD 转换上保留 Spark DataFrame 列分区
将 RDD 转换为 DataFrame 并再次转换回来的开销是多少?
将 DataFrame 转换为 RDD 并将 RDD 动态拆分为与 DataFrame 相同数量的 Columns