在apache spark中使用distinct时出现***错误

Posted

技术标签:

【中文标题】在apache spark中使用distinct时出现***错误【英文标题】:***error while using distinct in apache spark 【发布时间】:2017-05-12 10:14:43 【问题描述】:

我使用 Spark 2.0.1

我正在尝试在 JavaRDD 中找到不同的值,如下所示

JavaRDD<String> distinct_installedApp_Ids = filteredInstalledApp_Ids.distinct();

我看到这条线抛出了下面的异常

Exception in thread "main" java.lang.***Error
    at org.apache.spark.rdd.RDD.checkpointRDD(RDD.scala:226)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
   ..........

相同的堆栈跟踪一次又一次地重复。 输入filteredInstalledApp_Ids 具有数百万条记录的大量输入。问题是记录数还是在JavaRDD 中找到不同值的有效方法。任何帮助将非常感激。提前致谢。干杯。

编辑 1:

添加过滤方法

JavaRDD<String> filteredInstalledApp_Ids = installedApp_Ids
        .filter(new Function<String, Boolean>() 
        @Override
          public Boolean call(String v1) throws Exception 
                return v1 != null;
            
          ).cache();

编辑 2:

添加了用于生成installedApp_Ids的方法

 public JavaRDD<String> getIdsWithInstalledApps(String inputPath, JavaSparkContext sc,
        JavaRDD<String> installedApp_Ids) 

    JavaRDD<String> appIdsRDD = sc.textFile(inputPath);
    try 
        JavaRDD<String> appIdsRDD1 = appIdsRDD.map(new Function<String, String>() 
            @Override
            public String call(String t) throws Exception 
                String delimiter = "\t";
                String[] id_Type = t.split(delimiter);
                StringBuilder temp = new StringBuilder(id_Type[1]);
                if ((temp.indexOf("\"")) != -1) 
                    String escaped = temp.toString().replace("\\", "");
                    escaped = escaped.replace("\"", "");
                    escaped = escaped.replace("\"", "");
                    temp = new StringBuilder(escaped);
                
                // To remove empty character in the beginning of a
                // string
                JSONObject wholeventObj = new JSONObject(temp.toString());
                JSONObject eventJsonObj = wholeventObj.getJSONObject("eventData");
                int appType = eventJsonObj.getInt("appType");
                if (appType == 1) 
                    try                            
                        return (String.valueOf(appType));
                     catch (JSONException e) 
                        return null;
                    
                
                return null;
            
        ).cache();
        if (installedApp_Ids != null)
            return sc.union(installedApp_Ids, appIdsRDD1);
        else
            return appIdsRDD1;
     catch (Exception e) 
        e.printStackTrace();
    
    return null;

【问题讨论】:

你确定它是distinct,因为它是一个转换?你如何创建filteredInstalledApp_Ids?你执行什么操作? @JacekLaskowski 我添加了我用来创建filteredInstalledApp_Ids的过滤方法。基本上我正在尝试从原始 RDD 中过滤空值 为什么是 RDD?为什么选择 Java? Spark 版本是什么? 我对 java 很满意。所以我正在使用java。火花版本 2.0.1。为什么是 RDD?如果有任何其他规定,我会很乐意使用。感谢你的及时回复。干杯 你能展示一个inputPath的内容样本吗?我猜不出appType 的样子以及它所在的 JSON 的样子。 【参考方案1】:

我假设主数据集位于inputPath看起来它是一个逗号分隔的文件,具有 JSON 编码的值。

我认为您可以通过结合 Spark SQL 的 DataFrames 和 from_json 函数来简化您的代码。我正在使用 Scala 并将代码转换为 Java 作为家庭练习:)

您加载inputPath 文本文件的行和解析本身的行可以很简单,如下所示:

import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
val dataset = spark.read.csv(inputPath)

您可以使用show 运算符显示内容。

dataset.show(truncate = false)

您应该会看到 JSON 编码的行。

似乎JSON 行包含eventDataappType 字段。

val jsons = dataset.withColumn("asJson", from_json(...))

请参阅functions 对象以供参考。

使用 JSON 行,您可以选择您感兴趣的字段:

val apptypes = jsons.select("eventData.appType")

然后unioninstalledApp_Ids

我确信代码变得更容易阅读(并且希望也能编写)。迁移将为您提供额外的优化,您可能无法使用类似汇编程序的 RDD API 编写自己的代码。

最好的是过滤空值就像使用na 运算符一样简单,它给出DataFrameNaFunctions 就像drop。我相信你会喜欢的。


它不一定回答您最初的问题,但这个java.lang.***Error 可能只是通过进行代码迁移就可以解决,并且代码也变得更容易维护。

【讨论】:

让我试试 scala 看看问题是否得到解决。谢谢

以上是关于在apache spark中使用distinct时出现***错误的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spark 将函数中的多个列名传递给 dplyr::distinct()

Spark 中 Distinct() 函数是如何工作的?

Spark SQL .distinct()性能

有没有办法重写 Spark RDD distinct 以使用 mapPartitions 而不是 distinct?

在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错

Spark distinct算子