在 spark scala 中为数据帧中的每个组采样不同数量的随机行
Posted
技术标签:
【中文标题】在 spark scala 中为数据帧中的每个组采样不同数量的随机行【英文标题】:Sample a different number of random rows for every group in a dataframe in spark scala 【发布时间】:2018-03-01 19:25:12 【问题描述】:目标是为每个组在数据框中采样(不替换)不同数量的行。为特定组采样的行数在另一个数据框中。
示例:idDF 是要从中采样的数据帧。这些组由 ID 列表示。数据框 planDF 指定要为每个组采样的行数,其中“datesToUse”表示行数,“ID”表示组。 “totalDates”是该组的总行数,可能有用也可能没用。
最终结果应该有从第一组 (ID 1) 中采样的 3 行、从第二组 (ID 2) 中采样的 2 行和从第三组 (ID 3) 中采样的 1 行。
val idDF = Seq(
(1, "2017-10-03"),
(1, "2017-10-22"),
(1, "2017-11-01"),
(1, "2017-10-02"),
(1, "2017-10-09"),
(1, "2017-12-24"),
(1, "2017-10-20"),
(2, "2017-11-17"),
(2, "2017-11-12"),
(2, "2017-12-02"),
(2, "2017-10-03"),
(3, "2017-12-18"),
(3, "2017-11-21"),
(3, "2017-12-13"),
(3, "2017-10-08"),
(3, "2017-10-16"),
(3, "2017-12-04")
).toDF("ID", "date")
val planDF = Seq(
(1, 3, 7),
(2, 2, 4),
(3, 1, 6)
).toDF("ID", "datesToUse", "totalDates")
这是一个结果数据框应该是什么样子的示例:
+---+----------+
| ID| date|
+---+----------+
| 1|2017-10-22|
| 1|2017-11-01|
| 1|2017-10-20|
| 2|2017-11-12|
| 2|2017-10-03|
| 3|2017-10-16|
+---+----------+
到目前为止,我尝试使用 DataFrame 的示例方法:https://spark.apache.org/docs/1.5.0/api/java/org/apache/spark/sql/DataFrame.html 这是一个适用于整个数据框的示例。
def sampleDF(DF: DataFrame, datesToUse: Int, totalDates: Int): DataFrame =
val fraction = datesToUse/totalDates.toFloat.toDouble
DF.sample(false, fraction)
我不知道如何为每个组使用这样的东西。我尝试将 planDF 表连接到 idDF 表并使用窗口分区。
我的另一个想法是以某种方式创建一个随机标记为 True / false 的新列,然后对该列进行过滤。
【问题讨论】:
【参考方案1】:另一个完全保留在 Dataframes 中的选项是使用您的 planDF
计算概率,加入 idDF
,附加一列随机数,然后过滤。有用的是,sql.functions
有一个 rand
函数。
import org.apache.spark.sql.functions._
import spark.implicits._
val probabilities = planDF.withColumn("prob", $"datesToUse" / $"totalDates")
val dfWithProbs = idDF.join(probabilities, Seq("ID"))
.withColumn("rand", rand())
.where($"rand" < $"prob")
(您需要仔细检查这不是整数除法。)
【讨论】:
【参考方案2】:假设您的planDF 小到足以成为collect
ed,您可以使用Scala 的foldLeft
来遍历id
列表并按id
累积样本数据帧:
import org.apache.spark.sql.Row, DataFrame
def sampleByIdDF(DF: DataFrame, id: Int, datesToUse: Int, totalDates: Int): DataFrame =
val fraction = datesToUse.toDouble / totalDates
DF.where($"id" === id ).sample(false, fraction)
val emptyDF = Seq.empty[(Int, String)].toDF("ID", "date")
val planList = planDF.rdd.collect.map case Row(x: Int, y: Int, z: Int) => (x, y, z)
// planList: Array[(Int, Int, Int)] = Array((1,3,7), (2,2,4), (3,1,6))
planList.foldLeft( emptyDF )
case (accDF: DataFrame, (id: Int, num: Int, total: Int)) =>
accDF union sampleByIdDF(idDF, id, num, total)
// res1: org.apache.spark.sql.DataFrame = [ID: int, date: string]
// res1.show
// +---+----------+
// | ID| date|
// +---+----------+
// | 1|2017-10-03|
// | 1|2017-11-01|
// | 1|2017-10-02|
// | 1|2017-12-24|
// | 1|2017-10-20|
// | 2|2017-11-17|
// | 2|2017-11-12|
// | 2|2017-12-02|
// | 3|2017-11-21|
// | 3|2017-12-13|
// +---+----------+
请注意,方法sample()
不一定会生成方法参数中指定的确切样本数。这是一个相关的SO Q&A。
如果您的planDF 很大,您可能不得不考虑使用RDD 的aggregate,它具有以下签名(跳过隐式参数):
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
它的工作方式有点像foldLeft
,只是它在一个分区内有一个累加运算符,还有一个用于合并来自不同分区的结果。
【讨论】:
以上是关于在 spark scala 中为数据帧中的每个组采样不同数量的随机行的主要内容,如果未能解决你的问题,请参考以下文章
为spark scala中的数据框中的每个组采样不同数量的随机行
Spark SCALA - 连接两个数据帧,其中一个数据帧中的连接值位于第二个数据帧中的两个字段之间
如何根据 Spark Scala 中其他数据帧中的多列匹配过滤数据帧
过滤包含Scala Spark数据帧中数组的列中的数组长度[重复]