检查列数据类型并仅对 Spark SQL 中的整数和小数执行 SQL
Posted
技术标签:
【中文标题】检查列数据类型并仅对 Spark SQL 中的整数和小数执行 SQL【英文标题】:Check column datatype and execute SQL only on Integer and Decimal in Spark SQL 【发布时间】:2017-07-25 20:31:54 【问题描述】:我正在尝试检查输入 Parquet 文件中列的数据类型,如果数据类型是整数或小数,则运行 Spark SQL。
//get Array of structfields
val datatypes = parquetRDD_subset.schema.fields
//Check datatype of column
for (val_datatype <- datatypes) if (val_datatype.dataType.typeName == "integer" || val_datatype.dataType.typeName.contains("decimal"))
//get the field name
val x = parquetRDD_subset.schema.fieldNames
val dfs = x.map(field => spark.sql(s"select 'DataProfilerStats' as Table_Name,(SELECT 100 * approx_count_distinct($field)/count(1) from parquetDFTable) as Percentage_Unique_Value from parquetDFTable"))
问题是,尽管数据类型验证成功,但在获取字段名称后的 for 循环中,它实际上并没有将列限制为整数或小数,查询正在对所有列类型甚至字符串执行好。我们如何获得只有十进制或整数的字段。我们如何解决这个问题。
【问题讨论】:
【参考方案1】:这就是你如何过滤整数和双精度类型的列
// fiter the columns
val columns = df.schema.fields.filter(x => x.dataType == IntegerType || x.dataType == DoubleType)
//use these filtered with select
df.select(columns.map(x => col(x.name)): _*)
我希望这会有所帮助!
【讨论】:
谢谢尚卡尔!提供的解决方案实际上过滤了整数和小数,但我们如何将其传递给我现有的 sql,即 - val dfs = x.map(field => spark.sql(s"select 'DataProfilerStats' as Table_Name,(SELECT 100 * approx_count_distinct ($field)/count(1) 来自 parquetDFTable) 作为 ParquetDFTable 中的 Percentage_Unique_Value"))。这里 $field 是输入数据框中的列名。有什么想法吗? 再次感谢尚卡尔!我想出了最简单的部分:) ..val m = z.columns, m 是代码中提到的上述最后一步,然后是 val dfs = m.map(field => spark.sql(s"select ... ) ..它在受限列上运行..你像往常一样棒极了!!【参考方案2】:请试试这个:
import org.apache.spark.sql.types._
val names = df.schema.fields.collect
case StructField(name, DecimalType(), _, _) => approx_count_distinct(name)
case StructField(name, IntegerType, _, _) => approx_count_distinct(name)
spark.table("parquetDFTable").select(names: _*)
【讨论】:
感谢 random-impressions 的回答!但是 approx_count_distinct 使这个操作有点慢,因为数据大小将是数百万。我们如何将以下语句与您提供的代码一起添加... val dfs = names.map(field => spark.sql(s"select 'DataProfilerStats' as Table_Name, '$field' as Column_Name from parquetDFTable" )) val withSum = dfs.reduce((x, y) => x.union(y)).distinct().coalesce(1) withSum.show()以上是关于检查列数据类型并仅对 Spark SQL 中的整数和小数执行 SQL的主要内容,如果未能解决你的问题,请参考以下文章
Spark DataFrame ArrayType 或 MapType 用于检查列中的值