检查列数据类型并仅对 Spark SQL 中的整数和小数执行 SQL

Posted

技术标签:

【中文标题】检查列数据类型并仅对 Spark SQL 中的整数和小数执行 SQL【英文标题】:Check column datatype and execute SQL only on Integer and Decimal in Spark SQL 【发布时间】:2017-07-25 20:31:54 【问题描述】:

我正在尝试检查输入 Parquet 文件中列的数据类型,如果数据类型是整数或小数,则运行 Spark SQL。

//get Array of structfields 
 val datatypes = parquetRDD_subset.schema.fields

//Check datatype of column
 for (val_datatype <- datatypes)  if (val_datatype.dataType.typeName == "integer" || val_datatype.dataType.typeName.contains("decimal"))  

 //get the field name
val x = parquetRDD_subset.schema.fieldNames

 val dfs = x.map(field => spark.sql(s"select 'DataProfilerStats' as Table_Name,(SELECT 100 * approx_count_distinct($field)/count(1) from parquetDFTable) as Percentage_Unique_Value from parquetDFTable"))

 

问题是,尽管数据类型验证成功,但在获取字段名称后的 for 循环中,它实际上并没有将列限制为整数或小数,查询正在对所有列类型甚至字符串执行好。我们如何获得只有十进制或整数的字段。我们如何解决这个问题。

【问题讨论】:

【参考方案1】:

这就是你如何过滤整数和双精度类型的列

// fiter the columns 
val columns = df.schema.fields.filter(x => x.dataType == IntegerType || x.dataType == DoubleType)

//use these filtered with select 
df.select(columns.map(x => col(x.name)): _*)

我希望这会有所帮助!

【讨论】:

谢谢尚卡尔!提供的解决方案实际上过滤了整数和小数,但我们如何将其传递给我现有的 sql,即 - val dfs = x.map(field => spark.sql(s"select 'DataProfilerStats' as Table_Name,(SELECT 100 * approx_count_distinct ($field)/count(1) 来自 parquetDFTable) 作为 ParquetDFTable 中的 Percentage_Unique_Value"))。这里 $field 是输入数据框中的列名。有什么想法吗? 再次感谢尚卡尔!我想出了最简单的部分:) ..val m = z.columns, m 是代码中提到的上述最后一步,然后是 val dfs = m.map(field => spark.sql(s"select ... ) ..它在受限列上运行..你像往常一样棒极了!!【参考方案2】:

请试试这个:

import org.apache.spark.sql.types._

val names = df.schema.fields.collect  
  case StructField(name, DecimalType(), _, _) => approx_count_distinct(name)
  case StructField(name, IntegerType, _, _)   => approx_count_distinct(name)


spark.table("parquetDFTable").select(names: _*)

【讨论】:

感谢 random-impressions 的回答!但是 approx_count_distinct 使这个操作有点慢,因为数据大小将是数百万。我们如何将以下语句与您提供的代码一起添加... val dfs = names.map(field => spark.sql(s"select 'DataProfilerStats' as Table_Name, '$field' as Column_Name from parquetDFTable" )) val withSum = dfs.reduce((x, y) => x.union(y)).distinct().coalesce(1) withSum.show()

以上是关于检查列数据类型并仅对 Spark SQL 中的整数和小数执行 SQL的主要内容,如果未能解决你的问题,请参考以下文章

Spark DataFrame ArrayType 或 MapType 用于检查列中的值

如何使用对称密钥加密和解密 SQL Server 中的整数数据类型列

散列一个 SQL 行?

SQL Server:检查列的数据类型

Spark SQL数据类型

Spark SQL数据类型