折叠火花数据框中的列值
Posted
技术标签:
【中文标题】折叠火花数据框中的列值【英文标题】:Collapsing column values in spark dataframes 【发布时间】:2017-05-30 23:14:22 【问题描述】:我有 2 个数据帧
case class UserTransactions(id: Long, transactionDate: java.sql.Date, currencyUsed: String, value: Long)
ID, TransactionDate, CurrencyUsed, value
1, 2016-01-05, USD, 100
1, 2016-01-09, GBP, 150
1, 2016-02-01, USD, 50
1, 2016-02-10, JPN, 10
2, 2016-01-10, EURO, 50
2, 2016-01-10, GBP, 100
case class ReportingTime(userId: Long, reportDate: java.sql.Date)
userId, reportDate
1, 2016-01-05
1, 2016-01-31
1, 2016-02-15
2, 2016-01-10
2, 2016-02-01
现在我想通过组合userId
、reportDate
和sum
之前使用的所有货币来获得摘要。结果应如下所示:
userId, reportDate, trasactionSummary
1, 2016-01-05, None
1, 2016-01-31, (USD -> 100)(GBP-> 150) // combined above 2 transactions less than 2016-01-31
1, 2016-02-15, (USD -> 150)(GBP-> 150)(JPN->10) // combined transactions less than 2016-02-15
2, 2016-01-10, None
2, 2016-02-01, (EURO-> 50) (GBP-> 100)
执行此操作的最佳方法是什么?我们有超过 3 亿笔交易,每个用户最多可以有 10,000 笔交易。
【问题讨论】:
在您的示例输出中,您为什么显示None
对应于reportDate
s 对应于UserTransactions
DataFrame 中的第一个事务?你总是想“跳过”第一笔交易吗?
因为在第一笔交易中用户没有任何历史记录,所以总而言之,它显示 None 并且从第二笔交易开始,它将有类似 (USD -> 100)(GBP-> 150) 的总结
【参考方案1】:
下面的 sn-p 将满足您的要求。初始加入和聚合是通过 pyspark 的 Dataframe API 完成的。然后数据的分组(使用reduceByKey
)和最终的数据集准备是通过RDD api完成的,因为它更适合这样的操作。
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from pyspark.sql import functions as F
df1 = spark.createDataFrame([(1,'2016-01-05','USD',100),
(1,'2016-01-09','GBP',150),
(1,'2016-02-01','USD',50),
(1,'2016-02-10','JPN',10),
(2,'2016-01-10','EURO',50),
(2,'2016-01-10','GBP',100)],['id', 'tdate', 'currency', 'value'])
df2 = spark.createDataFrame([(1,'2016-01-05'),
(1,'2016-01-31'),
(1,'2016-02-15'),
(2,'2016-01-10'),
(2,'2016-02-01')],['user_id', 'report_date'])
func = udf (lambda x: datetime.strptime(x, '%Y-%m-%d'), DateType()) ### function to convert string data type to date data type
df2 = df2.withColumn('tdate', func(df2.report_date))
df1 = df1.withColumn('tdate', func(df1.tdate))
result = df2.join(df1, (df1.id == df2.user_id) & (df1.tdate < df2.report_date), 'left_outer').select('user_id', 'report_date', 'currency', 'value').groupBy('user_id', 'report_date', 'currency').agg(F.sum('value').alias('value'))
data = result.rdd.map(lambda x: (x.user_id,x.report_date,x.currency,x.value)).keyBy(lambda x: (x[0],x[1])).mapValues(lambda x: filter(lambda x: bool(x),[(x[2],x[3]) if x[2] else None])).reduceByKey(lambda x,y: x + y).map(lambda x: (x[0][0],x[0][1], x[1]))
最终生成的结果如下图。
>>> spark.createDataFrame([ (x[0],x[1],str(x[2])) for x in data.collect()], ['id', 'date', 'values']).orderBy('id', 'date').show(20, False)
+---+----------+--------------------------------------------+
|id |date |values |
+---+----------+--------------------------------------------+
|1 |2016-01-05|[] |
|1 |2016-01-31|[(u'USD', 100), (u'GBP', 150)] |
|1 |2016-02-15|[(u'USD', 150), (u'GBP', 150), (u'JPN', 10)]|
|2 |2016-01-10|[] |
|2 |2016-02-01|[(u'EURO', 50), (u'GBP', 100)] |
+---+----------+--------------------------------------------+
【讨论】:
效果很好|1 |2016-01-05|Map() | |1 |2016-01-31|地图(USD -> 100, GBP -> 150) | |1 |2016-02-15|地图(USD -> 150, GBP -> 150, JPN -> 10)| |2 |2016-01-10|地图() | |2 |2016-02-01|地图(EURO -> 50, GBP -> 100) | +---+----------+---------------------------------- ----+【参考方案2】:如果有人需要 Scala
case class Transaction(id: String, date: java.sql.Date, currency:Option[String], value: Option[Long])
case class Report(id:String, date:java.sql.Date)
def toDate(date: String): java.sql.Date =
val sf = new SimpleDateFormat("yyyy-MM-dd")
new java.sql.Date(sf.parse(date).getTime)
val allTransactions = Seq(
Transaction("1", toDate("2016-01-05"),Some("USD"),Some(100L)),
Transaction("1", toDate("2016-01-09"),Some("GBP"),Some(150L)),
Transaction("1",toDate("2016-02-01"),Some("USD"),Some(50L)),
Transaction("1",toDate("2016-02-10"),Some("JPN"),Some(10L)),
Transaction("2",toDate("2016-01-10"),Some("EURO"),Some(50L)),
Transaction("2",toDate("2016-01-10"),Some("GBP"),Some(100L))
)
val allReports = Seq(
Report("1",toDate("2016-01-05")),
Report("1",toDate("2016-01-31")),
Report("1",toDate("2016-02-15")),
Report("2",toDate("2016-01-10")),
Report("2",toDate("2016-02-01"))
)
val transections:Dataset[Transaction] = spark.createDataFrame(allTransactions).as[Transaction]
val reports: Dataset[Report] = spark.createDataFrame(allReports).as[Report]
val result = reports.alias("rp").join(transections.alias("tx"), (col("tx.id") === col("rp.id")) && (col("tx.date") < col("rp.date")), "left_outer")
.select("rp.id", "rp.date", "currency", "value")
.groupBy("rp.id", "rp.date", "currency").agg(sum("value"))
.toDF("id", "date", "currency", "value")
.as[Transaction]
val data = result.rdd.keyBy(x => (x.id , x.date))
.mapValues(x => if (x.currency.isDefined) collection.Map[String, Long](x.currency.get -> x.value.get) else collection.Map[String, Long]())
.reduceByKey((x,y) => x ++ y).map(x => (x._1._1, x._1._2, x._2))
.toDF("id", "date", "map")
.orderBy("id", "date")
控制台输出
+---+----------+--------------------------------------+
|id |date |map |
+---+----------+--------------------------------------+
|1 |2016-01-05|Map() |
|1 |2016-01-31|Map(GBP -> 150, USD -> 100) |
|1 |2016-02-15|Map(USD -> 150, GBP -> 150, JPN -> 10)|
|2 |2016-01-10|Map() |
|2 |2016-02-01|Map(GBP -> 100, EURO -> 50) |
+---+----------+--------------------------------------+
【讨论】:
以上是关于折叠火花数据框中的列值的主要内容,如果未能解决你的问题,请参考以下文章