在apache spark中使用distinct时出现***错误
Posted
技术标签:
【中文标题】在apache spark中使用distinct时出现***错误【英文标题】:***error while using distinct in apache spark 【发布时间】:2017-05-12 10:14:43 【问题描述】:我使用 Spark 2.0.1。
我正在尝试在 JavaRDD 中找到不同的值,如下所示
JavaRDD<String> distinct_installedApp_Ids = filteredInstalledApp_Ids.distinct();
我看到这条线抛出了下面的异常
Exception in thread "main" java.lang.***Error
at org.apache.spark.rdd.RDD.checkpointRDD(RDD.scala:226)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
..........
相同的堆栈跟踪一次又一次地重复。 输入filteredInstalledApp_Ids 具有数百万条记录的大量输入。问题是记录数还是在JavaRDD 中找到不同值的有效方法。任何帮助将非常感激。提前致谢。干杯。
编辑 1:
添加过滤方法
JavaRDD<String> filteredInstalledApp_Ids = installedApp_Ids
.filter(new Function<String, Boolean>()
@Override
public Boolean call(String v1) throws Exception
return v1 != null;
).cache();
编辑 2:
添加了用于生成installedApp_Ids的方法
public JavaRDD<String> getIdsWithInstalledApps(String inputPath, JavaSparkContext sc,
JavaRDD<String> installedApp_Ids)
JavaRDD<String> appIdsRDD = sc.textFile(inputPath);
try
JavaRDD<String> appIdsRDD1 = appIdsRDD.map(new Function<String, String>()
@Override
public String call(String t) throws Exception
String delimiter = "\t";
String[] id_Type = t.split(delimiter);
StringBuilder temp = new StringBuilder(id_Type[1]);
if ((temp.indexOf("\"")) != -1)
String escaped = temp.toString().replace("\\", "");
escaped = escaped.replace("\"", "");
escaped = escaped.replace("\"", "");
temp = new StringBuilder(escaped);
// To remove empty character in the beginning of a
// string
JSONObject wholeventObj = new JSONObject(temp.toString());
JSONObject eventJsonObj = wholeventObj.getJSONObject("eventData");
int appType = eventJsonObj.getInt("appType");
if (appType == 1)
try
return (String.valueOf(appType));
catch (JSONException e)
return null;
return null;
).cache();
if (installedApp_Ids != null)
return sc.union(installedApp_Ids, appIdsRDD1);
else
return appIdsRDD1;
catch (Exception e)
e.printStackTrace();
return null;
【问题讨论】:
你确定它是distinct
,因为它是一个转换?你如何创建filteredInstalledApp_Ids
?你执行什么操作?
@JacekLaskowski 我添加了我用来创建filteredInstalledApp_Ids的过滤方法。基本上我正在尝试从原始 RDD 中过滤空值
为什么是 RDD?为什么选择 Java? Spark 版本是什么?
我对 java 很满意。所以我正在使用java。火花版本 2.0.1。为什么是 RDD?如果有任何其他规定,我会很乐意使用。感谢你的及时回复。干杯
你能展示一个inputPath
的内容样本吗?我猜不出appType
的样子以及它所在的 JSON 的样子。
【参考方案1】:
我假设主数据集位于inputPath
。 看起来它是一个逗号分隔的文件,具有 JSON 编码的值。
我认为您可以通过结合 Spark SQL 的 DataFrames 和 from_json
函数来简化您的代码。我正在使用 Scala 并将代码转换为 Java 作为家庭练习:)
您加载inputPath
文本文件的行和解析本身的行可以很简单,如下所示:
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
val dataset = spark.read.csv(inputPath)
您可以使用show
运算符显示内容。
dataset.show(truncate = false)
您应该会看到 JSON 编码的行。
似乎JSON 行包含eventData
和appType
字段。
val jsons = dataset.withColumn("asJson", from_json(...))
请参阅functions 对象以供参考。
使用 JSON 行,您可以选择您感兴趣的字段:
val apptypes = jsons.select("eventData.appType")
然后union
和installedApp_Ids
。
我确信代码变得更容易阅读(并且希望也能编写)。迁移将为您提供额外的优化,您可能无法使用类似汇编程序的 RDD API 编写自己的代码。
最好的是过滤空值就像使用na
运算符一样简单,它给出DataFrameNaFunctions 就像drop
。我相信你会喜欢的。
它不一定回答您最初的问题,但这个java.lang.***Error
可能只是通过进行代码迁移就可以解决,并且代码也变得更容易维护。
【讨论】:
让我试试 scala 看看问题是否得到解决。谢谢以上是关于在apache spark中使用distinct时出现***错误的主要内容,如果未能解决你的问题,请参考以下文章
使用 Spark 将函数中的多个列名传递给 dplyr::distinct()
有没有办法重写 Spark RDD distinct 以使用 mapPartitions 而不是 distinct?
在 spark java api( org.apache.spark.SparkException ) 中使用 filter(),map(),... 时出错