NotNull condition is not working for withColumn condition in spark data frame scala

Posted: 2018-04-23 12:53:44

【Question】:
So I am trying to add a column when it is found, but I do not want to add it when the column does not exist in the XML schema. This is what I am doing, and I think I am doing something wrong in the condition check.
val temp = tempNew1
.withColumn("BookMark", when($"AsReportedItem.fs:BookMark".isNotNull or $"AsReportedItem.fs:BookMark" =!= "", 0))
.withColumn("DocByteOffset", when($"AsReportedItem.fs:DocByteOffset".isNotNull or $"AsReportedItem.fs:DocByteOffset" =!= "", 0))
.withColumn("DocByteLength", when($"AsReportedItem.fs:DocByteLength".isNotNull or $"AsReportedItem.fs:DocByteLength" =!= "", 0))
.withColumn("EditedDescription", when($"AsReportedItem.fs:EditedDescription".isNotNull or $"AsReportedItem.fs:EditedDescription" =!= "", 0))
.withColumn("EditedDescription", when($"AsReportedItem.fs:EditedDescription._VALUE".isNotNull or $"AsReportedItem.fs:EditedDescription._VALUE" =!= "", 0))
.withColumn("EditedDescription_languageId", when($"AsReportedItem.fs:EditedDescription._languageId".isNotNull or $"AsReportedItem.fs:EditedDescription._languageId" =!= "", 0))
.withColumn("ReportedDescription", when($"AsReportedItem.fs:ReportedDescription._VALUE".isNotNull or $"AsReportedItem.fs:ReportedDescription._VALUE" =!= "", 0))
.withColumn("ReportedDescription_languageId", when($"AsReportedItem.fs:ReportedDescription._languageId".isNotNull or $"AsReportedItem.fs:ReportedDescription._languageId" =!= "", 0))
.withColumn("FinancialAsReportedLineItemName_languageId", when($"FinancialAsReportedLineItemName._languageId".isNotNull or $"FinancialAsReportedLineItemName._languageId" =!= "", 0))
.withColumn("FinancialAsReportedLineItemName", when($"FinancialAsReportedLineItemName._VALUE".isNotNull or $"FinancialAsReportedLineItemName._VALUE" =!= "", 0))
.withColumn("PeriodPermId_objectTypeId", when($"PeriodPermId._objectTypeId".isNotNull or $"PeriodPermId._objectTypeId" =!= "", 0))
.withColumn("PeriodPermId", when($"PeriodPermId._VALUE".isNotNull or $"PeriodPermId._VALUE" =!= "", 0))
.drop($"AsReportedItem").drop($"AsReportedItem")
This works fine for me when the column is found, but I get an error when the column does not exist in tempNew1.
I do not want to call withColumn at all if the tag is not found in the schema.
I am missing something here; please help me find the problem.
The error I am getting is below:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`AsReportedItem.fs:BookMark`' given input columns: [IsAsReportedCurrencySetManually,
This is also what I tried:
def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess
val temp = tempNew1.withColumn("BookMark", when(hasColumn(tempNew1,"AsReportedItem.fs:BookMark") == true, $"AsReportedItem.fs:BookMark"))
but I could not make it fully work..
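From what I can tell, hasColumn runs on the driver and returns a plain Scala Boolean, while when expects a Column, so the check cannot live inside withColumn; the branching has to happen in ordinary Scala. A minimal sketch for a single column (names taken from the schema below):
import scala.util.Try
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Try(df(path)) succeeds only if the (possibly nested) path resolves
def hasColumn(df: DataFrame, path: String): Boolean = Try(df(path)).isSuccess

// branch on the driver in plain Scala, not inside when()
val withBookMark =
  if (hasColumn(tempNew1, "AsReportedItem.fs:BookMark"))
    tempNew1.withColumn("BookMark", col("AsReportedItem.fs:BookMark"))
  else
    tempNew1.withColumn("BookMark", lit(null))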
This works, but how can I write it for all the columns?
val temp = if (hasColumn(tempNew1, "AsReportedItem"))
tempNew1
.withColumn("BookMark", $"AsReportedItem.fs:BookMark")
.withColumn("DocByteOffset", $"AsReportedItem.fs:DocByteOffset")
.withColumn("DocByteLength", $"AsReportedItem.fs:DocByteLength")
.withColumn("EditedDescription", $"AsReportedItem.fs:EditedDescription")
.withColumn("EditedDescription", $"AsReportedItem.fs:EditedDescription._VALUE")
.withColumn("EditedDescription_languageId", $"AsReportedItem.fs:EditedDescription._languageId")
.withColumn("ReportedDescription", $"AsReportedItem.fs:ReportedDescription._VALUE")
.withColumn("ReportedDescription_languageId", $"AsReportedItem.fs:ReportedDescription._languageId")
.withColumn("FinancialAsReportedLineItemName_languageId", $"FinancialAsReportedLineItemName._languageId")
.withColumn("FinancialAsReportedLineItemName", $"FinancialAsReportedLineItemName._VALUE")
.withColumn("PeriodPermId_objectTypeId", $"PeriodPermId._objectTypeId")
.withColumn("PeriodPermId", $"PeriodPermId._VALUE")
.drop($"AsReportedItem")
else
tempNew1
.withColumn("BookMark", lit(null))
.withColumn("DocByteOffset", lit(null))
.withColumn("DocByteLength", lit(null))
.withColumn("EditedDescription", lit(null))
.withColumn("EditedDescription", lit(null))
.withColumn("EditedDescription_languageId", lit(null))
.withColumn("ReportedDescription", lit(null))
.withColumn("ReportedDescription_languageId", lit(null))
.withColumn("FinancialAsReportedLineItemName_languageId", $"FinancialAsReportedLineItemName._languageId")
.withColumn("FinancialAsReportedLineItemName", $"FinancialAsReportedLineItemName._VALUE")
.withColumn("PeriodPermId_objectTypeId", $"PeriodPermId._objectTypeId")
.withColumn("PeriodPermId", $"PeriodPermId._VALUE")
.drop($"AsReportedItem")
Adding the schema of the main data frame:
root
|-- DataPartition: string (nullable = true)
|-- TimeStamp: string (nullable = true)
|-- PeriodId: long (nullable = true)
|-- SourceId: long (nullable = true)
|-- FinancialStatementLineItem_lineItemId: long (nullable = true)
|-- FinancialStatementLineItem_lineItemInstanceKey: long (nullable = true)
|-- StatementCurrencyId: long (nullable = true)
|-- StatementTypeCode: string (nullable = true)
|-- uniqueFundamentalSet: long (nullable = true)
|-- AuditID: string (nullable = true)
|-- EstimateMethodCode: string (nullable = true)
|-- EstimateMethodId: long (nullable = true)
|-- FinancialAsReportedLineItemName: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _languageId: long (nullable = true)
|-- FinancialStatementLineItemSequence: long (nullable = true)
|-- FinancialStatementLineItemValue: double (nullable = true)
|-- FiscalYear: long (nullable = true)
|-- IsAnnual: boolean (nullable = true)
|-- IsAsReportedCurrencySetManually: boolean (nullable = true)
|-- IsCombinedItem: boolean (nullable = true)
|-- IsDerived: boolean (nullable = true)
|-- IsExcludedFromStandardization: boolean (nullable = true)
|-- IsFinal: boolean (nullable = true)
|-- IsTotal: boolean (nullable = true)
|-- PeriodEndDate: string (nullable = true)
|-- PeriodPermId: struct (nullable = true)
| |-- _VALUE: long (nullable = true)
| |-- _objectTypeId: long (nullable = true)
|-- ReportedCurrencyId: long (nullable = true)
|-- StatementSectionCode: string (nullable = true)
|-- StatementSectionId: long (nullable = true)
|-- StatementSectionIsCredit: boolean (nullable = true)
|-- SystemDerivedTypeCode: string (nullable = true)
|-- SystemDerivedTypeCodeId: long (nullable = true)
|-- Unit: double (nullable = true)
|-- UnitEnumerationId: long (nullable = true)
|-- FFAction|!|: string (nullable = true)
|-- PartitionYear: long (nullable = true)
|-- PartitionStatement: string (nullable = true)
Adding the schema after the column appears in it:
|-- uniqueFundamentalSet: long (nullable = true)
|-- AsReportedItem: struct (nullable = true)
| |-- fs:BookMark: string (nullable = true)
| |-- fs:DocByteLength: long (nullable = true)
| |-- fs:DocByteOffset: long (nullable = true)
| |-- fs:EditedDescription: struct (nullable = true)
| | |-- _VALUE: string (nullable = true)
| | |-- _languageId: long (nullable = true)
| |-- fs:ItemDisplayedNegativeFlag: boolean (nullable = true)
| |-- fs:ItemDisplayedValue: double (nullable = true)
| |-- fs:ItemScalingFactor: long (nullable = true)
| |-- fs:ReportedDescription: struct (nullable = true)
| | |-- _VALUE: string (nullable = true)
| | |-- _languageId: long (nullable = true)
| |-- fs:ReportedValue: double (nullable = true)
|-- EstimateMethodCode: string (nullable = true)
|-- EstimateMethodId: long (nullable = true)
|-- FinancialAsReportedLineItemName: struct (nullable = true)
| |-- _VALUE: string (nullable = true)
| |-- _languageId: long (nullable = true)
|-- FinancialLineItemSource: long (nullable = true)
【Comments】:
You can check whether the AsReportedItem.fs:BookMark column is present in the columns attribute of the tempNew1 dataset, and call withColumn conditionally based on the result. See (***.com/questions/35904136/…) for more details.
@AlexSavitsky But I have 10 such columns, do I have to do it one by one?
Yes. However, you can put the columns in a Seq, filter it against the dataset's columns, and then fold over the dataset with withColumn, in a somewhat functional style.
@AlexSavitsky I just tried it with hasColumn but something is missing.. please take a look at the syntax if you can spare some time..
As @AlexSavitsky pointed out, you want to use foldLeft. Use that idea, but the code provided will not work as-is. You will have to apply his idea to the struct columns you have, and you will get it solved ;)
【Answer 1】:
I will show you a generic way of applying the logic on the AsReportedItem struct column (I have commented the code for clarity):
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.functions.{col, lit}

//required column names even though the elements are not present in AsReportedItem struct column
val requiredAsReportedItemColumns = Array("BookMark", "DocByteOffset", "DocByteLength", "EditedDescription", "EditedDescription_languageId", "ReportedDescription", "ReportedDescription_languageId")

//selecting the elements of the AsReportedItem struct column for checking the condition,
//recursing into any struct fields nested inside the selected struct field
def getFields(parent: String, schema: StructType): Seq[String] = schema.fields.flatMap {
  case StructField(name, t: StructType, _, _) => getFields(parent + name + ".", t)
  case StructField(name, _, _, _) => Seq(s"$parent$name")
}

//if the struct column is present, get the fields of the nested structs as well (stripping the "fs:" prefix)
val AsReportedItemColumns = if (tempNew1.columns.contains("AsReportedItem")) getFields("", tempNew1.select("AsReportedItem.*").schema).toArray.map(x => x.substring(3, x.length)) else Array.empty[String]

//finding the difference between the required columns and the elements present in the AsReportedItem struct column
val notInAsReportedItemColumns = requiredAsReportedItemColumns.diff(AsReportedItemColumns.map(x => x.replace(".", "")))

//populating the columns for the elements that are present in the AsReportedItem struct column
val temp_for_AsReportedItem = AsReportedItemColumns.foldLeft(tempNew1) { (tempdf, name) => tempdf.withColumn(name.replace(".", ""), col(s"AsReportedItem.fs:$name")) }

//populating nulls for the columns that are not present in the AsReportedItem struct column
val final_AsReportedItem = notInAsReportedItemColumns.foldLeft(temp_for_AsReportedItem) { (tempdf, name) => tempdf.withColumn(name, lit(null)) }.drop("AsReportedItem")
Apply the same logic for the remaining two struct columns FinancialAsReportedLineItemName and PeriodPermId, but on the transformed data frame, i.e. on final_AsReportedItem and not on tempNew1.
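For instance, for PeriodPermId the check-and-flatten step could look like the following sketch (untested; note that _objectTypeId has to be extracted before the struct column itself is overwritten):
val withPeriodPermId =
  if (final_AsReportedItem.columns.contains("PeriodPermId"))
    final_AsReportedItem
      .withColumn("PeriodPermId_objectTypeId", col("PeriodPermId._objectTypeId"))
      .withColumn("PeriodPermId", col("PeriodPermId._VALUE")) // replaces the struct with its value
  else
    final_AsReportedItem
      .withColumn("PeriodPermId_objectTypeId", lit(null))
      .withColumn("PeriodPermId", lit(null))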
Credit: https://***.com/a/47104148/5880706
【Comments】:
Getting the error Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'AsReportedItem.*' given input columns 'PartitionYear, EstimateMethodId, EstimateMethodCode
I have added that in the question.
OK, so this does not throw any error now.. but I need to explode after getting the columns..
I am looking for an example, sorry for the delay
You should take a look at ***.com/questions/47103823/…

【Answer 2】:
Putting this as an answer, as it is too big for a comment.

Suppose you have a collection of columns that you want to add:
val cols = Seq("BookMark")
You need to call withColumn repeatedly on the original DataFrame, assigning the result to a new DataFrame each time. There is a functional operation that does exactly this, called fold:
val result = cols.foldLeft(tempNew1)((df, name) =>
  df.withColumn(name, if (df.columns.contains(s"AsReportedItem.fs:$name"))
    col(s"AsReportedItem.fs:$name") else lit("null")))
fold takes the first argument (tempNew1 in your case) and calls the provided function for each element in cols, each time assigning the result to a new DataFrame.
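One caveat: columns only lists top-level column names, so the contains check above is always false for a nested path like AsReportedItem.fs:BookMark, which would explain ending up with nulls everywhere. A sketch of a variant that resolves nested paths using the Try-based hasColumn idea from the question:
import scala.util.Try
import org.apache.spark.sql.functions.{col, lit}

val result = cols.foldLeft(tempNew1) { (df, name) =>
  // Try(df(path)) resolves nested struct paths, unlike df.columns
  if (Try(df(s"AsReportedItem.fs:$name")).isSuccess)
    df.withColumn(name, col(s"AsReportedItem.fs:$name"))
  else
    df.withColumn(name, lit(null))
}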
【Comments】:
I am not getting an error, but I am getting only nulls for all the records