Table data conversion using Spark Scala

Posted: 2019-09-30 09:30:13

Question:

I have data in a Hive table, and I need some help converting it into the "expected data" form shown below.

There are two points to note:

- Omit columns whose value is null: for example, omit the abs, ada, and adw columns for the first row.
- For array-type columns (e.g. abs, ada, adw, alt) whose value is not null, include the column name inside the array, as shown in the expected data, and collect everything under a single outer column named EVENTS.

Is there a way to achieve this using Spark SQL, or do I need to write a Scala UDF? I need the solution in Spark/Scala. Any help would be greatly appreciated.

Hive table data:

     _________________________________________________________________________________________________________
    | vin               | tt | msg_type | abs    | ada                    | adw    | alt                      |
    |___________________|____|__________|________|________________________|________|__________________________|
    | FU7XXXXXXXXXXXXXX | 0  | SIGNAL   | (null) | (null)                 | (null) | ["E":15XXXXXXXX,"V":0.0] |
    |___________________|____|__________|________|________________________|________|__________________________|
    | FSXXXXXXXXXXXXXXX | 0  | SIGNAL   | (null) | ["E":15XXXXXXXX,"V":1] | (null) | (null)                   |
    |___________________|____|__________|________|________________________|________|__________________________|

Expected data:

     _________________________________________________________________
    | vin               | tt | msg_type | EVENTS                              |
    |___________________|____|__________|_____________________________________|
    | FU7XXXXXXXXXXXXXX | 0  | SIGNAL   | ["SN":"alt","E":15XXXXXXXX,"V":0.0] |
    |___________________|____|__________|_____________________________________|
    | FSXXXXXXXXXXXXXXX | 0  | SIGNAL   | ["SN":"ada","E":15XXXXXXXX,"V":1]   |
    |___________________|____|__________|_____________________________________|

Comments:

Could you provide the data in a proper tabular form? — Hi @Nikk, I have updated the sample data into a readable format.

Answer 1:

I have updated your input data and declared a string variable of column names to make the UDF generic.

 scala> df.show(false)
    +-----------------+---+--------+--------------------------+--------------------------+--------------------------+--------------------------+
    |vin              |tt |msg_type|abs                       |ada                       |adw                       |alt                       |
    +-----------------+---+--------+--------------------------+--------------------------+--------------------------+--------------------------+
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |null                      |null                      |null                      |["E":15XXXXXXXX,"V":0.0]  |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |null                      |["E":15XXXXXXXX,"V":1]    |null                      |null                      |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["E":15XXXXXXXX,"V":2]    |null                      |null                      |null                      |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |null                      |null                      |["E":15XXXXXXXX,"V":3]    |null                      |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |null                      |null                      |["E":15XXXXXXXX,"V":4.1]  |["E":15XXXXXXXX,"V":4.2]  |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["E":15XXXXXXXX,"V":5.1]  |null                      |["E":15XXXXXXXX,"V":5.2]  |null                      |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["E":15XXXXXXXX,"V":6.1]  |["E":15XXXXXXXX,"V":6.2]  |["E":15XXXXXXXX,"V":6.3]  |null                      |
    +-----------------+---+--------+--------------------------+--------------------------+--------------------------+--------------------------+
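
For anyone trying to reproduce the session, here is a minimal sketch of how a comparable sample DataFrame could be built. This is an assumption on my part, not part of the original answer: the array cells are modelled as plain nullable strings (which is how they print above), and only the first two rows are shown.

    // Hypothetical setup; assumes a spark-shell session where a SparkSession
    // named `spark` and its implicits (needed for .toDF) are already available.
    import spark.implicits._

    val none = Option.empty[String]   // stands in for a null cell
    val df = Seq(
      ("FU7XXXXXXXXXXXXXX", 0, "SIGNAL", none, none, none, Option("""["E":15XXXXXXXX,"V":0.0]""")),
      ("FU7XXXXXXXXXXXXXX", 0, "SIGNAL", none, Option("""["E":15XXXXXXXX,"V":1]"""), none, none)
    ).toDF("vin", "tt", "msg_type", "abs", "ada", "adw", "alt")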

    //String of column names that we need to check for null in Event
    scala> val signalColumns = "abs,ada, adw,alt"
    signalColumns: String = abs,ada, adw,alt

    //UDF declaration
    scala> import org.apache.spark.sql.Row
    scala> import org.apache.spark.sql.functions._
    scala> import org.apache.spark.sql.expressions.UserDefinedFunction

    scala> def eventUDF: UserDefinedFunction = udf((flagCol: String, r: Row) => {
         |   var signal = ""
         |   // comma-separated list of candidate column names -> trimmed List
         |   val flagColList: List[String] = flagCol.split(",").map(_.trim).toList
         |   flagColList.foreach { x =>
         |     if (r.getAs[String](x) != null) {
         |       // prepend the column name as "SN" and strip the payload's brackets
         |       signal = signal + "," + """"SN":""" + x + "," + r.getAs[String](x).replaceFirst("\\[", "").replaceFirst("\\]", "")
         |     }
         |   }
         |   // turn the leading comma into the opening bracket and close the array
         |   signal.replaceFirst(",", "[").concat("]")
         | })

    //final DataFrame
    scala> df.withColumn("Event", eventUDF(lit(signalColumns), struct(df.columns map col: _*))).select("vin", "tt", "msg_type", "Event").show(false)
    +-----------------+---+--------+-------------------------------------------------------------------------------------------------+
    |vin              |tt |msg_type|Event                                                                                            |
    +-----------------+---+--------+-------------------------------------------------------------------------------------------------+
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":alt,"E":15XXXXXXXX,"V":0.0]                                                                |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":ada,"E":15XXXXXXXX,"V":1]                                                                  |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":abs,"E":15XXXXXXXX,"V":2]                                                                  |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":adw,"E":15XXXXXXXX,"V":3]                                                                  |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":adw,"E":15XXXXXXXX,"V":4.1,"SN":alt,"E":15XXXXXXXX,"V":4.2]                                |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":abs,"E":15XXXXXXXX,"V":5.1,"SN":adw,"E":15XXXXXXXX,"V":5.2]                                |
    |FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":abs,"E":15XXXXXXXX,"V":6.1,"SN":ada,"E":15XXXXXXXX,"V":6.2,"SN":adw,"E":15XXXXXXXX,"V":6.3]|
    +-----------------+---+--------+-------------------------------------------------------------------------------------------------+
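
Since the question also asked whether this is achievable without a UDF: below is a rough, untested sketch of an alternative that uses only built-in column functions (when, concat, concat_ws, regexp_replace). It is not the method from this answer, just a possible Spark-SQL-style variant, and it assumes the signal columns are strings shaped like the sample data above.

    import org.apache.spark.sql.functions._

    // For each signal column, emit `"SN":<name>,<payload without brackets>`
    // when the cell is not null; `when` without `otherwise` yields null.
    val signalCols = Seq("abs", "ada", "adw", "alt")
    val pieces = signalCols.map { c =>
      when(col(c).isNotNull,
        concat(lit(s""""SN":$c,"""), regexp_replace(col(c), "[\\[\\]]", "")))
    }

    // concat_ws skips nulls, so only the populated columns survive the join.
    val result = df
      .withColumn("EVENTS", concat(lit("["), concat_ws(",", pieces: _*), lit("]")))
      .select("vin", "tt", "msg_type", "EVENTS")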

Comments:

Thank you so much @Nikk, the solution works well. — Hi @Nikk, I am facing one issue: within a single row, if the SN name stays the same, I am getting ["SN":ALT,"E":157XXXXXXX,"V":20.3XXXXX, "E":157XXXXXXXX,"V":20.3XXXXXXXX] while the expected output is ["SN":ALT,"E":157XXXXXX,"V":20.3XXXXXXX, "SN":ALT,"E":157XXXXXXX,"V":20.3XXXXXXX]. — Could you provide that row?
