Table data conversion using Spark Scala
Posted: 2019-09-30 09:30:13

【Question】: I have Hive table data and need some help converting it into the "expected data" form shown below.
Two things to note:

- Omit columns whose value is null (for example, omit the abs, ada, and adw columns for the first row).
- For array-type columns whose value is not null (e.g. abs, ada, adw, alt), include the column name inside the array as shown in the expected data, and keep a single outer column named EVENTS.

Is there a way to achieve this with Spark SQL, or do I need to write a Scala UDF? I need a solution in Spark/Scala. Any help would be much appreciated.
Hive table data:
| vin               | tt | msg_type | abs    | ada                    | adw    | alt                      |
|-------------------|----|----------|--------|------------------------|--------|--------------------------|
| FU7XXXXXXXXXXXXXX | 0  | SIGNAL   | (null) | (null)                 | (null) | ["E":15XXXXXXXX,"V":0.0] |
| FSXXXXXXXXXXXXXXX | 0  | SIGNAL   | (null) | ["E":15XXXXXXXX,"V":1] | (null) | (null)                   |
Expected data:
| vin               | tt | msg_type | EVENTS                              |
|-------------------|----|----------|-------------------------------------|
| FU7XXXXXXXXXXXXXX | 0  | SIGNAL   | ["SN":"alt","E":15XXXXXXXX,"V":0.0] |
| FSXXXXXXXXXXXXXXX | 0  | SIGNAL   | ["SN":"ada","E":15XXXXXXXX,"V":1]   |
【Comments】:
Could you please provide the data in a proper table format?
Hi @Nikk, I have updated the sample data to a readable format.

【Answer 1】: I have extended your input data and declared a string variable of column names to keep the UDF generic.
scala> df.show(false)
+-----------------+---+--------+------------------------+------------------------+------------------------+------------------------+
|vin              |tt |msg_type|abs                     |ada                     |adw                     |alt                     |
+-----------------+---+--------+------------------------+------------------------+------------------------+------------------------+
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |null                    |null                    |null                    |["E":15XXXXXXXX,"V":0.0]|
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |null                    |["E":15XXXXXXXX,"V":1]  |null                    |null                    |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["E":15XXXXXXXX,"V":2]  |null                    |null                    |null                    |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |null                    |null                    |["E":15XXXXXXXX,"V":3]  |null                    |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |null                    |null                    |["E":15XXXXXXXX,"V":4.1]|["E":15XXXXXXXX,"V":4.2]|
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["E":15XXXXXXXX,"V":5.1]|null                    |["E":15XXXXXXXX,"V":5.2]|null                    |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["E":15XXXXXXXX,"V":6.1]|["E":15XXXXXXXX,"V":6.2]|["E":15XXXXXXXX,"V":6.3]|null                    |
+-----------------+---+--------+------------------------+------------------------+------------------------+------------------------+
//String of column names that we need to check for null in the Event
scala> val SingalColumns = "abs,ada, adw,alt"
SingalColumns: String = abs,ada, adw,alt
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.expressions.UserDefinedFunction
scala> import org.apache.spark.sql.functions.{col, lit, struct, udf}

//UDF declaration
scala> def EventUDF: UserDefinedFunction = udf((flagCol: String, r: Row) => {
     |   var signal = ""
     |   // Split the column list and trim stray spaces ("abs,ada, adw,alt")
     |   val flagColList: List[String] = flagCol.split(",").map(_.trim).toList
     |   flagColList.foreach { x =>
     |     if (r.getAs[Any](x) != null)
     |       signal = signal + "," + """"SN":""" + x + "," +
     |         r.getAs[Any](x).toString.replaceFirst("\\[", "").replaceFirst("\\]", "")
     |   }
     |   // Turn the leading comma into "[" and close the bracket
     |   signal.replaceFirst(",", "[").concat("]")
     | })
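The UDF's string handling can be sanity-checked without a Spark session by mirroring it in plain Scala. This is only an illustrative sketch: `buildEvents` and its `Map`-based stand-in for the `Row` are hypothetical names, not part of the answer.

```scala
// Plain-Scala mirror of the UDF body. A Map[String, String] stands in for
// the Row; a missing key or a null value models a null column.
def buildEvents(cols: List[String], row: Map[String, String]): String = {
  var signal = ""
  cols.foreach { x =>
    val v = row.getOrElse(x, null)
    if (v != null)
      signal = signal + "," + """"SN":""" + x + "," +
        v.replaceFirst("\\[", "").replaceFirst("\\]", "")
  }
  // The first comma becomes the opening bracket
  signal.replaceFirst(",", "[").concat("]")
}
```

For example, `buildEvents(List("abs","ada","adw","alt"), Map("alt" -> """["E":15XXXXXXXX,"V":0.0]"""))` yields `["SN":alt,"E":15XXXXXXXX,"V":0.0]`, matching the output shown below.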
//final DataFrame
scala> df.withColumn("Event", EventUDF(lit(SingalColumns),struct(df.columns map col: _*))).select("vin","tt","msg_type","Event").show(false)
+-----------------+---+--------+-------------------------------------------------------------------------------------------------+
|vin              |tt |msg_type|Event                                                                                            |
+-----------------+---+--------+-------------------------------------------------------------------------------------------------+
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":alt,"E":15XXXXXXXX,"V":0.0]                                                                |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":ada,"E":15XXXXXXXX,"V":1]                                                                  |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":abs,"E":15XXXXXXXX,"V":2]                                                                  |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":adw,"E":15XXXXXXXX,"V":3]                                                                  |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":adw,"E":15XXXXXXXX,"V":4.1,"SN":alt,"E":15XXXXXXXX,"V":4.2]                                |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":abs,"E":15XXXXXXXX,"V":5.1,"SN":adw,"E":15XXXXXXXX,"V":5.2]                                |
|FU7XXXXXXXXXXXXXX|0  |SIGNAL  |["SN":abs,"E":15XXXXXXXX,"V":6.1,"SN":ada,"E":15XXXXXXXX,"V":6.2,"SN":adw,"E":15XXXXXXXX,"V":6.3]|
+-----------------+---+--------+-------------------------------------------------------------------------------------------------+
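For comparison, the same result can be sketched without a UDF using `when`, `regexp_replace`, and `concat_ws` (which drops null arguments). The tiny inline dataset and shortened values below are illustrative assumptions, not the asker's real data; note that this variant quotes the SN name, as in the question's expected output.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[1]").appName("events-sketch").getOrCreate()
import spark.implicits._

// Toy stand-in for the Hive table (values shortened for readability)
val rows: Seq[(String, Int, String, String, String, String, String)] = Seq(
  ("FU7X", 0, "SIGNAL", null, null, null, """["E":15,"V":0.0]"""),
  ("FSX",  0, "SIGNAL", null, """["E":15,"V":1]""", null, null)
)
val df = rows.toDF("vin", "tt", "msg_type", "abs", "ada", "adw", "alt")

val signalCols = Seq("abs", "ada", "adw", "alt")

// One expression per signal column: when non-null, prefix the quoted
// column name as "SN" and strip the surrounding brackets.
val eventParts = signalCols.map { c =>
  when(col(c).isNotNull,
    concat(lit(s""""SN":"$c","""), regexp_replace(col(c), "[\\[\\]]", "")))
}

// concat_ws skips the null parts, so only the present signals survive
val result = df
  .withColumn("EVENTS", concat(lit("["), concat_ws(",", eventParts: _*), lit("]")))
  .select("vin", "tt", "msg_type", "EVENTS")
result.show(false)
```

One design caveat: rows where every signal column is null come out as `[]`, which may need extra handling depending on the downstream consumer.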
【Discussion】:
Thank you very much @Nikk, the solution works great.
Hi @Nikk, I am facing an issue: within a single row, if the SN name repeats I get ["SN":ALT,"E":157XXXXXXX,"V":20.3XXXXX,"E":157XXXXXXXX,"V":20.3XXXXXXXX], but the expected result is ["SN":ALT,"E":157XXXXXX,"V":20.3XXXXXXX,"SN":ALT,"E":157XXXXXXX,"V":20.3XXXXXXX].
Could you provide that row?
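If the duplicate-SN symptom comes from a single cell holding several "E"/"V" pairs, one possible fix is to split the cell before each "E" and prefix every pair with the signal name. The helper below is hypothetical (the row in question was never posted), so treat it as a sketch under that assumption.

```scala
// Hypothetical helper: tag every "E"/"V" pair in one cell with its SN.
// Assumes the cell looks like ["E":…,"V":…,"E":…,"V":…].
def tagEvents(sn: String, cell: String): String = {
  val body = cell.stripPrefix("[").stripSuffix("]")
  body.split("""(?="E":)""")          // zero-width split before each "E":
    .map(_.stripSuffix(","))          // drop each pair's trailing comma
    .map(p => s""""SN":$sn,$p""")     // prefix the signal name
    .mkString(",")
}
```

Applied inside the UDF in place of the bare bracket-stripping, a two-event cell such as `["E":157,"V":20.3,"E":158,"V":20.4]` would yield one "SN" tag per pair rather than one per cell.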