如何使用 Spark 数据框列上的函数或方法使用 Scala 进行转换

Posted

技术标签:

【中文标题】如何使用 Spark 数据框列上的函数或方法使用 Scala 进行转换【英文标题】:How to use a function or method on a Spark data frame column for transformation using Scala 【发布时间】:2017-01-24 00:22:16 【问题描述】:

我在 scala 中创建了一个与 ORACLE DECODE 函数等效的函数。我想将该函数与 SPARK 数据框列一起使用。我已经尝试过了,但遇到了数据类型不匹配的多个问题。 我不想为每个程序创建 UDF。我想创建一些通用的东西并多次重复使用它。

功能:

def ODECODE(column: Any, Param: Any*) : Any = 
    var index = 0
    while (index < Param.length) 
      var P = Param(index)
      var Q = column
      if (P.equals(Q))
        return Param(index + 1)
      else index = index + 1
    
    return Param (Param.length - 1)

我想像这样使用它:

假设“Emp”是一个数据框,其中包含来自员工表的数据,其中包含列(名字、姓氏、等级)。

Emp.select(ODECODE("grade", "A", 1, "B", 2, "C", 3, "FAIL")).show()

这是一个例子。成绩列中的数据类型可以是字符串或整数。因此,我将解码函数(上图)中的数据类型设为 ANY,但使用 Dataframes 它不执行转换。它会导致数据类型不匹配。

我想为一些不受支持的 Oracle 函数创建单独的函数/方法,并在我的转换需要时重用它们。因此,任何使这项工作的建议都值得赞赏。

【问题讨论】:

Spark 是否支持 DECODE 功能?我搜索了文档但找不到它 【参考方案1】:

我知道这已经晚了,但我确实需要这个并找到了你的例子。我能够通过一些更改来实现它。虽然我不是专家,但可能有更好的方法。

import util.control.Breaks._;

def ODECODE[T](column: String, params: Seq[T]) : String = 

    try 

        var index = 0;

        breakable 

          while (index < params.length) 

              var P = params(index);
              var Q = column;

              if(P.equals(Q)) 

                  break;
              

              index += 1;
          
        

        params(index - 1).toString;

    catch 

        case ife: Exception => 

            ife.printStackTrace();

            "0";
    




println(ODECODE("TEST", 0, "TEgST", 8, "***", 0))

【讨论】:

嗨@protocal-X,你能把上面的函数转换成PySpark函数吗?

以上是关于如何使用 Spark 数据框列上的函数或方法使用 Scala 进行转换的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark - 在作为列表的 spark 数据框列上使用 reducebykey

PySpark:根据另一列的顺序收集数据框列上的集合

熊猫数据框列上的子字符串

如何在大量数据框列上应用我的 single_space 函数? [关闭]

如何使用 Spark(Java)在数据集的所有列上并行应用相同的函数

在 Spark 中将数据框列转换为向量