从 Hive 元存储(parquet 格式)在 Spark 中本地处理 10 亿条记录需要 6 个小时。如何加快速度?

Posted

技术标签:

【中文标题】从 Hive 元存储(parquet 格式)在 Spark 中本地处理 10 亿条记录需要 6 个小时。如何加快速度?【英文标题】:Processing 1 billion records locally in Spark from Hive metastore(parquet format) takes forever 6 hours. How to speed it up? 【发布时间】:2020-02-21 21:41:25 【问题描述】:

我在 IntelliJ 中运行一切。该应用程序在 Scala 中。

scalaVersion := "2.12.10" sparkVersion = "2.4.4"

所以我正在处理纽约市出租车数据。近 11 亿条记录,120GB 数据。我读取它,删除不必要的数据,清理它,然后以镶木地板格式写入分区(日、月、年)中的配置单元元存储,并使用默认压缩。到目前为止一切都很好,它在几分钟内(2-3)就可以很快地工作。 现在我再次从元存储中读取数据并执行一些操作,基本上,我想计算来自 manhattanToJFK 的所有游乐设施,因此最终需要几个 UDF 和一个计数。 这需要很长时间。

然后我有一个

总共需要 6 个小时才能运行(我是 4-5 个小时,我搞砸了一些东西,不得不重新启动)。我等不及要检查它是否准确。

根据我的阅读和理解,镶木地板的工作速度应该比阅读和使用 CSV 快得多,但我发现它正好相反。所以我认为我做错了什么。也许一些配置或设置等? 我是一个有火花的初学者,我自己学习。所以,如果我犯了一个菜鸟的错误,请多多包涵。任何帮助都会有很大帮助。

如果我应该发布任何更新或信息,请告诉我。我可以编辑它并发布它。

谢谢

def analysis() = 
    var parquetDF = SparkObject.spark.read.parquet("spark-warehouse/location")
//      .cache()
    val manhattanTojfkDF = countManhattanToJKF(parquetDF)
    findCorrelation(manhattanTojfkDF)
  

  def countManhattanToJKF(df:DataFrame):DataFrame = 
    var parquetDF = df
    //  val geojson = scala.io.Source.fromURL(this.getClass.getResource("/nyc-borough-boundaries-polygon.geojson")).mkString
    val geojson = scala.io.Source.fromURL(this.getClass.getResource("/NYC Taxi Zones.geojson")).mkString
    val features = geojson.parseJson.convertTo[FeatureCollection]
    val broadcastFeatures = SparkObject.spark.sparkContext.broadcast(features)
    val lonlatToZoneID = (longitude: Double, latitude: Double) => 
      val feature: Option[Feature] = broadcastFeatures.value.find(f => 
        f.geometry.contains(new Point(longitude, latitude))
      )
      feature.map(f => 
        f("location_id").convertTo[String]
      ).getOrElse("NA")
    
    val latlonToZoneIDUDF = udf(lonlatToZoneID)

    parquetDF = parquetDF.withColumn("pickupZoneID", when(parquetDF("pickupZoneID") === "NA",
      latlonToZoneIDUDF(parquetDF("pickup_longitude"), parquetDF("pickup_latitude")))
      .otherwise(parquetDF("pickup_longitude")))

    parquetDF = parquetDF.withColumn("dropoffZoneID", when(parquetDF("dropoffZoneID") === "NA",
      latlonToZoneIDUDF(parquetDF("dropoff_longitude"), parquetDF("dropoff_latitude")))
      .otherwise(parquetDF("dropoff_longitude")))


    val boroughLookupID = (pickupID:String) => 
      val feature: Option[Feature] = broadcastFeatures.value.find(f => 
        f.properties("location_id").convertTo[String] == pickupID
      )
      feature.map(f => 
        f("borough").convertTo[String]
      ).getOrElse("NA")
    

    val boroughUDF = udf(boroughLookupID)
    parquetDF = parquetDF.withColumn("pickupBorough", boroughUDF(parquetDF("pickupZoneID")))
    parquetDF = parquetDF.withColumn("dropoffBorough", boroughUDF(parquetDF("dropoffZoneID")))

    val manhattanToJFK = (borough:String, dropOffID:String) => 
      (borough == "Manhattan" && dropOffID == "132")
    

    val manhattanToJFKUDF = udf(manhattanToJFK)
    parquetDF = parquetDF.withColumn("manhattanToJFK",
      manhattanToJFKUDF(parquetDF("pickupBorough"), parquetDF("dropoffZoneID")))

    val filteredDF =  parquetDF.filter(parquetDF("ManhattanToJFK") === true)
    val totalRidesFromManhattanTOJFK = filteredDF.count()
    println(totalRidesFromManhattanTOJFK)
    print(parquetDF.show())
    filteredDF
  

  def findCorrelation(filteredDF:DataFrame) = 
    var weatherDF = SparkObject.spark.read.format("csv")
      .option("header", true)
      .load(URLs.weatherData:_*)

    weatherDF = weatherDF.select(weatherDF("DATE").cast("date"), weatherDF("AWND").cast("float"),
      weatherDF("SNOW").cast("float"), weatherDF("SNWD").cast("float"), weatherDF("TMIN").cast("float"),
      weatherDF("TMAX").cast("float"), weatherDF("PRCP").cast("float"))

     val joinedDF = weatherDF.join(filteredDF, weatherDF("DATE") === filteredDF("pickupDate"))
      .select(weatherDF("DATE"), weatherDF("AWND"), weatherDF("SNOW"), weatherDF("SNWD"), weatherDF("TMIN"),
        weatherDF("TMAX"), weatherDF("PRCP"))
    //    .cache()

    val ridesPerDay = joinedDF.groupBy("DATE").agg(count("DATE").alias("rides_per_day"))
    val cleanedDF =  ridesPerDay.join(joinedDF, "DATE").dropDuplicates().drop("DATE")
    cleanedDF.printSchema()

    val assembler = new VectorAssembler()
      .setInputCols(cleanedDF.columns)
      .setOutputCol("features")

    val corrFeatures = assembler.transform(cleanedDF)

    val Row(coeff1: Matrix) = Correlation.corr(corrFeatures, "features").head
    println(s"Pearson correlation matrix:\n $coeff1")

    val Row(coeff2: Matrix) = Correlation.corr(corrFeatures, "features", "spearman").head
    println(s"Spearman correlation matrix:\n $coeff2")
  

SparkSession 看起来像

lazy val spark = 
    SparkSession
      .builder()
      .master("local[*]")
      .appName("NYCTaxiDataKlarna")
      .getOrCreate()
  

我将 -Xms4g -Xmx4g 作为 VM 选项传递,因此每个内存为 4.1 4.1 GB。

编辑:所以我现在只是运行 manhatantojfk 函数,最后稍作改动,基本上将数据持久化到 hive 中。下次我可以从那里开始。它现在已经运行了将近 5 个小时并且没有完成。

val dw = new DataWriter()
      dw.writeToHive(parquetDF, "parquet", "location_with_borough", "pickupDate")
      print(parquetDF.count())

      val filteredDF =  parquetDF.filter(parquetDF("ManhattanToJFK") === true)
      dw.writeToHive(parquetDF, "parquet", "manhattan_to_jfk", "pickupDate")
//      val totalRidesFromManhattanTOJFK = filteredDF.count()
//      println(totalRidesFromManhattanTOJFK)
//      print(parquetDF.show())
//      filteredDF

【问题讨论】:

您需要分享一些代码来提出任何改进建议或指出任何错误。 丢掉你的 2 个 udf,并使用内置的 spark 函数来实现它们的逻辑。 【参考方案1】:

您可能不受 IO 限制,因此您阅读的格式无关紧要。你真正需要做的是找到查询计划(也就是 spark 正在做的实际工作)并破译它。如果没有查询计划,您无法判断问题是初始读取还是连接或组或关联本身。 spark-history-server 的 sql 选项卡首先要查看。此外,通常最好在执行 ML(又名 cleanDF)之前将预处理的数据转储到存储中,这样您就不会不断地重新运行预处理。

“我在 IntelliJ 中运行一切。”

spark 也是如何配置的?您是否使用 master = local[*] 运行?

编辑:所以我建议启动 spark-shell 并进行一些 REPL 风格的代码探索。如果您在本地计算机上安装了 spark,请使用“spark-shell”从命令行启动它。当它启动时,您会看到它打印出 spark-history-server url。从这里运行您的代码,然后打开 spark-history 服务器以找出 spark 花费时间的原因。

IMO 您的代码看起来比实际需要的复杂。您可能无意中将自己的脚射中,直到您潜得更深一些,您才会知道这一点。

➜  ~ spark-shell 
20/02/23 04:06:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.43.210:4040
Spark context available as 'sc' (master = local[*], app id = local-1582459622890).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.2
      /_/

Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

//enter code

【讨论】:

好的,我明白了。但我需要两个 udf 从 lon lat 中找到 ZoneID,然后从 ZoneIDs 中找到 borough。所以这两个必须在全长的数据上运行。加入/关联稍后出现。我可以使用 manhattantoJFK 保存数据,然后进行连接和关联。但它和以前一样。 Spark 只是在 IntelliJ 中。是的,master = local[*] 我在发布之前设置了 -Xms4g -Xmx4g。 当 spark 在 intellij 中启动时,您将看到 spark 历史服务器的 URL。去看看吧。 也在你的最后一个数据帧上做 > df.exaplin(true) 这也会打印出查询计划和 spark 计划做什么。

以上是关于从 Hive 元存储(parquet 格式)在 Spark 中本地处理 10 亿条记录需要 6 个小时。如何加快速度?的主要内容,如果未能解决你的问题,请参考以下文章

hive最早在哪个版本起支持Parquet格式

使用 hive 生成​​ Parquet 文件

Sqooping 数据后 Hive 抛出错误

hive 存储格式对比

关于hive的存储格式

Hive中Parquet格式的使用