与 aws-java-sdk 链接时读取 json 文件时 Spark 崩溃

Posted

技术标签:

【中文标题】与 aws-java-sdk 链接时读取 json 文件时 Spark 崩溃【英文标题】:Spark crash while reading json file when linked with aws-java-sdk 【发布时间】:2016-02-01 15:10:06 【问题描述】:

config.json成为一个小json文件:


    "toto": 1

我做了一个简单的代码,用sc.textFile读取json文件(因为文件可以在S3,本地或HDFS,所以textFile很方便)

import org.apache.spark.SparkContext, SparkConf

object testAwsSdk 
  def main( args:Array[String] ):Unit = 
    val sparkConf = new SparkConf().setAppName("test-aws-sdk").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val json = sc.textFile("config.json") 
    println(json.collect().mkString("\n"))
  

SBT 文件仅拉取spark-core

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)

程序按预期运行,将 config.json 的内容写入标准输出。

现在我还想与亚马逊的 sdk 链接 aws-java-sdk 以访问 S3。

libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile",
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)

执行相同的代码,spark 会抛出以下异常。

Exception in thread "main" com.fasterxml.jackson.databind.JsonMappingException: Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope)
 at [Source: "id":"0","name":"textFile"; line: 1, column: 1]
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
    at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
    at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
    at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)
    at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:439)
    at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3666)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3558)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2578)
    at org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:82)
    at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:133)
    at org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:133)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:133)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
    at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:1012)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:827)
    at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:825)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
    at org.apache.spark.SparkContext.textFile(SparkContext.scala:825)
    at testAwsSdk$.main(testAwsSdk.scala:11)
    at testAwsSdk.main(testAwsSdk.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

读取堆栈,似乎在链接 aws-java-sdk 时,sc.textFile 检测到该文件是 json 文件并尝试使用 jackson 假设某种格式对其进行解析,当然它找不到。我需要链接 aws-java-sdk,所以我的问题是:

1- 为什么添加aws-java-sdk 会修改spark-core 的行为?

2- 是否有解决方法(文件可以在 HDFS、S3 或本地)?

【问题讨论】:

这是因为 aws-java-sdk 使用的是 jackson 库的最新版本 2.5.3,而 spark 使用的是旧版本 2.4.4。我面临同样的问题,但无法解决。如果您找到了解决方案,请分享。谢谢 嗨哈菲兹...很讨厌不是吗?我将案例发送到 AWS。他们确认这是一个兼容性问题。不过,他们还没有告诉我一个明确的解决方案。会尽快解决的。 嗨鲍里斯!是的,面对这个问题很烦人,但我已经通过从 spark-core 中排除杰克逊核心和杰克逊模块库并添加杰克逊核心最新库依赖项来解决它 @HafizMujadid 你是怎么做到的?你能解释一下吗?谢谢。 【参考方案1】:

与亚马逊支持人员交谈。这是杰克逊图书馆的一个依赖问题。在 SBT 中,覆盖 jackson:

libraryDependencies ++= Seq( 
"com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile",
"org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
) 

dependencyOverrides ++= Set( 
"com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4" 
) 

他们的回答: 我们已经在 Mac、Ec2 (redhat AMI) 实例和 EMR (Amazon Linux) 上完成了这项工作。 3 不同的环境。问题的根本原因是 sbt 构建了一个依赖关系图,然后通过逐出旧版本并选择依赖库的最新版本来处理版本冲突问题。在这种情况下,spark 依赖于 2.4 版本的 jackson 库,而 AWS SDK 需要 2.5。因此存在版本冲突,sbt 驱逐 spark 的依赖版本(较旧)并选择 AWS SDK 版本(最新)。

【讨论】:

【参考方案2】:

添加到Boris' answer,如果您不想使用Jackson 的固定版本(也许将来您会升级Spark)但仍想放弃AWS 的那个,您可以执行以下操作:

libraryDependencies ++= Seq( 
  "com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile" excludeAll (
    ExclusionRule("com.fasterxml.jackson.core", "jackson-databind")
  ),
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
) 

【讨论】:

以上是关于与 aws-java-sdk 链接时读取 json 文件时 Spark 崩溃的主要内容,如果未能解决你的问题,请参考以下文章

如何使用aws-java-sdk从S3读取块的文件块

Python 文件IO:JSON 文件的读取与写入

hadoop 2.7.7 的 AWS-Java-SDK 版本问题

从 IPFS 读取 JSON 不可能?

如何使用scala和aws-java-sdk从S3存储桶中获取所有S3ObjectSummary?

无法读取昂首阔步的JSON