如何在 spark scala 中检查与其关联的列名和数据是不是匹配
Posted
技术标签:
【中文标题】如何在 spark scala 中检查与其关联的列名和数据是不是匹配【英文标题】:How to check whether column names and data associated with it matches or not in spark scala如何在 spark scala 中检查与其关联的列名和数据是否匹配 【发布时间】:2020-02-27 06:45:13 【问题描述】:假设我有如下几列:
EMP_ID, EMP_NAME, EMP_CONTACT
1, SIDDHESH, 544949461
现在我想验证数据是否与列名架构同步。对于EMP_NAME
,该列中的数据应仅为string
。我在引用this 链接后尝试了下面的代码,但它在我的代码的最后一行显示错误。
package com.sample
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
class sample1
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val data = spark.read.format("csv").option("header", "true").load("C:/Users/siddheshk2/Desktop/words.txt")
val originalSchema = data.schema
def validateColumns(row: Row): Row =
val emp_id = row.getAs[String]("EMP_ID")
val emp_name = row.getAs[String]("EMP_NAME")
val emp_contact = row.getAs[String]("EMP_CONTACT")
// do checking here and populate (err_col,err_val,err_desc) with values if applicable
Row.merge(row)
val validateDF = data.map row => validateColumns(row)
所以,它不接受我的代码val validateDF = data.map row => validateColumns(row)
的最后一行。我该如何解决这个问题?或者有没有其他有效的方法可以解决我的问题?
我输入了一条无效记录(第三条),如下所示:
EMP_ID,EMP_NAME,EMP_CONTACT
1,SIDDHESH,99009809
2,asdadsa, null
sidh,sidjh,1232
在这种情况下,我已经为 id
列输入了一个 string
值(应该是一个数字),因此在检查列架构及其数据后,它应该会抛出一个错误,指出记录不匹配根据列模式。
【问题讨论】:
在读取数据时,您始终可以添加架构或从文件中推断架构。请查收:***.com/questions/39926411/… 【参考方案1】:您只是错过了将您的 DataFrame 转换为 rdd 以应用 .map
操作的尝试:
import org.apache.spark.sql.Row
val validateDF = data.rdd.map row => validateColumns(row)
如果您想将其转换回 DataFrame,只需使用您的 sparkSession 即可:
val newSchema = // specify the schema of the new dataframe
val updatedDF = spark.createDataFrame(validateDF, newSchema)
【讨论】:
但是如何检查数据是否符合指定的模式? 如果我可以检查并丢弃行或整个数据,而不是创建一个新的数据框会更好。那么,我们该怎么做呢?【参考方案2】:Row 类有一个schema 属性。您可以通过遍历列并比较它们来使用它。为此,您可以使用==
运算符或使用here 描述的架构比较方法之一。
那么 validate 方法可能如下所示:
def isValid(row: Row): Boolean = originalSchema == row.schema
【讨论】:
我该如何处理或实现这个?检查我编辑的帖子 如何在我的代码中合并它?我也是 scala 的新手,所以不确定。 Spark 将尝试从数据本身推断架构。在您的示例中,整个 EMP_ID 将被转换为字符串,因为 Spark 看到有一个字符串。如果您没有指定架构,则会发生这种情况,或者您可以显式指定架构。如果将 EMP_ID 定义为 int Spark 将无法加载数据。总而言之,如果您指定正确的架构,则不需要任何上述验证功能以上是关于如何在 spark scala 中检查与其关联的列名和数据是不是匹配的主要内容,如果未能解决你的问题,请参考以下文章
如何在 if-else 条件下的列中使用 Spark 值 - Scala