火花数据框中列表列的样本值

Posted

技术标签:

【中文标题】火花数据框中列表列的样本值【英文标题】:Sample values from a list colum in spark dataframe 【发布时间】:2017-04-08 20:50:26 【问题描述】:

我有一个 spark-scala 数据框,如下面的 df1 所示:我想根据 df1 另一列中的计数从分数列(列表)中进行替换采样。

val df1 = sc.parallelize(Seq(("a1",2,List(20,10)),("a2",1,List(30,10)),
("a3",3,List(10)),("a4",2,List(10,20,40)))).toDF("colA","counts","scores")

df1.show()
+----+------+------------+
|colA|counts|      scores|
+----+------+------------+
|  a1|     2|    [20, 10]|
|  a2|     1|    [30, 10]|
|  a3|     3|        [10]|
|  a4|     2|[10, 20, 40]|
+----+------+------------+

预期输出显示在 df2 中:从第 1 行开始,从列表 [20,10] 中采样 2 个值;从列表 [30,10] 中的第 2 行采样 1 个值;从第 3 行采样 3 个来自 list[10] 的重复值……等等。

df2.show() //expected output
+----+------+------------+-------------+
|colA|counts|      scores|sampledScores|
+----+------+------------+-------------+
|  a1|     2|    [20, 10]|     [20, 10]|
|  a2|     1|    [30, 10]|         [30]|
|  a3|     3|        [10]| [10, 10, 10]|
|  a4|     2|[10, 20, 40]|     [10, 40]|
+----+------+------------+-------------+

我写了一个 udf 'takeSample' 并应用于 df1 但没有按预期工作。

val takeSample = udf((a:Array[Int], count1:Int) => Array.fill(count1)(
a(new Random(System.currentTimeMillis).nextInt(a.size)))
)

val df2 = df1.withColumn("SampledScores", takeSample(df1("Scores"),df1("counts")))

我收到以下运行时错误;执行时

df2.printSchema()
root
 |-- colA: string (nullable = true)
 |-- counts: integer (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- SampledScores: array (nullable = true)
 |    |-- element: integer (containsNull = false)

df2.show()
org.apache.spark.SparkException: Failed to execute user defined   
function($anonfun$1: (array<int>, int) => array<int>)
Caused by: java.lang.ClassCastException:  
scala.collection.mutable.WrappedArray$ofRef cannot be cast to [I
at $anonfun$1.apply(<console>:47)

非常感谢任何解决方案。

【问题讨论】:

您应该尝试澄清输出中的确切内容与预期不符。还要尝试使您的 udf 更具可读性而不是长线。 【参考方案1】:

在 UDF 中将数据类型从 Array[Int] 更改为 Seq[Int] 将解决该问题:

val takeSample = udf((a:Seq[Int], count1:Int) => Array.fill(count1)(
a(new Random(System.currentTimeMillis).nextInt(a.size)))
)

val df2 = df1.withColumn("SampledScores", takeSample(df1("Scores"),df1("counts")))

它会给我们预期的输出:

df2.printSchema()
root
 |-- colA: string (nullable = true)
 |-- counts: integer (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: integer (containsNull = false)
 |-- SampledScores: array (nullable = true)
 |    |-- element: integer (containsNull = false)

df2.show
+----+------+------------+-------------+
|colA|counts|      scores|SampledScores|
+----+------+------------+-------------+
|  a1|     2|    [20, 10]|     [20, 20]|
|  a2|     1|    [30, 10]|         [30]|
|  a3|     3|        [10]| [10, 10, 10]|
|  a4|     2|[10, 20, 40]|     [20, 20]|
+----+------+------------+-------------+

【讨论】:

以上是关于火花数据框中列表列的样本值的主要内容,如果未能解决你的问题,请参考以下文章

计算火花数据框中所有列(300 列)的每个不同值的出现次数

在火花数据框中聚合期间过滤数组值

如何截断火花数据框列的值? [复制]

在火花数据框中使用 for 循环添加新列

聚合火花数据框中的多列(所有组合)

Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回