spark scala将嵌套的数据框转换为嵌套的数据集
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark scala将嵌套的数据框转换为嵌套的数据集相关的知识,希望对你有一定的参考价值。
我有一个嵌套的数据框“ inputFlowRecordsAgg
”,它具有以下模式
root
|-- FlowI.key: string (nullable = true)
|-- FlowS.minFlowTime: long (nullable = true)
|-- FlowS.maxFlowTime: long (nullable = true)
|-- FlowS.flowStartedCount: long (nullable = true)
|-- FlowI.DestPort: integer (nullable = true)
|-- FlowI.SrcIP: struct (nullable = true)
| |-- bytes: binary (nullable = true)
|-- FlowI.DestIP: struct (nullable = true)
| |-- bytes: binary (nullable = true)
|-- FlowI.L4Protocol: byte (nullable = true)
|-- FlowI.Direction: byte (nullable = true)
|-- FlowI.Status: byte (nullable = true)
|-- FlowI.Mac: string (nullable = true)
希望转换为以下案例类别的嵌套数据集
case class InputFlowV1(val FlowI: FlowI,
val FlowS: FlowS)
case class FlowI(val Mac: String,
val SrcIP: IPAddress,
val DestIP: IPAddress,
val DestPort: Int,
val L4Protocol: Byte,
val Direction: Byte,
val Status: Byte,
var key: String = "")
case class FlowS(var minFlowTime: Long,
var maxFlowTime: Long,
var flowStartedCount: Long)
但是当我尝试使用inputFlowRecordsAgg.as [InputFlowV1]
cannot resolve '`FlowI`' given input columns: [FlowI.DestIP,FlowI.Direction, FlowI.key, FlowS.maxFlowTime, FlowI.SrcIP, FlowS.flowStartedCount, FlowI.L4Protocol, FlowI.Mac, FlowI.DestPort, FlowS.minFlowTime, FlowI.Status];
org.apache.spark.sql.AnalysisException: cannot resolve '`FlowI`' given input columns: [FlowI.DestIP,FlowI.Direction, FlowI.key, FlowS.maxFlowTime, FlowI.SrcIP, FlowS.flowStartedCount, FlowI.L4Protocol, FlowI.Mac, FlowI.DestPort, FlowS.minFlowTime, FlowI.Status];
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
一个评论问我完整的代码,在这里
def getReducedFlowR(inputFlowRecords: Dataset[InputFlowV1],
@transient spark: SparkSession): Dataset[InputFlowV1]={
val inputFlowRecordsAgg = inputFlowRecords.groupBy(column("FlowI.key") as "FlowI.key")
.agg(min("FlowS.minFlowTime") as "FlowS.minFlowTime" , max("FlowS.maxFlowTime") as "FlowS.maxFlowTime",
sum("FlowS.flowStartedCount") as "FlowS.flowStartedCount"
, first("FlowI.Mac") as "FlowI.Mac"
, first("FlowI.SrcIP") as "FlowI.SrcIP" , first("FlowI.DestIP") as "FlowI.DestIP"
,first("FlowI.DestPort") as "FlowI.DestPort"
, first("FlowI.L4Protocol") as "FlowI.L4Protocol"
, first("FlowI.Direction") as "FlowI.Direction" , first("FlowI.Status") as "FlowI.Status")
inputFlowRecordsAgg.printSchema()
return inputFlowRecordsAgg.as[InputFlowV1]
}
答案
原因是您的案例类架构与实际数据架构不匹配,请检查下面的案例类架构。尝试将案例类模式与数据模式匹配,它将起作用。
您的案例类模式为:
scala> df.printSchema
root
|-- FlowI: struct (nullable = true)
| |-- Mac: string (nullable = true)
| |-- SrcIP: string (nullable = true)
| |-- DestIP: string (nullable = true)
| |-- DestPort: integer (nullable = false)
| |-- L4Protocol: byte (nullable = false)
| |-- Direction: byte (nullable = false)
| |-- Status: byte (nullable = false)
| |-- key: string (nullable = true)
|-- FlowS: struct (nullable = true)
| |-- minFlowTime: long (nullable = false)
| |-- maxFlowTime: long (nullable = false)
| |-- flowStartedCount: long (nullable = false)
以上是关于spark scala将嵌套的数据框转换为嵌套的数据集的主要内容,如果未能解决你的问题,请参考以下文章
Scala Spark - 从简单的数据框创建嵌套的 json 输出