在 Spark 中处理日期
Posted
技术标签:
【中文标题】在 Spark 中处理日期【英文标题】:Working with Dates in Spark 【发布时间】:2015-11-28 18:28:02 【问题描述】:我有一个需求,即解析 CSV 文件,识别特定日期之间的记录,并查找该期间内每个 ProductCategory 的每个销售人员的总销售额和平均销售额。下面是 CSV 文件结构:
SalesPersonId、SalesPersonName、SaleDate、SaleAmount、ProductCategory
请帮助解决这个问题。在 Scala 中寻找解决方案
我尝试了什么:
使用了如下所述的 SimpleDateFormat: val 格式 = new java.text.SimpleDateFormat("MM/dd/yyyy") 并使用以下代码创建了一个 RDD: val onlyHouseLoan = readFile.map(line => (line.split(",")(0), line.split(",")(2), line.split(",")(3).toLong, format.parse(line.split(",")(4).toString())))
但是,我尝试在突出显示的表达式之上使用日历,但出现 NumberformatExpression 错误。
【问题讨论】:
你有没有尝试写任何东西? 如果您希望有人回答,您应该在提问时更加努力。 显然我试过了,但只有我问过。归根结底,应该由我来完成和交付。我不想发布我进入论坛并享受现场直播的作品。此外,我还是 Microsoft 技术的社区贡献者。无论我有什么经验,我都会帮助解决问题。希望这可以帮助您理解。请注意,如果这足够合乎逻辑,可以在这里发布,我会分享我尝试过的内容。 【参考方案1】:因此,只需以您描述的 csv 文件格式创建一个快速 rdd
val list = sc.parallelize(List(("1","Timothy","04/02/2015","100","TV"), ("1","Timothy","04/03/2015","10","Book"), ("1","Timothy","04/03/2015","20","Book"), ("1","Timothy","04/05/2015","10","Book"),("2","Ursula","04/02/2015","100","TV")))
然后运行
import java.time.LocalDate
import java.time.format.DateTimeFormatter
val startDate = LocalDate.of(2015,1,4)
val endDate = LocalDate.of(2015,4,5)
val result = list
.filtercase(_,_,date,_,_) =>
val localDate = LocalDate.parse(date, DateTimeFormatter.ofPattern("MM/dd/yyyy"))
localDate.isAfter(startDate) && localDate.isBefore(endDate)
.mapcase(id, _, _, amount, category) => ((id, category), (amount.toDouble, 1))
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
.mapcase((id, category),(total, sales)) => (id, List((category, total, total/sales)))
.reduceByKey(_ ++ _)
会给你
(1,List((Book,30.0,15.0), (TV,100.0,100.0)))
(2,List((TV,100.0,100.0)))
格式为 (SalesPersonId, [(ProductCategory,TotalSaleAmount, AvgSaleAmount)]。这是您要找的吗?
【讨论】:
您好 Glennie - 非常感谢您的回复。我正在使用您的方法来实现我所需要的。我会尽快更新你。 顺便说一句,直到我回答后我才看到您的编辑,但如果您愿意,您也可以使用val onlyHouseLoan = readFile.map(line => val subStrings = line.split(",") (subStrings(0), subStrings(2), subStrings(3).toLong, LocalDate.parse(subStrings(4),DateTimeFormatter.ofPattern("MM/dd/yyyy"))) )
解析最初的日期
嗨 Glennie,感谢您对日期部分的建议,我会尝试的。但是,下面是我可以根据您的方法编写的代码: val onlyHouseLoan = readFile.map(line => (line.split(",")(2), line.split(",")( 3).toLong, getYearFromDate(line.split(",")(4).toString()))).filter(._1 == "house") //筛选房贷 .filtercase( ,,year) => (year >= 1960 && year ) => (loantype , (loanamount, 1)).reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)) 。根据您的方法编写。然而,还剩下一件事,那就是计算平均值。我得到的贷款类型为“住房”,贷款总额和记录数。我无法编写表达式来计算平均值。请建议完成。其次,我是初学者和学习火花。您能否提供帮助我编写/采用您使用的编码风格/模式的指针。分解成不同部分并使用功能集的问题。任何让我从方法的角度思考的博客/帮助链接。那真的很有帮助。再次感谢。 要计算并包含每个类别的平均值,您只需添加最终的
.mapValuescase(totalLoanAmount, recordCount) => (totalLoanAmount, totalLoanAmount.toDouble/recordCount)
。请注意,我将 totalLoanAmount 显式转换为 Double,因为 LoanAmount 或 recordCount 必须是 Double 类型才能获得 Double 中的结果。从您的代码看来,loanAmount 的类型为 Long。以上是关于在 Spark 中处理日期的主要内容,如果未能解决你的问题,请参考以下文章