使用 Scala 将列分配给 Spark Dataframe 中的另一列

Posted

技术标签:

【中文标题】使用 Scala 将列分配给 Spark Dataframe 中的另一列【英文标题】:Assigning columns to another columns in a Spark Dataframe using Scala 【发布时间】:2019-03-09 08:07:46 【问题描述】:

我正在研究这个极好的问题,以提高我的 Scala 技能和答案:Extract a column value and assign it to another column as an array in spark dataframe

我创建了修改后的代码,如下所示,但还有几个问题:

import spark.implicits._   
import org.apache.spark.sql.functions._

val df = sc.parallelize(Seq(
    ("r1", 1, 1),
    ("r2", 6, 4),
    ("r3", 4, 1),
    ("r4", 1, 2)
  )).toDF("ID", "a", "b")

val uniqueVal = df.select("b").distinct().map(x => x.getAs[Int](0)).collect.toList    
def myfun: Int => List[Int] = _ => uniqueVal 
def myfun_udf = udf(myfun)

df.withColumn("X", myfun_udf( col("b") )).show

+---+---+---+---------+
| ID|  a|  b|        X|
+---+---+---+---------+
| r1|  1|  1|[1, 4, 2]|
| r2|  6|  4|[1, 4, 2]|
| r3|  4|  1|[1, 4, 2]|
| r4|  1|  2|[1, 4, 2]|
+---+---+---+---------+

有效,但是:

我注意到 b 列被放入了两次。 我也可以在第二条语句的 a 列中输入,得到相同的结果。例如。那是什么意思呢?

df.withColumn("X", myfun_udf( col("a") )).show

如果我输入 col ID,那么它会变为 null。 那么,我想知道为什么要输入第二个列? 如何使这对所有列都通用?

所以,这是我在别处看过的代码,但我遗漏了一些东西。

【问题讨论】:

【参考方案1】:

您显示的代码没有多大意义:

它不可扩展 - 在最坏的情况下,每行的大小与大小成正比 您已经发现它根本不需要争论。 在编写它时不需要(重要的是它不需要)udf(在 2016 年 12 月 23 日 Spark 1.6 和 2.0 已经发布) 如果您仍想使用 udf 零变量就足够了

总体而言,这只是当时为 OP 服务的另一个令人费解且具有误导性的答案。我会忽略(或vote accordingly)并继续前进。

那么如何做到这一点:

如果你有一个本地列表并且你真的想使用udf。对于单个序列,使用 udfnullary 函数:

val uniqueBVal: Seq[Int] = ???
val addUniqueBValCol = udf(() => uniqueBVal)

df.withColumn("X", addUniqueBValCol())

概括为:

import scala.reflect.runtime.universe.TypeTag

def addLiteral[T : TypeTag](xs: Seq[T]) = udf(() => xs)

val x = addLiteral[Int](uniqueBVal)
df.withColumn("X", x())

最好不要使用udf

import org.apache.spark.sql.functions._

df.withColumn("x", array(uniquBVal map lit: _*))

截至

以及如何使其对所有列通用?

如开头所述,整个概念很难辩护。任一窗口函数(完全不可扩展)

import org.apache.spark.sql.expressions.Window

val w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.select($"*" +: df.columns.map(c => collect_set(c).over(w).alias(s"$c_unique")): _*)

或与聚合交叉连接(大部分时间不可扩展)

val uniqueValues = df.select(
  df.columns map (c => collect_set(col(c)).alias(s"$c_unique")):_*
)
df.crossJoin(uniqueValues)

但总的来说 - 你必须重新考虑你的方法,如果这出现在任何实际应用程序中,除非你确定,列的基数很小并且有严格的上限。

带走的信息是 - 不要相信随机人在互联网上发布的随机代码。包括这个。

【讨论】:

以上是关于使用 Scala 将列分配给 Spark Dataframe 中的另一列的主要内容,如果未能解决你的问题,请参考以下文章

如何读取 csv 文件并将值分配给 spark scala 中的变量

Spark scala将数据框列复制到新数据框

将列添加到 RDD Spark 1.2.1

如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row

如何在 if-else 条件下的列中使用 Spark 值 - Scala

如何规范化 spark (scala) 中的列中的全角字符