使用 Scala 将列分配给 Spark Dataframe 中的另一列
Posted
技术标签:
【中文标题】使用 Scala 将列分配给 Spark Dataframe 中的另一列【英文标题】:Assigning columns to another columns in a Spark Dataframe using Scala 【发布时间】:2019-03-09 08:07:46 【问题描述】:我正在研究这个极好的问题,以提高我的 Scala 技能和答案:Extract a column value and assign it to another column as an array in spark dataframe
我创建了修改后的代码,如下所示,但还有几个问题:
import spark.implicits._
import org.apache.spark.sql.functions._
val df = sc.parallelize(Seq(
("r1", 1, 1),
("r2", 6, 4),
("r3", 4, 1),
("r4", 1, 2)
)).toDF("ID", "a", "b")
val uniqueVal = df.select("b").distinct().map(x => x.getAs[Int](0)).collect.toList
def myfun: Int => List[Int] = _ => uniqueVal
def myfun_udf = udf(myfun)
df.withColumn("X", myfun_udf( col("b") )).show
+---+---+---+---------+
| ID| a| b| X|
+---+---+---+---------+
| r1| 1| 1|[1, 4, 2]|
| r2| 6| 4|[1, 4, 2]|
| r3| 4| 1|[1, 4, 2]|
| r4| 1| 2|[1, 4, 2]|
+---+---+---+---------+
有效,但是:
我注意到 b 列被放入了两次。 我也可以在第二条语句的 a 列中输入,得到相同的结果。例如。那是什么意思呢?如果我输入 col ID,那么它会变为 null。 那么,我想知道为什么要输入第二个列? 如何使这对所有列都通用?df.withColumn("X", myfun_udf( col("a") )).show
所以,这是我在别处看过的代码,但我遗漏了一些东西。
【问题讨论】:
【参考方案1】:您显示的代码没有多大意义:
它不可扩展 - 在最坏的情况下,每行的大小与大小成正比 您已经发现它根本不需要争论。 在编写它时不需要(重要的是它不需要)udf
(在 2016 年 12 月 23 日 Spark 1.6 和 2.0 已经发布)
如果您仍想使用 udf
零变量就足够了
总体而言,这只是当时为 OP 服务的另一个令人费解且具有误导性的答案。我会忽略(或vote accordingly)并继续前进。
那么如何做到这一点:
如果你有一个本地列表并且你真的想使用udf
。对于单个序列,使用 udf
和 nullary
函数:
val uniqueBVal: Seq[Int] = ???
val addUniqueBValCol = udf(() => uniqueBVal)
df.withColumn("X", addUniqueBValCol())
概括为:
import scala.reflect.runtime.universe.TypeTag
def addLiteral[T : TypeTag](xs: Seq[T]) = udf(() => xs)
val x = addLiteral[Int](uniqueBVal)
df.withColumn("X", x())
最好不要使用udf
:
import org.apache.spark.sql.functions._
df.withColumn("x", array(uniquBVal map lit: _*))
截至
以及如何使其对所有列通用?
如开头所述,整个概念很难辩护。任一窗口函数(完全不可扩展)
import org.apache.spark.sql.expressions.Window
val w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.select($"*" +: df.columns.map(c => collect_set(c).over(w).alias(s"$c_unique")): _*)
或与聚合交叉连接(大部分时间不可扩展)
val uniqueValues = df.select(
df.columns map (c => collect_set(col(c)).alias(s"$c_unique")):_*
)
df.crossJoin(uniqueValues)
但总的来说 - 你必须重新考虑你的方法,如果这出现在任何实际应用程序中,除非你确定,列的基数很小并且有严格的上限。
带走的信息是 - 不要相信随机人在互联网上发布的随机代码。包括这个。
【讨论】:
以上是关于使用 Scala 将列分配给 Spark Dataframe 中的另一列的主要内容,如果未能解决你的问题,请参考以下文章
如何读取 csv 文件并将值分配给 spark scala 中的变量
如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row