在独立/主从 spark-shell 中读取 Parquet 时的不同行为

Posted

技术标签:

【中文标题】在独立/主从 spark-shell 中读取 Parquet 时的不同行为【英文标题】:Different behaviour when reading from Parquet in standalone/master-slave spark-shell 【发布时间】:2016-03-08 00:37:14 【问题描述】:

这是我用来从 Scala 中的 Parquet 读取数据帧的较大代码中的 sn-p。

case class COOMatrix(row: Seq[Long], col: Seq[Long], data: Seq[Double])

def buildMatrix(cooMatrixFields: DataFrame) = 

  val cooMatrices = cooMatrixFields map 
    case Row(r,c,d) => COOMatrix(r.asInstanceOf[Seq[Long]], c.asInstanceOf[Seq[Long]], d.asInstanceOf[Seq[Double]])
  

  val matEntries = cooMatrices.zipWithIndex.flatMap 
    case (cooMat, matIndex) =>
      val rowOffset = cooMat.row.distinct.size
      val colOffset = cooMat.col.distinct.size

      val cooMatRowShifted = cooMat.row.map(rowEntry => rowEntry + rowOffset * matIndex)
      val cooMatColShifted = cooMat.col.map(colEntry => colEntry + colOffset * matIndex)

      (cooMatRowShifted, cooMatColShifted, cooMat.data).zipped.map 
        case (i, j, value) => MatrixEntry(i, j, value)
      
  

  new CoordinateMatrix(matEntries)



val C_entries = sqlContext.read.load(s"$dataBaseDir/C.parquet")

val C = buildMatrix(C_entries)

我的代码在本地 spark 上下文中运行时成功执行。

独立集群上,同样的代码一旦到达强制它实际从 Parquet 读取的操作就会失败。 正确检索数据框的架构:

C_entries: org.apache.spark.sql.DataFrame = [C_row: array<bigint>, C_col: array<bigint>, C_data: array<double>]

但是执行器在执行val C = buildMatrix(C_entries)这一行时会崩溃,除了这个例外:

java.lang.ExceptionInInitializerError
    at $line39.$read$$iwC.<init>(<console>:7)
    at $line39.$read.<init>(<console>:61)
    at $line39.$read$.<init>(<console>:65)
    at $line39.$read$.<clinit>(<console>)
    at $line67.$read$$iwC.<init>(<console>:7)
    at $line67.$read.<init>(<console>:24)
    at $line67.$read$.<init>(<console>:28)
    at $line67.$read$.<clinit>(<console>)
    at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:63)
    at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:62)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
    at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
    at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at $line4.$read$$iwC$$iwC.<init>(<console>:15)
    at $line4.$read$$iwC.<init>(<console>:24)
    at $line4.$read.<init>(<console>:26)
    at $line4.$read$.<init>(<console>:30)
    at $line4.$read$.<clinit>(<console>)
    ... 22 more

不确定是否相关,但在增加日志详细程度的同时,我注意到了这个异常:

16/03/07 20:59:38 INFO GenerateUnsafeProjection: Code generated in 157.285464 ms
16/03/07 20:59:38 DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at http://155.198.193.158:32862
java.lang.ClassNotFoundException: Class file not found at URL http://155.198.193.158:32862/org/apache/spark/sql/catalyst/expressions/GeneratedClass%24SpecificUnsafeProjection.class

我为独立集群尝试了不同的配置:

主机、1 个从机和 spark-shell 在我的笔记本电脑上运行 master 和 1 个 slave 分别在不同的机器上运行,spark-shell 在我的笔记本电脑上 主机和 spark-shell 在一台机器上,1 个从机在另一台机器上

我从默认属性开始,然后演变为更复杂的属性文件,但没有取得更大的成功:

spark.driver.memory                4g
spark.rpc=netty
spark.eventLog.enabled             true
spark.eventLog.dir                 file:///mnt/fastmp/spark_workdir/logs
spark.driver.extraJavaOptions      -Xmx20480m -XX:MaxPermSize=2048m -XX:ReservedCodeCacheSize=2048m
spark.shuffle.service.enabled      true
spark.shuffle.consolidateFiles     true
spark.sql.parquet.binaryAsString   true
spark.speculation                  false
spark.rpc.timeout                 1000
spark.rdd.compress true
spark.core.connection.ack.wait.timeout 600
spark.driver.maxResultSize         0
spark.task.maxFailures             3
spark.shuffle.io.maxRetries        3

我正在运行 spark-1.6.0-bin-hadoop2.6 的预构建版本。 此部署中不涉及 HDFS,所有 Parquet 文件都存储在可供所有机器使用的共享挂载 (CephFS) 上。

我怀疑这与底层文件系统有关,因为我的代码的另一部分在本地和独立模式下都能正常读取不同的 Parquet 文件。

【问题讨论】:

因此,如果您在读取镶木地板文件val C_entries = sqlContext.read.load(s"$dataBaseDir/C.parquet") 后立即调用C_entries.count,它也会失败吗? 是的,我可以。我什至可以在坠毁的执行者恢复后做到这一点。我想我记得读过一些关于这个的东西,Spark 不需要读取整个 parquet 文件来获取数据帧中的行数,所以它的工作原理并不奇怪。 好吧 - 我不知道 Spark 可以省略执行操作,但这并不意味着它不是这样 ;) 我只是想弄清楚问题是否与阅读有关parquet 文件(可能是损坏的文件或您不希望出现的空值),或者问题出在实际代码上。因此,如果您认为count 可能具有误导性,那么请做一些琐碎的map,然后是reduceByKey,然后数数。它必须在调用reduceByKey 时评估整个parquet 文件。如果这个简单的例子有效,你就会知道它与 parquet 文件无关。 我真的看不出它是如何来自 Parquet 文件本身的,因为当代码在纯本地 spark-shell(未连接到主服务器)上运行时它读取良好。无论如何,我尝试确定,这运行良好C_entries.map (x =&gt; x).countByValue,这也运行良好C_entries.collect,但之后直接调用buildMatrix(C_entries) 会使执行程序崩溃。我的主从机的功能和/或配置中肯定有一些东西会弄乱 Parquet 文件的读取。 嗯...我只是尝试在一个小型 AWS 集群上运行代码(尽管在 YARN 上运行而不是独立运行)从 S3 读取镶木地板文件,它运行得很好...可以您缩小了 buildMatrix 的哪一部分导致崩溃? map 还是 flatmap 函数? 【参考方案1】:

TL;DR:将您的代码打包为 jar

出于记录目的,问题似乎与使用独立集群有关。

完全相同的代码适用于这些设置:

spark-shell 和 master 在同一台机器上 在 YARN(AWS EMR 集群)上运行并从 S3 读取 parquet 文件

通过对独立设置的日志进行更多挖掘,问题似乎与类服务器的此异常有关:

INFO GenerateUnsafeProjection: Code generated in 157.285464 ms
DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at http://155.198.193.158:32862
java.lang.ClassNotFoundException: Class file not found at URL http://155.198.193.158:32862/org/apache/spark/sql/catalyst/expressions/GeneratedClass%24SpecificUnsafeProjection.class

我的理解是spark-shell 启动了一个 HTTP 服务器 (jetty) 以便将它从 REPL 中的代码生成的类提供给工作人员。

在我的例子中,很多课程都成功提供了(我什至设法通过 telnet 检索了一些课程)。但是类服务器找不到类 GeneratedClass(及其所有内部类)。

日志中出现的典型错误信息是:

DEBUG Server: RESPONSE /org/apache/spark/sql/catalyst/expressions/GeneratedClass.class  404 handled=true

我的想法是它与 master 和 spark-shell 在同一台服务器上运行,因为它们在同一 JVM 中运行,因此即使 HTTP 传输失败也可以找到该类。

目前我发现的唯一成功的解决方案是构建一个jar包并使用spark-shell--jars选项或将其作为参数传递给spark-submit

【讨论】:

以上是关于在独立/主从 spark-shell 中读取 Parquet 时的不同行为的主要内容,如果未能解决你的问题,请参考以下文章

spark-shell读取parquet文件

如何使用 spark-avro 包从 spark-shell 读取 avro 文件?

spark-shell读取.log文件获取日志信息后,怎么进行分析?比如统计包含404的行数

Spark记录-Spark-Shell客户端操作读取Hive数据

使用 spark-csv 在 zeppelin 中读取 csv 文件

多节点火花集群上的 spark-shell 无法在远程工作节点上旋转执行程序