Spark 中的 JavaRDD<String>.foreach 之后 Arraylist 为空
Posted
技术标签:
【中文标题】Spark 中的 JavaRDD<String>.foreach 之后 Arraylist 为空【英文标题】:Arraylist is empty after JavaRDD<String>.foreach in spark 【发布时间】:2017-04-24 15:46:19 【问题描述】:示例 json(总共 100 条记录):
"name":"dev","salary":10000,"occupation":"engg","address":"noida" "name":"karthik","salary":20000,"occupation":"engg","address":"noida"
有用的代码:
final List<Map<String,String>> jsonData = new ArrayList<>();
DataFrame df = sqlContext.read().json("file:///home/dev/data-json/emp.json");
JavaRDD<String> rdd = df.repartition(1).toJSON().toJavaRDD();
rdd.foreach(new VoidFunction<String>()
@Override
public void call(String line)
try
jsonData.add (new ObjectMapper().readValue(line, Map.class));
System.out.println(Thread.currentThread().getName());
System.out.println("List size: "+jsonData.size());
catch (IOException e)
e.printStackTrace();
);
System.out.println(Thread.currentThread().getName());
System.out.println("List size: "+jsonData.size());
jsonData
最后是空的。
输出:
Executor task launch worker-1
List size: 1
Executor task launch worker-1
List size: 2
Executor task launch worker-1
List size: 3
.
.
.
Executor task launch worker-1
List size: 100
main
List size: 0
【问题讨论】:
由于列表一开始似乎是空的,可能是对象映射器无法解析它得到的行吗?你能提供一个minimal reproducible example吗?rdd
是什么?
也许System.out.println
是在 foreach 完成其任务(或什至开始)之前执行的?
那个final关键字与处理后列表为空无关。它只是告诉编译器不应再次初始化此变量/字段。如果您的列表为空,则因为 no 发生了添加调用。
使用调试器逐步完成。我敢打赌答案会一飞冲天。
【参考方案1】:
我已经测试过了,这很有效 https://github.com/freedev/spark-test
final ObjectMapper objectMapper = new ObjectMapper();
List<Map<String, Object>> list = rdd
.map(new org.apache.spark.api.java.function.Function<String, Map<String, Object>>()
@Override
public Map<String, Object> call(String line) throws Exception
TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>()
;
Map<String, Object> rs = objectMapper.readValue(line, typeRef);
return rs;
).collect();
我更喜欢映射Map<String, Object>
,因为这将处理Json 中值部分不是字符串(即"salary":20000
)的情况。
【讨论】:
问题被标记为java-7
。 Java 8 代码不太可能有帮助。
@freedev 感谢您的努力。我试过了,但即使在将implements Serializable
添加到我正在运行主方法的主类之后,也会得到Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
我刚刚在这里github.com/freedev/spark-test 尝试过这段代码,效果很好。
在匿名函数之外创建 typeRef
会产生问题。改了之后就可以了。感谢您的帮助。以上是关于Spark 中的 JavaRDD<String>.foreach 之后 Arraylist 为空的主要内容,如果未能解决你的问题,请参考以下文章
Spark Java API:如何将 JavaRDD 转换为 RDD 类型
Apache Spark MySQL JavaRDD.foreachPartition - 为啥我得到 ClassNotFoundException
在 SPARK 2.1 中传递包含 ArrayType 列的 javaRDD 时,createDataFrame() 抛出异常