When function is not working in a Spark DataFrame with auto-detected schema
Posted: 2017-10-04 11:54:01
Question: I have a text file that I am reading into a Spark DataFrame as a CSV file. After a join, when I use the when function while selecting columns, I get the exception shown below.
Here is my code to load the CSV files:
val df = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/MAIN")
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
val df2 = sqlContext.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("s3://trfsdisu/SPARK/FinancialLineItem/INCR")
val df2With_ = df2.toDF(df2.columns.map(_.replace(".", "_")): _*)
val df2column_to_keep = df2With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df2result = df2With_.select(df2column_to_keep.head, df2column_to_keep.tail: _*)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{rank, when}
import org.apache.spark.sql.types.LongType
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = df2result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
Here are my schemas:
latestForEachKey.printSchema()
root
|-- DataPartiotion: string (nullable = true)
|-- LineItem_organizationId: long (nullable = true)
|-- LineItem_lineItemId: integer (nullable = true)
|-- StatementTypeCode_1: string (nullable = true)
|-- LineItemName_1: string (nullable = true)
|-- LocalLanguageLabel_1: string (nullable = true)
|-- FinancialConceptLocal_1: string (nullable = true)
|-- FinancialConceptGlobal_1: string (nullable = true)
|-- IsDimensional_1: boolean (nullable = true)
|-- InstrumentId_1: string (nullable = true)
|-- LineItemSequence_1: string (nullable = true)
|-- PhysicalMeasureId_1: string (nullable = true)
|-- FinancialConceptCodeGlobalSecondary_1: string (nullable = true)
|-- IsRangeAllowed_1: string (nullable = true)
|-- IsSegmentedByOrigin_1: string (nullable = true)
|-- SegmentGroupDescription_1: string (nullable = true)
|-- SegmentChildDescription_1: string (nullable = true)
|-- SegmentChildLocalLanguageLabel_1: string (nullable = true)
|-- LocalLanguageLabel_languageId_1: string (nullable = true)
|-- LineItemName_languageId_1: integer (nullable = true)
|-- SegmentChildDescription_languageId_1: string (nullable = true)
|-- SegmentChildLocalLanguageLabel_languageId_1: string (nullable = true)
|-- SegmentGroupDescription_languageId_1: string (nullable = true)
|-- SegmentMultipleFundbDescription_1: string (nullable = true)
|-- SegmentMultipleFundbDescription_languageId_1: string (nullable = true)
|-- IsCredit_1: string (nullable = true)
|-- FinancialConceptLocalId_1: string (nullable = true)
|-- FinancialConceptGlobalId_1: string (nullable = true)
|-- FinancialConceptCodeGlobalSecondaryId_1: string (nullable = true)
|-- FFAction_1: string (nullable = true)
df1result.printSchema()
root
|-- LineItem_organizationId: long (nullable = true)
|-- LineItem_lineItemId: integer (nullable = true)
|-- StatementTypeCode: string (nullable = true)
|-- LineItemName: string (nullable = true)
|-- LocalLanguageLabel: string (nullable = true)
|-- FinancialConceptLocal: string (nullable = true)
|-- FinancialConceptGlobal: string (nullable = true)
|-- IsDimensional: boolean (nullable = true)
|-- InstrumentId: string (nullable = true)
|-- LineItemSequence: string (nullable = true)
|-- PhysicalMeasureId: string (nullable = true)
|-- FinancialConceptCodeGlobalSecondary: string (nullable = true)
|-- IsRangeAllowed: boolean (nullable = true)
|-- IsSegmentedByOrigin: boolean (nullable = true)
|-- SegmentGroupDescription: string (nullable = true)
|-- SegmentChildDescription: string (nullable = true)
|-- SegmentChildLocalLanguageLabel: string (nullable = true)
|-- LocalLanguageLabel_languageId: integer (nullable = true)
|-- LineItemName_languageId: integer (nullable = true)
|-- SegmentChildDescription_languageId: integer (nullable = true)
|-- SegmentChildLocalLanguageLabel_languageId: integer (nullable = true)
|-- SegmentGroupDescription_languageId: integer (nullable = true)
|-- SegmentMultipleFundbDescription: string (nullable = true)
|-- SegmentMultipleFundbDescription_languageId: integer (nullable = true)
|-- IsCredit: boolean (nullable = true)
|-- FinancialConceptLocalId: integer (nullable = true)
|-- FinancialConceptGlobalId: integer (nullable = true)
|-- FinancialConceptCodeGlobalSecondaryId: string (nullable = true)
|-- FFAction: string (nullable = true)
This is where I get the error:
val dfMainOutput = df1result.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
when($"LocalLanguageLabel_1".isNotNull, $"LocalLanguageLabel_1").otherwise($"LocalLanguageLabel").as("LocalLanguageLabel"),
when($"FinancialConceptLocal_1".isNotNull, $"FinancialConceptLocal_1").otherwise($"FinancialConceptLocal").as("FinancialConceptLocal"),
when($"FinancialConceptGlobal_1".isNotNull, $"FinancialConceptGlobal_1").otherwise($"FinancialConceptGlobal").as("FinancialConceptGlobal"),
when($"IsDimensional_1".isNotNull, $"IsDimensional_1").otherwise($"IsDimensional").as("IsDimensional"),
when($"InstrumentId_1".isNotNull, $"InstrumentId_1").otherwise($"InstrumentId").as("InstrumentId"),
when($"LineItemLineItemName_1".isNotNull, $"LineItemLineItemName_1").otherwise($"LineItemLineItemName").as("LineItemLineItemName"),
when($"PhysicalMeasureId_1".isNotNull, $"PhysicalMeasureId_1").otherwise($"PhysicalMeasureId").as("PhysicalMeasureId"),
when($"FinancialConceptCodeGlobalSecondary_1".isNotNull, $"FinancialConceptCodeGlobalSecondary_1").otherwise($"FinancialConceptCodeGlobalSecondary").as("FinancialConceptCodeGlobalSecondary"),
when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1").otherwise($"IsRangeAllowed").as("IsRangeAllowed"),
when($"IsSegmentedByOrigin_1".isNotNull, $"IsSegmentedByOrigin_1").otherwise($"IsSegmentedByOrigin").as("IsSegmentedByOrigin"),
when($"SegmentGroupDescription_1".isNotNull, $"SegmentGroupDescription_1").otherwise($"SegmentGroupDescription").as("SegmentGroupDescription"),
when($"SegmentChildDescription_1".isNotNull, $"SegmentChildDescription_1").otherwise($"SegmentChildDescription").as("SegmentChildDescription"),
when($"SegmentChildLocalLanguageLabel_1".isNotNull, $"SegmentChildLocalLanguageLabel_1").otherwise($"SegmentChildLocalLanguageLabel").as("SegmentChildLocalLanguageLabel"),
when($"LocalLanguageLabel_languageId_1".isNotNull, $"LocalLanguageLabel_languageId_1").otherwise($"LocalLanguageLabel_languageId").as("LocalLanguageLabel_languageId"),
when($"LineItemName_languageId_1".isNotNull, $"LineItemName_languageId_1").otherwise($"LineItemName_languageId").as("LineItemName_languageId"),
when($"SegmentChildDescription_languageId_1".isNotNull, $"SegmentChildDescription_languageId_1").otherwise($"SegmentChildDescription_languageId").as("SegmentChildDescription_languageId"),
when($"SegmentChildLocalLanguageLabel_languageId_1".isNotNull, $"SegmentChildLocalLanguageLabel_languageId_1").otherwise($"SegmentChildLocalLanguageLabel_languageId").as("SegmentChildLocalLanguageLabel_languageId"),
when($"SegmentGroupDescription_languageId_1".isNotNull, $"SegmentGroupDescription_languageId_1").otherwise($"SegmentGroupDescription_languageId").as("SegmentGroupDescription_languageId"),
when($"SegmentMultipleFundbDescription_1".isNotNull, $"SegmentMultipleFundbDescription_1").otherwise($"SegmentMultipleFundbDescription").as("SegmentMultipleFundbDescription"),
when($"SegmentMultipleFundbDescription_languageId_1".isNotNull, $"SegmentMultipleFundbDescription_languageId_1").otherwise($"SegmentMultipleFundbDescription_languageId").as("SegmentMultipleFundbDescription_languageId"),
when($"IsCredit_1".isNotNull, $"IsCredit_1").otherwise($"IsCredit").as("IsCredit"),
when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction").as("FFAction"))
.filter(!$"FFAction".contains("D"))
dfMainOutput.write
.format("csv")
.option("quote", "\uFEFF")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FinancialLineItem/output")
Below is my exception:
org.apache.spark.sql.AnalysisException: cannot resolve 'CASE WHEN (`IsRangeAllowed_1` IS NOT NULL) THEN `IsRangeAllowed_1` ELSE `IsRangeAllowed` END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type;
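I did not specify any types when loading the CSV files.
I load both as CSV files without an explicit schema, yet IsRangeAllowed_1 is a string in one DataFrame while IsRangeAllowed is BooleanType in the other.
Along with this, I have one more question.
How can we remove the default delimiter in the DataFrame output and use our own custom delimiter, together with partitioning and gzip compression?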
dfMainOutput.rdd.saveAsTextFile("s3://trfsdisu/SPARK/FinancialLineItem/output")
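Answer 1: For the first problem, the exception says that the THEN and ELSE expressions of a WHEN must all be the same type or coercible to a common type. Here IsRangeAllowed is Boolean while IsRangeAllowed_1 is String, so cast one of the two columns to String or to Boolean. The code change could be: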
import org.apache.spark.sql.types.DataTypes
when($"IsRangeAllowed_1".isNotNull, $"IsRangeAllowed_1")
.otherwise($"IsRangeAllowed".cast(DataTypes.StringType))
.as("IsRangeAllowed")
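Since inferSchema derives column types from the values found in each file, two loads of similar data can disagree, as they do here. Going by the schemas printed in the question, IsSegmentedByOrigin and IsCredit are also boolean on one side and string on the other, so they would likely need the same cast. The following is only a sketch on toy data, not the asker's pipeline: the local SparkSession, the `joined` frame and the `id` column are assumptions made for illustration, and `coalesce` is swapped in for `when(...).otherwise(...)`, which is equivalent for this two-column case.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col}
import org.apache.spark.sql.types.StringType

// Local session for the sketch only; in spark-shell a session already exists.
val spark = SparkSession.builder().master("local[1]").appName("cast-sketch").getOrCreate()

// Toy stand-in for the joined frame: a Boolean column from MAIN and a
// String column from INCR (hypothetical values).
val joined = spark.createDataFrame(Seq[(Long, Option[Boolean], Option[String])](
  (1L, Some(true), Some("false")), // INCR value present: it wins
  (2L, Some(true), None)           // INCR value missing: fall back to MAIN
)).toDF("id", "IsRangeAllowed", "IsRangeAllowed_1")

// Cast the Boolean side to String so both branches share one type,
// then take the first non-null value.
val merged = joined.select(
  col("id"),
  coalesce(col("IsRangeAllowed_1"), col("IsRangeAllowed").cast(StringType)).as("IsRangeAllowed")
)

val result = merged.orderBy("id").collect()
result.foreach(println)
spark.stop()
```

Casting towards String keeps whatever text the incremental file contains; casting the String side to Boolean instead would turn any value Spark cannot parse as a boolean into null.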
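For the second question, how to remove the default delimiter in the DataFrame output and use a custom delimiter together with partitioning and gzip compression:
A DataFrame can be saved directly with the delimiter and codec options, without going through the underlying RDD (e.g. dfMainOutput.rdd). That is: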
dfMainOutput.write
.format("csv")
.option("delimiter", "!")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FinancialLineItem/output")
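Edit: a concat_ws example, as requested in the comments:
import org.apache.spark.sql.functions.{col, concat_ws}
// concatenate selected columns into one string column, separated by "|!|"
df.withColumn("colmn", concat_ws("|!|", $"IsRangeAllowed_1", $"IsRangeAllowed", ...))
.selectExpr("colmn")
.show()
// to concatenate all columns in df
df.withColumn("colmn", concat_ws("|!|", df.columns.map(col): _*))
.selectExpr("colmn")
.show()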
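To connect the concat_ws idea back to the original goal (a multi-character delimiter plus gzip), one possible approach is to write the single concatenated column with the text writer. This is a sketch under assumptions: the toy frame, its values and the temporary output directory are made up for illustration. Note that concat_ws skips null values, so the sketch replaces nulls with empty strings first to keep the field positions stable.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, concat_ws, lit}

// Local session for the sketch only; in spark-shell a session already exists.
val spark = SparkSession.builder().master("local[1]").appName("delimiter-sketch").getOrCreate()

// Toy data standing in for dfMainOutput (hypothetical values).
val toy = spark.createDataFrame(Seq[(Long, Option[String], Option[Boolean])](
  (1L, Some("A"), Some(true)),
  (2L, None, Some(false))
)).toDF("LineItem_organizationId", "FFAction", "IsCredit")

// concat_ws drops nulls, so map them to "" to avoid shifting later fields.
val fields = toy.columns.map(c => coalesce(col(c).cast("string"), lit("")))
val lines = toy.select(concat_ws("|!|", fields: _*).as("value"))

// The text writer takes exactly one string column and supports compression.
val outDir = Files.createTempDirectory("spark-out").resolve("output").toString
lines.write.option("compression", "gzip").text(outDir)

// Read the gzip files back to check what was written.
val roundTrip = spark.read.text(outDir).collect().map(_.getString(0)).sorted
roundTrip.foreach(println)
spark.stop()
```

The second row keeps an empty field between the two delimiters, which is what a downstream parser splitting on "|!|" would normally expect.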
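Comments:
Thanks, the when function is working fine now. I am now looking into the delimiter. One more thing: if I partition the output, the column I partition on does not appear in the output.
Yes, the partition column does not appear in the output files, but it is present in the path.
Is there any way to add it to the output file? Also, Spark creates a directory instead of a file. Do I have to do that manually?
Duplicate the partition column under an alias, so that one copy remains in the files.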
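The suggestion in the last comment can be sketched as follows. The column names `region` and `region_dir`, the toy rows and the temporary output directory are assumptions made for illustration; the idea is simply to partition on a copy of the column so that the original stays inside the written files.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for the sketch only; in spark-shell a session already exists.
val spark = SparkSession.builder().master("local[1]").appName("partition-sketch").getOrCreate()

// Toy data; "region" plays the role of the column used for partitioning.
val toy = spark.createDataFrame(Seq(
  ("Japan", "I"),
  ("Japan", "U"),
  ("China", "I")
)).toDF("region", "FFAction")

// Partition on a copy: the copy ends up in the directory name,
// while the original column remains in the file contents.
val outDir = Files.createTempDirectory("spark-part").resolve("output").toString
toy.withColumn("region_dir", col("region"))
  .write
  .partitionBy("region_dir")
  .option("delimiter", "|")
  .option("compression", "gzip")
  .csv(outDir)

// Reading one partition directory directly shows only what is in the files.
val japanRows = spark.read.option("delimiter", "|")
  .csv(s"$outDir/region_dir=Japan")
  .collect()
  .map(r => (r.getString(0), r.getString(1)))
  .toSet
japanRows.foreach(println)
spark.stop()
```

Spark still writes a directory of part files per partition value; producing a single named file would need a separate step outside this sketch.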