从avro架构生成的类的Spark问题
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从avro架构生成的类的Spark问题相关的知识,希望对你有一定的参考价值。
我有一段用spark编写的代码,它将HDFS中的数据加载到avro idl生成的java类中。在以这种方式创建的RDD上,我正在执行简单的操作,结果取决于我是否在它之前缓存RDD,即如果我在下面运行代码
val loadedData = loadFromHDFS[Data](path,...)
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 200000
程序将打印200000,另一方面执行下一个代码
val loadedData = loadFromHDFS[Data](path,...).cache()
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) // 1
导致1打印到标准输出。
看起来,当我在读取缓存数据后检查字段的值时
我很确定所述问题的根本原因是avro idl生成的类的序列化问题,但我不知道如何解决它。我尝试使用Kryo,注册生成的类(Data),从chill_avro注册给定类(SpecificRecordSerializer,SpecificRecordBinarySerializer等)的不同序列化程序,但这些想法都没有帮助我。
我怎么能解决这个问题?
Link以最小,完整和可验证的例子。
答案
尝试下面的代码 -
val loadedData = loadFromHDFS[Data](path,...)
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()).cache()
以上是关于从avro架构生成的类的Spark问题的主要内容,如果未能解决你的问题,请参考以下文章
databricks avro 架构无法转换为 Spark SQL 结构类型