在 Spark 的嵌套 XML 中为来自父数据框的子数据框添加额外的列
Posted
技术标签:
【中文标题】在 Spark 的嵌套 XML 中为来自父数据框的子数据框添加额外的列【英文标题】:Add extra column for child data frame from parent data frame in nested XML in Spark 【发布时间】:2018-02-07 14:27:05 【问题描述】:我在加载许多 XML 文件后创建数据。
每个 xml 文件都有一个唯一字段fun:DataPartitionId
我正在从一个 XML 文件创建许多行。
现在我想为 XML 生成的行中的每一行添加此 fun:DataPartitionId
。
例如,假设第一个 XML 有 100 行,那么每 100 行将有相同的 fun:DataPartitionId
字段。
所以fun:DataPartitionId
是每个 XML 中的标头。
这就是我正在做的。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark. SparkConf, SparkContext
import java.sql.Date, Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
val getDataPartition = udf (DataPartition: String) =>
if (DataPartition=="1") "SelfSourcedPublic"
else if (DataPartition=="2") "Japan"
else if (DataPartition=="3") "SelfSourcedPrivate"
else "ThirdPartyPrivate"
val getFFActionParent = udf (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "I|!|"
else "D|!|"
val getFFActionChild = udf (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "O|!|"
else "D|!|"
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfDataPartition=getDataPartition(dfContentEnvelope("env:Header.fun:DataPartitionId"))
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val df =dfContentItem.withColumn("DataPartition",dfDataPartition)
df.show()
【问题讨论】:
【参考方案1】:当您使用
阅读xml
文件时
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
DataParitionId
列被读取为Long
fun:DataPartitionId: long (nullable = true)
所以你应该把udf
函数改成
val getDataPartition = udf (DataPartition: Long) =>
if (DataPartition== 1) "SelfSourcedPublic"
else if (DataPartition== 2) "Japan"
else if (DataPartition== 3) "SelfSourcedPrivate"
else "ThirdPartyPrivate"
如果可能,您应该使用 when 函数而不是 udf 函数来提高处理速度和内存使用率
现在我想为 xml 生成的行中的每一行添加这个 fun:DataPartitionId。
你的错误是你忘了select
那个特定的列,所以下面的代码
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
应该是
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")
然后就可以应用udf
函数了
val df = dfContentItem.select(getDataPartition($"DataPartitionId"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
所以作为一个整体的工作代码应该是
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark. SparkConf, SparkContext
import java.sql.Date, Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
val getDataPartition = udf (DataPartition: Long) =>
if (DataPartition=="1") "SelfSourcedPublic"
else if (DataPartition=="2") "Japan"
else if (DataPartition=="3") "SelfSourcedPrivate"
else "ThirdPartyPrivate"
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")
val df = dfContentItem.select(getDataPartition($"DataPartitionId"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
df.show(false)
您可以继续执行其余代码。
【讨论】:
以上是关于在 Spark 的嵌套 XML 中为来自父数据框的子数据框添加额外的列的主要内容,如果未能解决你的问题,请参考以下文章
在 spark 数据框中的嵌套 json 中将部分父 Schema 列添加到子项