从 Hive 元存储(parquet 格式)在 Spark 中本地处理 10 亿条记录需要 6 个小时。如何加快速度?
Posted
技术标签:
【中文标题】从 Hive 元存储(parquet 格式)在 Spark 中本地处理 10 亿条记录需要 6 个小时。如何加快速度?【英文标题】:Processing 1 billion records locally in Spark from Hive metastore(parquet format) takes forever 6 hours. How to speed it up? 【发布时间】:2020-02-21 21:41:25 【问题描述】:我在 IntelliJ 中运行一切。该应用程序在 Scala 中。
scalaVersion := "2.12.10" sparkVersion = "2.4.4"
所以我正在处理纽约市出租车数据。近 11 亿条记录,120GB 数据。我读取它,删除不必要的数据,清理它,然后以镶木地板格式写入分区(日、月、年)中的配置单元元存储,并使用默认压缩。到目前为止一切都很好,它在几分钟内(2-3)就可以很快地工作。 现在我再次从元存储中读取数据并执行一些操作,基本上,我想计算来自 manhattanToJFK 的所有游乐设施,因此最终需要几个 UDF 和一个计数。 这需要很长时间。
然后我有一个
总共需要 6 个小时才能运行(我是 4-5 个小时,我搞砸了一些东西,不得不重新启动)。我等不及要检查它是否准确。
根据我的阅读和理解,镶木地板的工作速度应该比阅读和使用 CSV 快得多,但我发现它正好相反。所以我认为我做错了什么。也许一些配置或设置等? 我是一个有火花的初学者,我自己学习。所以,如果我犯了一个菜鸟的错误,请多多包涵。任何帮助都会有很大帮助。
如果我应该发布任何更新或信息,请告诉我。我可以编辑它并发布它。
谢谢
def analysis() =
var parquetDF = SparkObject.spark.read.parquet("spark-warehouse/location")
// .cache()
val manhattanTojfkDF = countManhattanToJKF(parquetDF)
findCorrelation(manhattanTojfkDF)
def countManhattanToJKF(df:DataFrame):DataFrame =
var parquetDF = df
// val geojson = scala.io.Source.fromURL(this.getClass.getResource("/nyc-borough-boundaries-polygon.geojson")).mkString
val geojson = scala.io.Source.fromURL(this.getClass.getResource("/NYC Taxi Zones.geojson")).mkString
val features = geojson.parseJson.convertTo[FeatureCollection]
val broadcastFeatures = SparkObject.spark.sparkContext.broadcast(features)
val lonlatToZoneID = (longitude: Double, latitude: Double) =>
val feature: Option[Feature] = broadcastFeatures.value.find(f =>
f.geometry.contains(new Point(longitude, latitude))
)
feature.map(f =>
f("location_id").convertTo[String]
).getOrElse("NA")
val latlonToZoneIDUDF = udf(lonlatToZoneID)
parquetDF = parquetDF.withColumn("pickupZoneID", when(parquetDF("pickupZoneID") === "NA",
latlonToZoneIDUDF(parquetDF("pickup_longitude"), parquetDF("pickup_latitude")))
.otherwise(parquetDF("pickup_longitude")))
parquetDF = parquetDF.withColumn("dropoffZoneID", when(parquetDF("dropoffZoneID") === "NA",
latlonToZoneIDUDF(parquetDF("dropoff_longitude"), parquetDF("dropoff_latitude")))
.otherwise(parquetDF("dropoff_longitude")))
val boroughLookupID = (pickupID:String) =>
val feature: Option[Feature] = broadcastFeatures.value.find(f =>
f.properties("location_id").convertTo[String] == pickupID
)
feature.map(f =>
f("borough").convertTo[String]
).getOrElse("NA")
val boroughUDF = udf(boroughLookupID)
parquetDF = parquetDF.withColumn("pickupBorough", boroughUDF(parquetDF("pickupZoneID")))
parquetDF = parquetDF.withColumn("dropoffBorough", boroughUDF(parquetDF("dropoffZoneID")))
val manhattanToJFK = (borough:String, dropOffID:String) =>
(borough == "Manhattan" && dropOffID == "132")
val manhattanToJFKUDF = udf(manhattanToJFK)
parquetDF = parquetDF.withColumn("manhattanToJFK",
manhattanToJFKUDF(parquetDF("pickupBorough"), parquetDF("dropoffZoneID")))
val filteredDF = parquetDF.filter(parquetDF("ManhattanToJFK") === true)
val totalRidesFromManhattanTOJFK = filteredDF.count()
println(totalRidesFromManhattanTOJFK)
print(parquetDF.show())
filteredDF
def findCorrelation(filteredDF:DataFrame) =
var weatherDF = SparkObject.spark.read.format("csv")
.option("header", true)
.load(URLs.weatherData:_*)
weatherDF = weatherDF.select(weatherDF("DATE").cast("date"), weatherDF("AWND").cast("float"),
weatherDF("SNOW").cast("float"), weatherDF("SNWD").cast("float"), weatherDF("TMIN").cast("float"),
weatherDF("TMAX").cast("float"), weatherDF("PRCP").cast("float"))
val joinedDF = weatherDF.join(filteredDF, weatherDF("DATE") === filteredDF("pickupDate"))
.select(weatherDF("DATE"), weatherDF("AWND"), weatherDF("SNOW"), weatherDF("SNWD"), weatherDF("TMIN"),
weatherDF("TMAX"), weatherDF("PRCP"))
// .cache()
val ridesPerDay = joinedDF.groupBy("DATE").agg(count("DATE").alias("rides_per_day"))
val cleanedDF = ridesPerDay.join(joinedDF, "DATE").dropDuplicates().drop("DATE")
cleanedDF.printSchema()
val assembler = new VectorAssembler()
.setInputCols(cleanedDF.columns)
.setOutputCol("features")
val corrFeatures = assembler.transform(cleanedDF)
val Row(coeff1: Matrix) = Correlation.corr(corrFeatures, "features").head
println(s"Pearson correlation matrix:\n $coeff1")
val Row(coeff2: Matrix) = Correlation.corr(corrFeatures, "features", "spearman").head
println(s"Spearman correlation matrix:\n $coeff2")
SparkSession 看起来像
lazy val spark =
SparkSession
.builder()
.master("local[*]")
.appName("NYCTaxiDataKlarna")
.getOrCreate()
我将 -Xms4g -Xmx4g 作为 VM 选项传递,因此每个内存为 4.1 4.1 GB。
编辑:所以我现在只是运行 manhatantojfk 函数,最后稍作改动,基本上将数据持久化到 hive 中。下次我可以从那里开始。它现在已经运行了将近 5 个小时并且没有完成。
val dw = new DataWriter()
dw.writeToHive(parquetDF, "parquet", "location_with_borough", "pickupDate")
print(parquetDF.count())
val filteredDF = parquetDF.filter(parquetDF("ManhattanToJFK") === true)
dw.writeToHive(parquetDF, "parquet", "manhattan_to_jfk", "pickupDate")
// val totalRidesFromManhattanTOJFK = filteredDF.count()
// println(totalRidesFromManhattanTOJFK)
// print(parquetDF.show())
// filteredDF
【问题讨论】:
您需要分享一些代码来提出任何改进建议或指出任何错误。 丢掉你的 2 个 udf,并使用内置的 spark 函数来实现它们的逻辑。 【参考方案1】:您可能不受 IO 限制,因此您阅读的格式无关紧要。你真正需要做的是找到查询计划(也就是 spark 正在做的实际工作)并破译它。如果没有查询计划,您无法判断问题是初始读取还是连接或组或关联本身。 spark-history-server 的 sql 选项卡首先要查看。此外,通常最好在执行 ML(又名 cleanDF)之前将预处理的数据转储到存储中,这样您就不会不断地重新运行预处理。
“我在 IntelliJ 中运行一切。”
spark 也是如何配置的?您是否使用 master = local[*] 运行?
编辑:所以我建议启动 spark-shell 并进行一些 REPL 风格的代码探索。如果您在本地计算机上安装了 spark,请使用“spark-shell”从命令行启动它。当它启动时,您会看到它打印出 spark-history-server url。从这里运行您的代码,然后打开 spark-history 服务器以找出 spark 花费时间的原因。
IMO 您的代码看起来比实际需要的复杂。您可能无意中将自己的脚射中,直到您潜得更深一些,您才会知道这一点。
➜ ~ spark-shell
20/02/23 04:06:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.43.210:4040
Spark context available as 'sc' (master = local[*], app id = local-1582459622890).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.2
/_/
Using Scala version 2.12.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231)
Type in expressions to have them evaluated.
Type :help for more information.
scala> :paste
// Entering paste mode (ctrl-D to finish)
//enter code
【讨论】:
好的,我明白了。但我需要两个 udf 从 lon lat 中找到 ZoneID,然后从 ZoneIDs 中找到 borough。所以这两个必须在全长的数据上运行。加入/关联稍后出现。我可以使用 manhattantoJFK 保存数据,然后进行连接和关联。但它和以前一样。 Spark 只是在 IntelliJ 中。是的,master = local[*] 我在发布之前设置了 -Xms4g -Xmx4g。 当 spark 在 intellij 中启动时,您将看到 spark 历史服务器的 URL。去看看吧。 也在你的最后一个数据帧上做 > df.exaplin(true) 这也会打印出查询计划和 spark 计划做什么。以上是关于从 Hive 元存储(parquet 格式)在 Spark 中本地处理 10 亿条记录需要 6 个小时。如何加快速度?的主要内容,如果未能解决你的问题,请参考以下文章