在 Spark 中处理日期

Posted

技术标签:

【中文标题】在 Spark 中处理日期【英文标题】:Working with Dates in Spark 【发布时间】:2015-11-28 18:28:02 【问题描述】:

我有一个需求,即解析 CSV 文件,识别特定日期之间的记录,并查找该期间内每个 ProductCategory 的每个销售人员的总销售额和平均销售额。下面是 CSV 文件结构:

SalesPersonId、SalesPersonName、SaleDate、SaleAmount、ProductCategory

请帮助解决这个问题。在 Scala 中寻找解决方案

我尝试了什么:

使用了如下所述的 SimpleDateFormat: val 格式 = new java.text.SimpleDateFormat("MM/dd/yyyy") 并使用以下代码创建了一个 RDD: val onlyHouseLoan = readFile.map(line => (line.split(",")(0), line.split(",")(2), line.split(",")(3).toLong, format.parse(line.split(",")(4).toString())))

但是,我尝试在突出显示的表达式之上使用日历,但出现 NumberformatExpression 错误。

【问题讨论】:

你有没有尝试写任何东西? 如果您希望有人回答,您应该在提问时更加努力。 显然我试过了,但只有我问过。归根结底,应该由我来完成和交付。我不想发布我进入论坛并享受现场直播的作品。此外,我还是 Microsoft 技术的社区贡献者。无论我有什么经验,我都会帮助解决问题。希望这可以帮助您理解。请注意,如果这足够合乎逻辑,可以在这里发布,我会分享我尝试过的内容。 【参考方案1】:

因此,只需以您描述的 csv 文件格式创建一个快速 rdd

val list = sc.parallelize(List(("1","Timothy","04/02/2015","100","TV"), ("1","Timothy","04/03/2015","10","Book"), ("1","Timothy","04/03/2015","20","Book"), ("1","Timothy","04/05/2015","10","Book"),("2","Ursula","04/02/2015","100","TV")))

然后运行

import java.time.LocalDate
import java.time.format.DateTimeFormatter

val startDate = LocalDate.of(2015,1,4)
val endDate = LocalDate.of(2015,4,5)

val result = list
    .filtercase(_,_,date,_,_) => 
         val localDate = LocalDate.parse(date, DateTimeFormatter.ofPattern("MM/dd/yyyy"))
         localDate.isAfter(startDate) && localDate.isBefore(endDate)
    .mapcase(id, _, _, amount, category) => ((id, category), (amount.toDouble, 1)) 
    .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)) 
    .mapcase((id, category),(total, sales)) => (id, List((category, total, total/sales))) 
    .reduceByKey(_ ++ _)

会给你

(1,List((Book,30.0,15.0), (TV,100.0,100.0)))
(2,List((TV,100.0,100.0)))

格式为 (SalesPersonId, [(ProductCategory,TotalSaleAmount, AvgSaleAmount)]。这是您要找的吗?

【讨论】:

您好 Glennie - 非常感谢您的回复。我正在使用您的方法来实现我所需要的。我会尽快更新你。 顺便说一句,直到我回答后我才看到您的编辑,但如果您愿意,您也可以使用 val onlyHouseLoan = readFile.map(line => val subStrings = line.split(",") (subStrings(0), subStrings(2), subStrings(3).toLong, LocalDate.parse(subStrings(4),DateTimeFormatter.ofPattern("MM/dd/yyyy"))) ) 解析最初的日期 嗨 Glennie,感谢您对日期部分的建议,我会尝试的。但是,下面是我可以根据您的方法编写的代码:
 val onlyHouseLoan = readFile.map(line => (line.split(",")(2), line.split(",")( 3).toLong, getYearFromDate(line.split(",")(4).toString()))).filter(._1 == "house") //筛选房贷 .filtercase( ,,year) => (year >= 1960 && year  ) => (loantype , (loanamount, 1)).reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2)) 。根据您的方法编写。
然而,还剩下一件事,那就是计算平均值。我得到的贷款类型为“住房”,贷款总额和记录数。我无法编写表达式来计算平均值。请建议完成。其次,我是初学者和学习火花。您能否提供帮助我编写/采用您使用的编码风格/模式的指针。分解成不同部分并使用功能集的问题。任何让我从方法的角度思考的博客/帮助链接。那真的很有帮助。再次感谢。 要计算并包含每个类别的平均值,您只需添加最终的.mapValuescase(totalLoanAmount, recordCount) => (totalLoanAmount, totalLoanAmount.toDouble/recordCount)。请注意,我将 totalLoanAmount 显式转换为 Double,因为 LoanAmount 或 recordCount 必须是 Double 类型才能获得 Double 中的结果。从您的代码看来,loanAmount 的类型为 Long。

以上是关于在 Spark 中处理日期的主要内容,如果未能解决你的问题,请参考以下文章

知识点-Spark小节

如何从日期字符串中提取年份?

在 Spark SQL 中计算财务日期

在 Spark 中转换 dd-MMM-yy 日期格式

在 Spark scala 程序中解析日期时出错 [重复]

如何在 Spark 数据框中添加具有当前日期的额外列