过滤计数等于输入文件 rdd Spark 的列

Posted

技术标签:

【中文标题】过滤计数等于输入文件 rdd Spark 的列【英文标题】:Filter columns having count equal to the input file rdd Spark 【发布时间】:2017-08-07 21:51:01 【问题描述】:

我正在使用以下逻辑从输入 parquet 文件中过滤整数列,并尝试修改此逻辑以添加额外的验证,以查看是否有任何一个输入列的计数等于输入 parquet 文件 rdd 计数。我想过滤掉这样的列。

更新

输入文件中的列数和名称不会是静态的,每次我们获取文件时都会改变。 目标是还过滤掉计数等于输入文件 rdd 计数的列。过滤整数列已经用下面的逻辑实现了。

e.g input parquet file count = 100
    count of values in column A in the input file  = 100 

过滤掉任何这样的列。

当前逻辑

 //Get array of structfields

val columns = df.schema.fields.filter(x => 
                x.dataType.typeName.contains("integer"))

  //Get the column names
  val z = df.select(columns.map(x => col(x.name)): _*)

  //Get array of string 
  val m = z.columns

新逻辑就像

  val cnt = spark.read.parquet("inputfile").count()

  val d = z.column.where column count is not equals cnt 

我不想将列名显式传递给新条件,因为计数等于输入文件的列会改变(上面的 val d = ..) 我们如何为此编写逻辑?

【问题讨论】:

【参考方案1】:

根据我对您的问题的理解,您正在尝试filter 在以integer 作为数据类型的列中,并且其distinct count 不等于另一个输入parquet 文件中rowscount。如果我的理解是正确的,您可以在现有过滤器中添加列计数过滤器

val cnt = spark.read.parquet("inputfile").count()
val columns = df.schema.fields.filter(x =>
  x.dataType.typeName.contains("string") && df.select(x.name).distinct().count() != cnt)

其余代码应照原样遵循。

希望回答对你有帮助。

【讨论】:

【参考方案2】:

Jeanr 和 Ramesh 提出了正确的方法,这就是我为获得所需输出所做的工作:)

cnt = (inputfiledf.count())

val r = df.select(df.col("*")).where(df.col("MY_COLUMN_NAME").<(cnt))

【讨论】:

以上是关于过滤计数等于输入文件 rdd Spark 的列的主要内容,如果未能解决你的问题,请参考以下文章

Spark中rdd分区数量的决定因素

Spark Scala 根据另一个 RDD 的列删除一个 RDD 中的行

Python 中 Spark RDD 的列操作

深入探究Spark -- RDD详解

where子句值的pyspark数据帧计数等于

转换CassandraTableScanRDD org.apache.spark.rdd.RDD