打印数据框中不同的列名

Posted

技术标签:

【中文标题】打印数据框中不同的列名【英文标题】:printing column names that are different in a dataframe 【发布时间】:2022-01-13 06:10:04 【问题描述】:

我有这个数据框

+-----+-------+-----------+-------------------+-----+
|empID|Zipcode|ZipCodeType|City               |State|
+-----+-------+-----------+-------------------+-----+
|1000 |704    |STANDARD   |PARC PARQUE        |PR   |
|1000 |704    |STANDARD   |PASEO COSTA DEL SUR|PR   |
|1001 |709    |STANDARD   |BDA SAN LUIS       |PR   |
|1001 |76166  |UNIQUE     |CINGULAR WIRELESS  |TX   |
|1002 |76177  |STANDARD   |FORT WORTH         |TX   |
|1002 |76177  |STANDARD   |FT WORTH           |TX   |
|1003 |704    |STANDARD   |URB EUGENE RICE    |PR   |
|1003 |85209  |STANDARD   |MESA               |AZ   |
|1004 |85210  |STANDARD   |MESA               |AZ   |
|1004 |32046  |STANDARD   |HILLIARD           |FL   |
+-----+-------+-----------+-------------------+-----+

对于每个 empID 需要打印其值不同的列名。

+-----+---------------------------------+
|empID|nonMatchingColumnNames           |
+-----+---------------------------------+
|1002 |City                             |
|1000 |City                             |
|1001 |State, City, ZipCodeType, Zipcode|
|1003 |State, City, Zipcode             |
|1004 |State, City, Zipcode             |
+-----+---------------------------------+

我采取的策略是,构建一个结构并收集所有值。检查每组的计数是否> 1,然后打印列名。这是我的代码

val schema = new StructType()
  .add("empID", IntegerType, true)
  .add("Zipcode", StringType, true)
  .add("ZipCodeType", StringType, true)
  .add("City", StringType, true)
  .add("State", StringType, true)
    
val idColumn = "empID"
    
val dfJSON = dfFromText.withColumn("jsonData",from_json(col("value"),schema))
  .select("jsonData.*")
    
dfJSON.printSchema()
dfJSON.show(false)
    
val aggMap = dfJSON.columns
  .filterNot(x => x == idColumn)
  .map(colName => (collect_set(colName).alias(s"$colName_asList"), s"$colName_asList"))
   
aggMap.foreach(println)
    
val aggMapColumns = aggMap.map(x => x._1)
    
val columnsAsList = dfJSON.groupBy(col(idColumn)).agg(aggMapColumns.head, aggMapColumns.tail : _ *)
    
columnsAsList.show(false)
    
val combinedDF = columnsAsList.select(col(idColumn), struct(
  aggMap.map(x => col(x._2)) : _ * ).alias("combined_struct")
)
    
combinedDF.printSchema()
combinedDF.show(false)
    
val columnsToCompare = dfJSON.columns.filterNot(x => x == idColumn).zipWithIndex.map( case (x,y) => (y,x))
    
val output = combinedDF.rdd.map(row => 
  val empNo = row.getAs[Int](0)
  val conbinedStruct: Row = row.getAs[AnyRef]("combined_struct").asInstanceOf[Row]
    
  val nonMatchingColumns = columnsToCompare.foldLeft(List[String]())((acc, item) => 
    val counts = conbinedStruct.getAs[Seq[String]](item._1).length
    if (counts == 1) acc else item._2 :: acc
  )
    
  (empNo, nonMatchingColumns.mkString(", "))
).toDF(idColumn, "nonMatchingColumnNames")
    
output.show(false)

它在我的本地机器上工作得很好,当我将它移植到 spark-shell(它是一个临时查询)时,当我试图将数据帧转换为 RDD 并遍历每个项目时,我得到空指针异常结构体。

【问题讨论】:

【参考方案1】:

您只能使用 spark 的内置函数来获取包含值不唯一的列列表的字符串:

使用countDistinct 确定特定列中是否有多个值用于特定empID 如果 count distinct 大于 2,则使用 when 保存列的名称 遍历列并使用array将此迭代保存到数组中 使用concat_ws从这个数组构建一个字符串

完整代码如下:

import org.apache.spark.sql.functions.array, concat_ws, countDistinct, lit, when

val output = dfJSON.groupBy("empID").agg(
  concat_ws(
    ", ",
    array(dfJSON.columns.filter(_ != "empID").map(c => when(countDistinct(c) > 1, lit(c))): _*)
  ).as("nonMatchingColumnNames")
)

使用您的输入数据框,您将获得以下输出:

+-----+---------------------------------+
|empID|nonMatchingColumnNames           |
+-----+---------------------------------+
|1002 |City                             |
|1000 |City                             |
|1001 |Zipcode, ZipCodeType, City, State|
|1003 |Zipcode, City, State             |
|1004 |Zipcode, City, State             |
+-----+---------------------------------+

【讨论】:

以上是关于打印数据框中不同的列名的主要内容,如果未能解决你的问题,请参考以下文章

在附加的数据框中添加列名? [复制]

在具有不同列名的pandas中连接2个数据帧[重复]

熊猫合并:合并同一列上的两个数据框,但保留不同的列

在r语言中怎样在数据框中添加新列

使用不同的列名连接不同的数据框

如何摆脱我们熊猫数据框中的相等列?