在火花作业scala中添加新列之前检查空条件[重复]

Posted

技术标签:

【中文标题】在火花作业scala中添加新列之前检查空条件[重复]【英文标题】:Checking null condition before adding new column in spark job scala [duplicate] 【发布时间】:2018-04-10 03:12:55 【问题描述】:

我有一个下面的架构

root
 |-- DataPartition: long (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- _action: string (nullable = true)
 |-- env:Data: struct (nullable = true)
 |    |-- _type: string (nullable = true)
 |    |-- al:FundamentalAnalytic: struct (nullable = true)
 |    |    |-- _analyticItemInstanceKey: long (nullable = true)
 |    |    |-- _financialPeriodEndDate: string (nullable = true)
 |    |    |-- _financialPeriodType: string (nullable = true)
 |    |    |-- _isYearToDate: boolean (nullable = true)
 |    |    |-- _lineItemId: long (nullable = true)
 |    |    |-- al:AnalyticConceptCode: string (nullable = true)
 |    |    |-- al:AnalyticConceptId: long (nullable = true)
 |    |    |-- al:AnalyticIsEstimated: boolean (nullable = true)
 |    |    |-- al:AnalyticValue: struct (nullable = true)
 |    |    |    |-- _VALUE: double (nullable = true)
 |    |    |    |-- _currencyId: long (nullable = true)
 |    |    |-- al:AuditID: string (nullable = true)
 |    |    |-- al:FinancialPeriodTypeId: long (nullable = true)
 |    |    |-- al:FundamentalSeriesId: struct (nullable = true)
 |    |    |    |-- _VALUE: long (nullable = true)
 |    |    |    |-- _objectType: string (nullable = true)
 |    |    |    |-- _objectTypeId: long (nullable = true)
 |    |    |-- al:InstrumentId: long (nullable = true)
 |    |    |-- al:IsAnnual: boolean (nullable = true)
 |    |    |-- al:TaxonomyId: long (nullable = true)

现在这是一个经常变化的 xml 文件。 我只想处理包含 env:Data.sr:Source.* 的税款 为此我写了下面的代码

val dfType = dfContentItem.
    select(getDataPartition($"DataPartition").
        as("DataPartition"), 
        $"TimeStamp".as("TimeStamp"), 
        $"env:Data.sr:Source.*", 
        getFFActionParent($"_action")
        .as("FFAction|!|")
    ).filter($"env:Data.sr:Source._organizationId".isNotNull)
dfType.show(false)

但这仅在架构中找到sr:Source 时才有效,否则我会遇到异常

线程“主”org.apache.spark.sql.AnalysisException 中的异常:否 这样的结构字段 sr:Source in _type, cr:TRFCoraxData, fun:Fundamental, md:Identifier, md:Relationship;

忽略我对sr:Source 进行了空检查,但这对我不起作用。 对于该检查,我也遇到了同样的错误。

基本上我需要的是 env:Data.sr:Source.* 为 null 然后我想退出处理,下一个标签处理将重新开始。

【问题讨论】:

你的空值检查怎么样?请发布更多代码 @StevenBlack 类似这样的东西 ...val check=dfContentItem.select($"env:Data.sr:Source.*.isNotNull") spark 的哪个版本?是DataFrame 的架构吗? @StevenBlack SPARK VERSION IS 2.2.0 是的,它是我从 xml 文件创建的数据框 【参考方案1】:

org.apache.spark.sql.AnalysisException 通常在查询出现问题时被抛出 - 所以我很确定这是因为您试图在这些情况下过滤 null

scala 中的错误处理通常使用 Option 完成,还有 good article on it 试试看

def handleNulls(organizationId: String): Option[Boolean] = 
     val orgId = Option(organizationId).getOrElse(return None)
     Some()

val betterNullsUdf = udf[Option[Boolean], Integer](handleNulls)

val dfType = dfContentItem.
    select(getDataPartition($"DataPartition").
        as("DataPartition"), 
        $"TimeStamp".as("TimeStamp"), 
        betterNullsUdf($"env:Data.sr:Source.*"), 
        getFFActionParent($"_action")
        .as("FFAction|!|")
    ).filter($"env:Data.sr:Source._organizationId".isNotNull)
dfType.show(false)

【讨论】:

我在某些地方得到not enough arguments for method apply: (x: A)Some[A] in object Some. Unspecified value parameter x. ..我也有 env:Data.sr:Source.* 作为数组类型 在我的 env:Data.sr:Source.* 的情况下,Steven 也是 null 然后我想退出处理该标签的作业。我认为这不能处理

以上是关于在火花作业scala中添加新列之前检查空条件[重复]的主要内容,如果未能解决你的问题,请参考以下文章

如何将新列和相应的行特定值添加到火花数据帧?

如何使用scala数据框添加具有以下行值的新列[重复]

Spark和Scala,通过映射公用键添加具有来自另一个数据帧的值的新列[重复]

使用其他现有列 Spark/Scala 添加新列

以键为新列重塑键值对的火花数据框

根据scala中的条件对列进行火花数据框聚合