Collapsing column values in Spark dataframes

Posted: 2017-05-30 23:14:22

Question:

I have 2 dataframes:

case class UserTransactions(id: Long, transactionDate: java.sql.Date, currencyUsed: String, value: Long)

ID, TransactionDate, CurrencyUsed, value
1, 2016-01-05, USD, 100
1, 2016-01-09, GBP, 150 
1, 2016-02-01, USD, 50
1, 2016-02-10, JPN, 10
2, 2016-01-10, EURO, 50
2, 2016-01-10, GBP, 100

case class ReportingTime(userId: Long, reportDate: java.sql.Date)

userId, reportDate
1, 2016-01-05
1, 2016-01-31
1, 2016-02-15
2, 2016-01-10
2, 2016-02-01

Now I want to get a summary per userId and reportDate by summing up all the currencies used before that report date. The result should look like this:

userId, reportDate, transactionSummary
1, 2016-01-05, None
1, 2016-01-31, (USD -> 100)(GBP -> 150) // combines the 2 transactions above, dated before 2016-01-31
1, 2016-02-15, (USD -> 150)(GBP -> 150)(JPN -> 10) // combines all transactions dated before 2016-02-15
2, 2016-01-10, None
2, 2016-02-01, (EURO-> 50) (GBP-> 100)

What is the best way to do this? We have over 300 million transactions, and each user can have up to 10,000 transactions.

Comments:

In your example output, why do you show None for the reportDates that correspond to the first transaction in the UserTransactions DataFrame? Do you always want to "skip" the first transaction?

Because at the first transaction the user has no history yet, the summary shows None; from the second transaction onwards it will have a summary like (USD -> 100)(GBP -> 150).

Answer 1:

The snippet below will do what you need. The initial join and aggregation are done with PySpark's DataFrame API. The grouping of the data (using reduceByKey) and the preparation of the final dataset are then done with the RDD API, since it is better suited for such an operation.

from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType
from pyspark.sql import functions as F

df1 = spark.createDataFrame([(1,'2016-01-05','USD',100),
(1,'2016-01-09','GBP',150),
(1,'2016-02-01','USD',50),
(1,'2016-02-10','JPN',10),
(2,'2016-01-10','EURO',50),
(2,'2016-01-10','GBP',100)],['id', 'tdate', 'currency', 'value'])

df2 = spark.createDataFrame([(1,'2016-01-05'),
(1,'2016-01-31'),
(1,'2016-02-15'),
(2,'2016-01-10'),
(2,'2016-02-01')],['user_id', 'report_date'])


# UDF to convert the string dates to DateType
func = udf(lambda x: datetime.strptime(x, '%Y-%m-%d'), DateType())

df2 = df2.withColumn('tdate', func(df2.report_date))
df1 = df1.withColumn('tdate', func(df1.tdate))

# Left-outer join each report date with all earlier transactions of the same user,
# then sum the values per (user, report date, currency)
result = (df2.join(df1, (df1.id == df2.user_id) & (df1.tdate < df2.report_date), 'left_outer')
             .select('user_id', 'report_date', 'currency', 'value')
             .groupBy('user_id', 'report_date', 'currency')
             .agg(F.sum('value').alias('value')))

# Collapse the per-currency sums into a list of (currency, value) pairs per key;
# list(...) around filter keeps this working on Python 3 as well
data = (result.rdd
        .map(lambda x: (x.user_id, x.report_date, x.currency, x.value))
        .keyBy(lambda x: (x[0], x[1]))
        .mapValues(lambda x: list(filter(lambda y: bool(y), [(x[2], x[3]) if x[2] else None])))
        .reduceByKey(lambda x, y: x + y)
        .map(lambda x: (x[0][0], x[0][1], x[1])))

The final result produced is shown below.

>>> spark.createDataFrame([ (x[0],x[1],str(x[2])) for x in data.collect()], ['id', 'date', 'values']).orderBy('id', 'date').show(20, False)
+---+----------+--------------------------------------------+
|id |date      |values                                      |
+---+----------+--------------------------------------------+
|1  |2016-01-05|[]                                          |
|1  |2016-01-31|[(u'USD', 100), (u'GBP', 150)]              |
|1  |2016-02-15|[(u'USD', 150), (u'GBP', 150), (u'JPN', 10)]|
|2  |2016-01-10|[]                                          |
|2  |2016-02-01|[(u'EURO', 50), (u'GBP', 100)]              |
+---+----------+--------------------------------------------+
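A side note (not part of the original answer): on Spark 2.4 and later, the same per-report-date maps can be built without dropping down to the RDD API, using collect_list and map_from_entries. A minimal, untested sketch that reuses the df1/df2 DataFrames prepared above:

from pyspark.sql import functions as F

summary = (df2.join(df1, (df1.id == df2.user_id) & (df1.tdate < df2.tdate), 'left_outer')
              .groupBy('user_id', 'report_date', 'currency')
              .agg(F.sum('value').alias('value'))                 # per-currency totals, as in the answer
              .groupBy('user_id', 'report_date')
              .agg(F.map_from_entries(                            # Spark 2.4+: builds the map column directly
                       F.collect_list(                            # collect_list drops the NULLs produced by when()
                           F.when(F.col('currency').isNotNull(),
                                  F.struct('currency', 'value'))))
                    .alias('transactionSummary'))
              .orderBy('user_id', 'report_date'))

summary.show(truncate=False)

Keeping everything in the DataFrame API lets Catalyst plan the whole job, which can matter at the 300-million-row scale mentioned in the question; report dates with no earlier transactions should come out as an empty map instead of None.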

Comments:

Works great:
|1  |2016-01-05|Map()                                 |
|1  |2016-01-31|Map(USD -> 100, GBP -> 150)           |
|1  |2016-02-15|Map(USD -> 150, GBP -> 150, JPN -> 10)|
|2  |2016-01-10|Map()                                 |
|2  |2016-02-01|Map(EURO -> 50, GBP -> 100)           |

Answer 2:

In case someone needs it in Scala:

import java.text.SimpleDateFormat
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
import spark.implicits._

case class Transaction(id: String, date: java.sql.Date, currency: Option[String], value: Option[Long])
case class Report(id: String, date: java.sql.Date)

def toDate(date: String): java.sql.Date = {
  val sf = new SimpleDateFormat("yyyy-MM-dd")
  new java.sql.Date(sf.parse(date).getTime)
}


val allTransactions = Seq(
  Transaction("1", toDate("2016-01-05"),Some("USD"),Some(100L)),
  Transaction("1", toDate("2016-01-09"),Some("GBP"),Some(150L)),
  Transaction("1",toDate("2016-02-01"),Some("USD"),Some(50L)),
  Transaction("1",toDate("2016-02-10"),Some("JPN"),Some(10L)),
  Transaction("2",toDate("2016-01-10"),Some("EURO"),Some(50L)),
  Transaction("2",toDate("2016-01-10"),Some("GBP"),Some(100L))
)
val allReports = Seq(
  Report("1",toDate("2016-01-05")),
  Report("1",toDate("2016-01-31")),
  Report("1",toDate("2016-02-15")),
  Report("2",toDate("2016-01-10")),
  Report("2",toDate("2016-02-01"))
)

val transactions: Dataset[Transaction] = spark.createDataFrame(allTransactions).as[Transaction]
val reports: Dataset[Report] = spark.createDataFrame(allReports).as[Report]

// Join each report date with all earlier transactions of the same user,
// then sum the values per (user, report date, currency)
val result = reports.alias("rp").join(transactions.alias("tx"), (col("tx.id") === col("rp.id")) && (col("tx.date") < col("rp.date")), "left_outer")
  .select("rp.id", "rp.date", "currency", "value")
  .groupBy("rp.id", "rp.date", "currency").agg(sum("value"))
  .toDF("id", "date", "currency", "value")
  .as[Transaction]

// Collapse the per-currency sums into one Map per (user, report date)
val data = result.rdd.keyBy(x => (x.id, x.date))
  .mapValues(x => if (x.currency.isDefined) collection.Map[String, Long](x.currency.get -> x.value.get) else collection.Map[String, Long]())
  .reduceByKey((x,y) => x ++ y).map(x => (x._1._1, x._1._2, x._2))
  .toDF("id", "date", "map")
  .orderBy("id", "date")

Console output:

+---+----------+--------------------------------------+
|id |date      |map                                   |
+---+----------+--------------------------------------+
|1  |2016-01-05|Map()                                 |
|1  |2016-01-31|Map(GBP -> 150, USD -> 100)           |
|1  |2016-02-15|Map(USD -> 150, GBP -> 150, JPN -> 10)|
|2  |2016-01-10|Map()                                 |
|2  |2016-02-01|Map(GBP -> 100, EURO -> 50)           |
+---+----------+--------------------------------------+
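The collect_list / map_from_entries idea sketched after the first answer applies here as well (Spark 2.4+, org.apache.spark.sql.functions.map_from_entries) and would avoid the RDD round trip and the reduceByKey step entirely; the answer above works as-is on older Spark versions.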

