Spark数据框databricks csv附加额外的双引号

Posted

技术标签:

【中文标题】Spark数据框databricks csv附加额外的双引号【英文标题】:Spark dataframe databricks csv appends extra double quotes 【发布时间】:2017-06-07 13:51:45 【问题描述】:

似乎当我在spark sql 中的dataframe 上应用CONCAT 并将dataframe 作为csv 文件存储在HDFS 位置时,会在concat 列中添加额外的双引号单独在输出文件中。

当我应用显示时不会添加这个双引号。只有当我将 dataframe 存储为 csv 文件时才会添加这个双引号

看来我需要删除在将dataframe 保存为 csv 文件时添加的额外双引号。

我正在使用com.databricks:spark-csv_2.10:1.1.0 jar

Spark 版本为 1.5.0-cdh5.5.1

输入:

 campaign_file_name_1, campaign_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,    1
 campaign_file_name_1, campaign_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,    2

预期输出:

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,     campaign_name_1"="1,  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   campaign_name_1"="2,  2017-06-06 17:09:31

火花代码:

  object campaignResultsMergerETL extends BaseETL 

  val now  = ApplicationUtil.getCurrentTimeStamp()
  val conf = new Configuration()
  val fs  = FileSystem.get(conf)
  val log = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = 
    //---------------------
    code for sqlContext Initialization 
    //---------------------
    val campaignResultsDF  = sqlContext.read.format("com.databricks.spark.avro").load(campaignResultsLoc)
    campaignResultsDF.registerTempTable("campaign_results")
    val campaignGroupedDF =  sqlContext.sql(
   """
    |SELECT campaign_file_name,
    |campaign_name,
    |tracker_id,
    |SUM(campaign_measure) AS campaign_measure
    |FROM campaign_results
    |GROUP BY campaign_file_name,campaign_name,tracker_id
  """.stripMargin)

    campaignGroupedDF.registerTempTable("campaign_results_full")

    val campaignMergedDF =  sqlContext.sql(
  s"""
    |SELECT campaign_file_name,
    |tracker_id,
    |CONCAT(campaign_name,'\"=\"' ,campaign_measure),
    |"$now" AS audit_timestamp
    |FROM campaign_results_full
  """.stripMargin)

   campaignMergedDF.show(20)
   saveAsCSVFiles(campaignMergedDF, campaignResultsExportLoc, numPartitions)

   


    def saveAsCSVFiles(campaignMeasureDF:DataFrame,hdfs_output_loc:String,numPartitions:Int): Unit =
    
       log.info("saveAsCSVFile method started")
       if (fs.exists(new Path(hdfs_output_loc)))
          fs.delete(new Path(hdfs_output_loc), true)
       
     campaignMeasureDF.repartition(numPartitions).write.format("com.databricks.spark.csv").save(hdfs_output_loc)
       log.info("saveAsCSVFile method ended")
    

 

campaignMergedDF.show(20) 的结果正确且工作正常。

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,   campaign_name_1"="1,  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   campaign_name_1"="2,  2017-06-06 17:09:31

saveAsCSVFiles 的结果:这是不正确的。

 campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89,   "campaign_name_1""=""1",  2017-06-06 17:09:31
 campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk,   "campaign_name_1""=""2",  2017-06-06 17:09:31

有人可以帮我解决这个问题吗?

【问题讨论】:

【参考方案1】:

当你使用时

write.format("com.databricks.spark.csv").save(hdfs_output_loc)

为了将包含 " 的文本写入 csv 文件,您会遇到问题,因为 spark-csv

" 符号定义为默认引用

将默认引号从 " 替换为其他内容(例如 NULL)应该允许您按原样将 " 写入文件。

write.format("com.databricks.spark.csv").option("quote", "\u0000").save(hdfs_output_loc)

说明:

您使用的是默认 spark-csv:

转义值为\ quote 值为"

spark-csv doc

引号:默认情况下引号字符是“,但可以设置为任何字符。引号内的分隔符被忽略 转义:默认转义字符为\,但可以设置为任何字符。转义的引号字符被忽略

This answer 建议如下:

关闭双引号字符默认转义的方法 (") 与反斜杠字符 () - 即避免所有人转义 完全字符,您必须添加一个 .option() 方法调用 .write() 方法调用后的正确参数。的目标 option() 方法调用是更改 csv() 方法“查找”的方式 “引号”字符的实例,因为它正在发出内容。到 这样做,您必须更改“引用”实际含义的默认值; 即改变从双引号字符寻找的字符 (") 转换为 Unicode "\u0000" 字符(本质上提供 Unicode NUL 字符假设它永远不会出现在文档中)。

【讨论】:

@SurenderRaja - 太棒了! :-)

以上是关于Spark数据框databricks csv附加额外的双引号的主要内容,如果未能解决你的问题,请参考以下文章

无法在 azure databricks 中使用 spark 读取 csv 文件

读取 csv 文件时 MS Databricks Spark 中绝对 URI 中的相对路径

Databricks spark数据框按每列创建数据框

使用pyspark,spark + databricks时如何将完全不相关的列添加到数据框中

如何使用 Databricks 使用服务原理通过 spark 数据框将批量数据插入 Sql Server 数据仓库

如何在 Databricks 中读取批量 excel 文件数据并加载到 spark 数据框中