Apache Spark SQL 上下文删除重复项

Posted

技术标签:

【中文标题】Apache Spark SQL 上下文删除重复项【英文标题】:Apache Spark SQL context dropDuplicates 【发布时间】:2016-08-01 17:21:45 【问题描述】:

我正在尝试使用 Spark 的 1.5 方法 dropDuplicates() 过滤 DataFrame 内容。 将它与完全填充数据的表(我的意思是没有空单元格)一起使用会得到正确的结果,但是当我的 CSV 源包含空单元格时(我会为您提供源文件) - Spark 抛出 ArrayIndexOutOfBoundsException。 我究竟做错了什么?我已经阅读了版本 1.6.2 的 Spark SQL 和 DataFrames 教程,它没有详细描述 DataFrame 操作。我也在阅读“Learning Spark。Lightning-Fast Big Data Analysis。”一书,但它是为 Spark 1.5 编写的,我需要的操作没有在那里描述。我很高兴得到手册链接的解释。 谢谢。

    package data;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;

public class TestDrop 
    public static void main(String[] args) 
        DropData dropData = new DropData("src/main/resources/distinct-test.csv");
        dropData.execute();
    


class DropData

    private String csvPath;
    private JavaSparkContext sparkContext;
    private SQLContext sqlContext;

    DropData(String csvPath) 
        this.csvPath = csvPath;
    

    void execute()
        initContext();
        DataFrame dataFrame = loadDataFrame();
        dataFrame.show();
        dataFrame.dropDuplicates(new String[]"surname").show();
        //this one fails too: dataFrame.drop("surname")
    

    private void initContext() 
        sparkContext = new JavaSparkContext(new SparkConf().setMaster("local[4]").setAppName("Drop test"));
        sqlContext = new SQLContext(sparkContext);
    

    private DataFrame loadDataFrame() 
        JavaRDD<String> strings = sparkContext.textFile(csvPath);

        JavaRDD<Row> rows = strings.map(string -> 
            String[] cols = string.split(",");
            return RowFactory.create(cols);
        );

        StructType st = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
                DataTypes.createStructField("surname", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.StringType, true),
                DataTypes.createStructField("sex", DataTypes.StringType, true),
                DataTypes.createStructField("socialId", DataTypes.StringType, true)));

        return sqlContext.createDataFrame(rows, st);
    

【问题讨论】:

那么您还有什么期待?您声明了一些字段,如果它不匹配您会得到异常。这是预期的行为。只需过滤掉格式错误的数据。 你是什么意思它没有?我有“姓”列。根据本专栏,我希望 Spark 过滤重复的行,如 JavaDoc 中所写。顺便说一句,这是我的csv file 【参考方案1】:

发送 List 而不是 Object[] 结果作为创建行,其中包含 1 列,其中包含一个列表。那是我做错了。

【讨论】:

以上是关于Apache Spark SQL 上下文删除重复项的主要内容,如果未能解决你的问题,请参考以下文章

PySpark DataFrame 无法删除重复项

窗口函数 partitionBy 在列表上

Apache Spark 如何检测重复项?可以修改吗?

为啥我的“org.apache.spark.sql”下面没有“SparkSession”[重复]

Spark - 在json数组字段中删除重复项[重复]

Spark SQL cassandra 删除记录