Spark 中的 JavaRDD<String>.foreach 之后 Arraylist 为空

Posted

技术标签:

【中文标题】Spark 中的 JavaRDD<String>.foreach 之后 Arraylist 为空【英文标题】:Arraylist is empty after JavaRDD<String>.foreach in spark 【发布时间】:2017-04-24 15:46:19 【问题描述】:

示例 json(总共 100 条记录):

"name":"dev","salary":10000,"occupation":"engg","address":"noida" "name":"karthik","salary":20000,"occupation":"engg","address":"noida"

有用的代码:

   final List<Map<String,String>> jsonData = new ArrayList<>();

   DataFrame df =  sqlContext.read().json("file:///home/dev/data-json/emp.json");
   JavaRDD<String> rdd = df.repartition(1).toJSON().toJavaRDD(); 

   rdd.foreach(new VoidFunction<String>() 
       @Override
       public void call(String line)  
           try 
               jsonData.add (new ObjectMapper().readValue(line, Map.class));
               System.out.println(Thread.currentThread().getName());
               System.out.println("List size: "+jsonData.size());
            catch (IOException e) 
               e.printStackTrace();
           
       
   );

   System.out.println(Thread.currentThread().getName());
   System.out.println("List size: "+jsonData.size());

jsonData 最后是空的。

输出:

Executor task launch worker-1
List size: 1
Executor task launch worker-1
List size: 2
Executor task launch worker-1
List size: 3
.
.
.
Executor task launch worker-1
List size: 100

main
List size: 0

【问题讨论】:

由于列表一开始似乎是空的,可能是对象映射器无法解析它得到的行吗?你能提供一个minimal reproducible example吗? rdd 是什么? 也许System.out.println 是在 foreach 完成其任务(或什至开始)之前执行的? 那个final关键字与处理后列表为空无关。它只是告诉编译器不应再次初始化此变量/字段。如果您的列表为空,则因为 no 发生了添加调用。 使用调试器逐步完成。我敢打赌答案会一飞冲天。 【参考方案1】:

我已经测试过了,这很有效 https://github.com/freedev/spark-test

final ObjectMapper objectMapper = new ObjectMapper();

List<Map<String, Object>> list = rdd
        .map(new org.apache.spark.api.java.function.Function<String, Map<String, Object>>() 
            @Override
            public Map<String, Object> call(String line) throws Exception 
                TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>() 
                ;
                Map<String, Object> rs = objectMapper.readValue(line, typeRef);
                return rs;
            
        ).collect();

我更喜欢映射Map&lt;String, Object&gt;,因为这将处理Json 中值部分不是字符串(即"salary":20000)的情况。

【讨论】:

问题被标记为java-7。 Java 8 代码不太可能有帮助。 @freedev 感谢您的努力。我试过了,但即使在将implements Serializable 添加到我正在运行主方法的主类之后,也会得到Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 我刚刚在这里github.com/freedev/spark-test 尝试过这段代码,效果很好。 在匿名函数之外创建 typeRef 会产生问题。改了之后就可以了。感谢您的帮助。

以上是关于Spark 中的 JavaRDD<String>.foreach 之后 Arraylist 为空的主要内容,如果未能解决你的问题,请参考以下文章

Spark Java API:如何将 JavaRDD 转换为 RDD 类型

Spark学习之JavaRdd

Apache Spark MySQL JavaRDD.foreachPartition - 为啥我得到 ClassNotFoundException

在 SPARK 2.1 中传递包含 ArrayType 列的 javaRDD 时,createDataFrame() 抛出异常

spark记录spark算子之Transformation

Apache Spark 中的数据集