无法过滤存储在 spark 2.2.0 数据框中的 CSV 列

Posted

技术标签:

【中文标题】无法过滤存储在 spark 2.2.0 数据框中的 CSV 列【英文标题】:Unable to filter CSV columns stored in dataframe in spark 2.2.0 【发布时间】:2019-07-08 17:45:56 【问题描述】:

我正在使用 spark 和 scala 从本地计算机读取 CSV 文件并将其存储到数据帧中(称为 df)。我只需要从df 中选择几个具有新别名名称的选定列,然后保存到新数据框newDf。我也尝试过这样做,但出现以下错误。

main" org.apache.spark.sql.AnalysisException: cannot resolve '`history_temp.time`' given input columns: [history_temp.time, history_temp.poc]

下面是为从我的本地机器读取 csv 文件而编写的代码。

import org.apache.spark.sql.SparkSession

object DataLoadConversion 


  def main(args: Array[String]): Unit = 

    System.setProperty("spark.sql.warehouse.dir", "file:///C:/spark-warehouse")
    val spark = SparkSession.builder().master("local").appName("DataConversion").getOrCreate()

    val df = spark.read.format("com.databricks.spark.csv")
      .option("quote", "\"")
      .option("escape", "\"")
      .option("delimiter", ",")
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("inferSchema","true")
      .load("file:///C:/Users/an/Desktop/ct_temp.csv")

    df.show(5)   // Till this code is working fine

    val newDf = df.select("history_temp.time","history_temp.poc")

以下是我尝试过但不起作用的代码。

//  val newDf = df.select($"history_temp.time",$"history_temp.poc")
//  val newDf = df.select("history_temp.time","history_temp.poc")
//  val newDf = df.select( df("history_temp.time").as("TIME"))
//  val newDf = df.select(df.col("history_temp.time"))

//  df.select(df.col("*"))    // This is working

    newDf.show(10)
  

【问题讨论】:

请提供一些示例数据(包括您正在使用的标头) 【参考方案1】:

从外观上看。您的列名格式是这里的问题。我猜它们只是常规的 stringType 但是当你有类似 history_temp.time 的东西时,火花认为它是一个排列的列。事实并非如此。我将重命名所有列并替换“。”到“”。然后您可以运行相同的选择,它应该可以工作。您可以使用 foldleft 替换所有“。”使用“”,如下所示。

val replacedDF = df.columns.foldleft(df) (newdf, colname)=>
newdf.withColumnRenamed (colname, colname.replace(".","_"))

完成后,您可以从以下替换DF中进行选择

val newDf= replacedDf.select("history_temp_time","history_temp_poc")

让我知道你的效果如何。

【讨论】:

我已经尝试了以下方法并且它有效。 val newDf = df.columns.foldLeft(df)((curr, n) => curr.withColumnRenamed(n, n.replace(".", "_"))) val finalDf = newDf.select(newDf("history_temp_time").as("TIME") 谢谢。使用您提供的解决方案。 确定 Np。很高兴能帮上忙。

以上是关于无法过滤存储在 spark 2.2.0 数据框中的 CSV 列的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 数据框中过滤多列的最佳方法是啥?

如何访问存储在scala spark中的数据框中的映射值和键

如何在 Spark 数据框中进行分组和聚合后过滤?

遍历火花数据框中的列并计算最小值最大值

有没有办法使用scala过滤不包含spark数据框中某些内容的字段?

使用 pyspark 将镶木地板文件(在 aws s3 中)存储到 spark 数据框中