将 Dataframe 转换为 RDD 减少了分区

Posted

技术标签:

【中文标题】将 Dataframe 转换为 RDD 减少了分区【英文标题】:Converting Dataframe to RDD reduces partitions 【发布时间】:2016-12-23 12:04:17 【问题描述】:

在我们的代码中,Dataframe 被创建为:

DataFrame DF = hiveContext.sql("select * from table_instance");

当我将数据帧转换为 rdd 并尝试获取其分区数时

RDD<Row> newRDD = Df.rdd();
System.out.println(newRDD.getNumPartitions());

它将分区的数量减少到 1(1 打印在控制台中)。最初我的数据框有 102 个分区。

更新:

在阅读时,我重新分配了数据框:

DataFrame DF = hiveContext.sql("select * from table_instance").repartition(200);

然后转换为 rdd ,所以它只给了我 200 个分区。 有没有

JavaSparkContext

在这方面有什么作用吗?当我们将数据帧转换为 rdd 时,默认最小分区标志是否也在 spark 上下文级别考虑?

更新:

我制作了一个单独的示例程序,其中我将完全相同的表读入数据帧并转换为 rdd。没有为 RDD 转换创建额外的阶段,并且分区计数也是正确的。我现在想知道我在主程序中做了什么不同。

如果我的理解有误,请告诉我。

【问题讨论】:

【参考方案1】:

它基本上取决于hiveContext.sql()的实现。由于我是 Hive 新手,我的猜测是 hiveContext.sql 不知道 OR 无法拆分表中存在的数据。

例如,当您从 HDFS 读取文本文件时,spark 上下文会考虑该文件使用的块数来确定分区。

您对repartition 所做的操作是解决此类问题的明显方法。(注意:如果不使用适当的分区器,重新分区可能会导致随机操作,默认使用哈希分区器)

引起您的怀疑,hiveContext 可能会考虑默认的最小分区属性。但是,依赖默认属性不会 解决你所有的问题。例如,如果您的 hive 表的大小增加,您的程序仍然使用默认的分区数。

更新:重新分区期间避免随机播放

定义您的自定义分区器:

public class MyPartitioner extends HashPartitioner 
    private final int partitions;
    public MyPartitioner(int partitions) 
        super();
        this.partitions = partitions;
    
    @Override
    public int numPartitions() 
        return this.partitions;
    

    @Override
    public int getPartition(Object key) 
        if (key instanceof String) 
            return super.getPartition(key);
         else if (key instanceof Integer) 
            return (Integer.valueOf(key.toString()) % this.partitions);
         else if (key instanceof Long) 
            return (int)(Long.valueOf(key.toString()) % this.partitions);
        
        //TOD ... add more types
    

使用您的自定义分区器:

JavaPairRDD<Long, SparkDatoinDoc> pairRdd = hiveContext.sql("select *   from table_instance")
.mapToPair( //TODO ... expose the column as key)

rdd = rdd.partitionBy(new MyPartitioner(200));
//... rest of processing

【讨论】:

感谢@code 的回复。我一直在这个问题上停留了一段时间。来自 hivecontext.sql() 的分区被读取为 102。我对数据帧启动了计数操作,并了解到有 102 个任务正在启动,因此有 102 个分区。现在,当我进行重新分区时,会引起很多洗牌。我想根据某些列进行重新分区,请提出一种可以进行最小洗牌的重新分区技术 您能解释一下它是什么类型的列以及它的值范围吗? 这基本上是由用户决定的。可以是 string , long , bigint 任何数据类型的列都可以存在 不能像这样重新分区数据框。其次 hivecontext.sql 返回一个数据框而不是一对 rdd 。为了使用它,我必须将数据帧转换为 rdd,然后在这个 rdd 上调用这个自定义分区器,并使用一个分区,从而启动一个任务。它会再次给我带来内存问题。 由于我是hive的新手,所以我评论了将dataframe转换为rdd的部分。此外,一旦将数据框转换为 rdd,您需要在其上运行 mapToPair() 并选择任何列作为键,整行作为值。根据我的理解,上面的代码不会出现内存问题,因为 spark 可以管道分区过程。请运行代码并确认。

以上是关于将 Dataframe 转换为 RDD 减少了分区的主要内容,如果未能解决你的问题,请参考以下文章

在 RDD 转换上保留 Spark DataFrame 列分区

将 RDD 转换为 DataFrame 并再次转换回来的开销是多少?

将 DataFrame 转换为 RDD 并将 RDD 动态拆分为与 DataFrame 相同数量的 Columns

将 RDD 转换为 Dataframe Spark

将 RDD 转换为 DataFrame Spark Streaming 时的 ClassCastException

为啥 list 应该先转换为 RDD 再转换为 Dataframe?有啥方法可以将列表转换为数据框?