Cannot resolve explode due to type mismatch in Spark while parsing XML file


Posted: 2018-04-19 09:18:10

Question:

I have a DataFrame with the following schema:

root
 |-- DataPartition: long (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- _organizationId: long (nullable = true)
 |-- _segmentId: long (nullable = true)
 |-- seg:BusinessSegments: struct (nullable = true)
 |    |-- seg:BusinessSegment: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _VALUE: string (nullable = true)
 |    |    |    |-- _hierarchicalCode: long (nullable = true)
 |    |    |    |-- _industryId: long (nullable = true)
 |    |    |    |-- _ranking: long (nullable = true)
 |-- seg:GeographicSegments: struct (nullable = true)
 |    |-- seg:GeographicSegment: struct (nullable = true)
 |    |    |-- _geographyId: long (nullable = true)
 |    |    |-- seg:IsSubtracted: boolean (nullable = true)
 |    |    |-- seg:Sequence: long (nullable = true)
 |-- seg:IsCorporate: boolean (nullable = true)
 |-- seg:IsElimination: boolean (nullable = true)
 |-- seg:IsOperatingSegment: boolean (nullable = true)
 |-- seg:IsOther: boolean (nullable = true)
 |-- seg:IsShariaCompliant: boolean (nullable = true)
 |-- seg:PredecessorSegments: struct (nullable = true)
 |    |-- seg:PredecessorSegment: long (nullable = true)
 |-- seg:SegmentLocalLanguageLabel: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _languageId: long (nullable = true)
 |-- seg:SegmentName: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _languageId: long (nullable = true)
 |-- seg:SegmentType: string (nullable = true)
 |-- seg:SegmentTypeId: long (nullable = true)
 |-- seg:ValidFromPeriodEndDate: string (nullable = true)
 |-- _action: string (nullable = true)

Now I want to get the seg:BusinessSegments.seg:BusinessSegment values from this schema.

My problem appears when I use explode for this:

val GeographicSegmentchildDF = parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode($"seg:GeographicSegments.seg:GeographicSegment").as("GeographicSegments"), $"_action")
val GeographicSegmentchildArrayDF = GeographicSegmentchildDF.select(getDataPartition($"DataPartition").as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId".as("OrganizationId"), $"_segmentId".as("SegmentId"), $"GeographicSegments.*", getFFActionChild($"_action").as("FFAction|!|"))

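In the first statement I explode the column, and in the second I star-expand the result with $"GeographicSegments.*".

This is the error I get:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(seg:GeographicSegments.seg:GeographicSegment)' due to data type mismatch:

I understand the cause: in the schema, seg:GeographicSegment is a struct rather than an array, and explode does not accept a struct.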
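For reference, explode only accepts array or map columns. The following minimal sketch shows the shape that does work, an array<struct> column: explode emits one row per element, each element is a struct, and `.*` can then expand its fields. The case classes `Geo` and `Seg` and the column names are hypothetical stand-ins for the XML fields, and the sketch assumes a local SparkSession.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.explode

// Hypothetical stand-ins for seg:GeographicSegment elements and their parent row.
case class Geo(geographyId: Long, isSubtracted: Boolean, sequence: Long)
case class Seg(segmentId: Long, geos: Seq[Geo])

object ExplodeThenExpand {
  // geos is array<struct>, which explode accepts: one output row per element.
  def flatten(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df = Seq(Seg(1L, Seq(Geo(10L, false, 1L), Geo(20L, true, 2L)))).toDF()
    df.select($"segmentId", explode($"geos").as("geo")) // geo is now a single struct per row
      .select($"segmentId", $"geo.*")                    // so its fields can be star-expanded
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("explode-then-expand").getOrCreate()
    flatten(spark).show(false)
    spark.stop()
  }
}
```

If `geos` were a plain struct instead, the first `select` would fail with the same data type mismatch as above.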

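So the real problem is that I don't have a fixed schema.

When the XML file contains two records, seg:GeographicSegment is inferred as an array and my code works fine, but when it contains only one record, it is inferred as a struct and my code fails.

How can I handle this in my code? Do I have to add a condition when parsing the schema, or is there something else I need to do?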

Here is a case that does not work:

val columnTypePredecessorSegments = parentDF.select($"seg:PredecessorSegments.seg:PredecessorSegment").schema.map(_.dataType).head.toString().startsWith("LongType")
    //if the column is a single long, wrap it with array() so explode accepts it; otherwise it is already an array and is exploded directly
    val PredecessorSegmentschildDF = if (columnTypePredecessorSegments) 
      parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode(array($"seg:PredecessorSegments.seg:PredecessorSegment")).as("PredecessorSegments"), $"_action")
     else 
      parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode($"seg:PredecessorSegments.seg:PredecessorSegment").as("PredecessorSegments"), $"_action")
    
    val PredecessorSegmentsDFFinalChilddDF = PredecessorSegmentschildDF.select(getDataPartition($"DataPartition").as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId".as("OrganizationId"), $"_segmentId".as("SuccessorSegment"), $"PredecessorSegments.*", getFFActionChild($"_action").as("FFAction|!|"))
    PredecessorSegmentsDFFinalChilddDF.show(false)


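Answer 1:

"When the XML file contains two records, seg:GeographicSegment becomes an array and my code works fine, but when there is only one record it is treated as a struct and my code fails."

Then you need to check the column's data type before using explode: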

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import spark.implicits._ // enables the $"..." column syntax; spark is your SparkSession

//checking for struct or array type in that column
val columnType = parentDF.select($"seg:GeographicSegments.seg:GeographicSegment").schema.head.dataType.isInstanceOf[StructType]

//if the column is a single struct, wrap the struct itself with array() so explode receives array<struct>; otherwise it is already an array and is exploded directly
//(getDataPartition and getFFActionChild are the asker's own UDFs)
val GeographicSegmentchildDF = if (columnType) {
  parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode(array($"seg:GeographicSegments.seg:GeographicSegment")).as("GeographicSegments"), $"_action")
} else {
  parentDF.select($"DataPartition".as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId", $"_segmentId", explode($"seg:GeographicSegments.seg:GeographicSegment").as("GeographicSegments"), $"_action")
}

//each exploded element is a struct, so .* expands its fields into columns
val GeographicSegmentchildArrayDF = GeographicSegmentchildDF.select(getDataPartition($"DataPartition").as("DataPartition"), $"TimeStamp".as("TimeStamp"), $"_organizationId".as("OrganizationId"), $"_segmentId".as("SegmentId"), $"GeographicSegments.*", getFFActionChild($"_action").as("FFAction|!|"))
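The same type check can be pulled into a small helper so every nested path is normalized the same way before exploding. This is a minimal sketch, not the answerer's code: `GeoItem`, `OneGeo`, `ManyGeo`, `asArray` and `flatten` are hypothetical names, and a local SparkSession is assumed. It matches on the inferred `DataType` instead of comparing type-name strings.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, col, explode}
import org.apache.spark.sql.types.ArrayType

// Hypothetical stand-ins: one element (inferred as a struct) vs several (array<struct>).
case class GeoItem(geographyId: Long, sequence: Long)
case class OneGeo(segmentId: Long, geo: GeoItem)
case class ManyGeo(segmentId: Long, geo: Seq[GeoItem])

object ExplodeStructOrArray {
  // Returns the column at path as an array, whichever shape schema inference produced.
  def asArray(df: DataFrame, path: String): Column = {
    val c = col(path)
    df.select(c).schema.head.dataType match {
      case _: ArrayType => c        // already array<struct>: explode takes it as-is
      case _            => array(c) // single struct: wrap it in a one-element array
    }
  }

  // Explodes the normalized array, then star-expands each struct element into columns.
  def flatten(df: DataFrame, path: String): DataFrame =
    df.select(col("segmentId"), explode(asArray(df, path)).as("g"))
      .select(col("segmentId"), col("g.*"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("struct-or-array").getOrCreate()
    import spark.implicits._
    flatten(Seq(OneGeo(1L, GeoItem(10L, 1L))).toDF(), "geo").show(false)
    flatten(Seq(ManyGeo(2L, Seq(GeoItem(10L, 1L), GeoItem(20L, 2L)))).toDF(), "geo").show(false)
    spark.stop()
  }
}
```

With the real data, the path would be something like "seg:GeographicSegments.seg:GeographicSegment", and the other columns would be selected alongside `segmentId`.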

Hope this answer helps.

Comments:

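When I do the same thing for seg:PredecessorSegments, I get the error: Exception in thread "main" org.apache.spark.sql.AnalysisException: Can only star expand struct data types. Attribute: ArrayBuffer(PredecessorSegments);

Sorry for the late reply. I have updated the question, because the code did not look good in the comment section.

@AtharvThakur, the updated code checks for LongType, which is different from the original question. That solution will certainly not work there, because explode is meant for arrays or maps. You will have to try another technique for that data type. Perhaps a separate question would help you get an answer. :)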
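A minimal sketch for the LongType case discussed in the comments, assuming the same normalize-to-array idea: when seg:PredecessorSegment is a bare long (one element) or array<long> (several), the exploded values are plain longs, not structs. So the exploded column is selected by its alias instead of being star-expanded. `OnePred`, `ManyPred`, `pred` and `predecessors` are hypothetical names, and a local SparkSession is assumed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, col, explode}
import org.apache.spark.sql.types.ArrayType

// Hypothetical stand-ins: one predecessor (inferred as a bare long) vs several (array<long>).
case class OnePred(segmentId: Long, pred: Long)
case class ManyPred(segmentId: Long, pred: Seq[Long])

object ExplodeScalarOrArray {
  def predecessors(df: DataFrame): DataFrame = {
    val p = col("pred")
    val asArray = df.schema("pred").dataType match {
      case _: ArrayType => p        // several elements: already array<long>
      case _            => array(p) // single element: wrap the bare long
    }
    // The exploded values are longs, not structs, so the alias is selected directly;
    // star-expanding it with ".*" is what raises "Can only star expand struct data types".
    df.select(col("segmentId").as("SuccessorSegment"), explode(asArray).as("PredecessorSegment"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("scalar-or-array").getOrCreate()
    import spark.implicits._
    predecessors(Seq(OnePred(1L, 7L)).toDF()).show(false)
    predecessors(Seq(ManyPred(1L, Seq(7L, 8L))).toDF()).show(false)
    spark.stop()
  }
}
```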
