如何使用 Spark 数据框列上的函数或方法使用 Scala 进行转换
Posted
技术标签:
【中文标题】如何使用 Spark 数据框列上的函数或方法使用 Scala 进行转换【英文标题】:How to use a function or method on a Spark data frame column for transformation using Scala 【发布时间】:2017-01-24 00:22:16 【问题描述】:我在 scala 中创建了一个与 ORACLE DECODE 函数等效的函数。我想将该函数与 SPARK 数据框列一起使用。我已经尝试过了,但遇到了数据类型不匹配的多个问题。 我不想为每个程序创建 UDF。我想创建一些通用的东西并多次重复使用它。
功能:
def ODECODE(column: Any, Param: Any*) : Any =
var index = 0
while (index < Param.length)
var P = Param(index)
var Q = column
if (P.equals(Q))
return Param(index + 1)
else index = index + 1
return Param (Param.length - 1)
我想像这样使用它:
假设“Emp”是一个数据框,其中包含来自员工表的数据,其中包含列(名字、姓氏、等级)。
Emp.select(ODECODE("grade", "A", 1, "B", 2, "C", 3, "FAIL")).show()
这是一个例子。成绩列中的数据类型可以是字符串或整数。因此,我将解码函数(上图)中的数据类型设为 ANY,但使用 Dataframes 它不执行转换。它会导致数据类型不匹配。
我想为一些不受支持的 Oracle 函数创建单独的函数/方法,并在我的转换需要时重用它们。因此,任何使这项工作的建议都值得赞赏。
【问题讨论】:
Spark 是否支持 DECODE 功能?我搜索了文档但找不到它 【参考方案1】:我知道这已经晚了,但我确实需要这个并找到了你的例子。我能够通过一些更改来实现它。虽然我不是专家,但可能有更好的方法。
import util.control.Breaks._;
def ODECODE[T](column: String, params: Seq[T]) : String =
try
var index = 0;
breakable
while (index < params.length)
var P = params(index);
var Q = column;
if(P.equals(Q))
break;
index += 1;
params(index - 1).toString;
catch
case ife: Exception =>
ife.printStackTrace();
"0";
println(ODECODE("TEST", 0, "TEgST", 8, "***", 0))
【讨论】:
嗨@protocal-X,你能把上面的函数转换成PySpark函数吗?以上是关于如何使用 Spark 数据框列上的函数或方法使用 Scala 进行转换的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark - 在作为列表的 spark 数据框列上使用 reducebykey
如何在大量数据框列上应用我的 single_space 函数? [关闭]