Spark数据框databricks csv附加额外的双引号
Posted
技术标签:
【中文标题】Spark数据框databricks csv附加额外的双引号【英文标题】:Spark dataframe databricks csv appends extra double quotes 【发布时间】:2017-06-07 13:51:45 【问题描述】:似乎当我在spark sql
中的dataframe
上应用CONCAT
并将dataframe
作为csv 文件存储在HDFS
位置时,会在concat
列中添加额外的双引号单独在输出文件中。
当我应用显示时不会添加这个双引号。只有当我将 dataframe
存储为 csv 文件时才会添加这个双引号
看来我需要删除在将dataframe
保存为 csv 文件时添加的额外双引号。
我正在使用com.databricks:spark-csv_2.10:1.1.0
jar
Spark 版本为 1.5.0-cdh5.5.1
输入:
campaign_file_name_1, campaign_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, 1
campaign_file_name_1, campaign_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, 2
预期输出:
campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, campaign_name_1"="1, 2017-06-06 17:09:31
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, campaign_name_1"="2, 2017-06-06 17:09:31
火花代码:
object campaignResultsMergerETL extends BaseETL
val now = ApplicationUtil.getCurrentTimeStamp()
val conf = new Configuration()
val fs = FileSystem.get(conf)
val log = LoggerFactory.getLogger(this.getClass.getName)
def main(args: Array[String]): Unit =
//---------------------
code for sqlContext Initialization
//---------------------
val campaignResultsDF = sqlContext.read.format("com.databricks.spark.avro").load(campaignResultsLoc)
campaignResultsDF.registerTempTable("campaign_results")
val campaignGroupedDF = sqlContext.sql(
"""
|SELECT campaign_file_name,
|campaign_name,
|tracker_id,
|SUM(campaign_measure) AS campaign_measure
|FROM campaign_results
|GROUP BY campaign_file_name,campaign_name,tracker_id
""".stripMargin)
campaignGroupedDF.registerTempTable("campaign_results_full")
val campaignMergedDF = sqlContext.sql(
s"""
|SELECT campaign_file_name,
|tracker_id,
|CONCAT(campaign_name,'\"=\"' ,campaign_measure),
|"$now" AS audit_timestamp
|FROM campaign_results_full
""".stripMargin)
campaignMergedDF.show(20)
saveAsCSVFiles(campaignMergedDF, campaignResultsExportLoc, numPartitions)
def saveAsCSVFiles(campaignMeasureDF:DataFrame,hdfs_output_loc:String,numPartitions:Int): Unit =
log.info("saveAsCSVFile method started")
if (fs.exists(new Path(hdfs_output_loc)))
fs.delete(new Path(hdfs_output_loc), true)
campaignMeasureDF.repartition(numPartitions).write.format("com.databricks.spark.csv").save(hdfs_output_loc)
log.info("saveAsCSVFile method ended")
campaignMergedDF.show(20)
的结果正确且工作正常。
campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, campaign_name_1"="1, 2017-06-06 17:09:31
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, campaign_name_1"="2, 2017-06-06 17:09:31
saveAsCSVFiles
的结果:这是不正确的。
campaign_file_name_1, shagdhsjagdhjsagdhrSqpaKa5saoaus89, "campaign_name_1""=""1", 2017-06-06 17:09:31
campaign_file_name_1, sagdhsagdhasjkjkasihdklas872hjsdjk, "campaign_name_1""=""2", 2017-06-06 17:09:31
有人可以帮我解决这个问题吗?
【问题讨论】:
【参考方案1】:当你使用时
write.format("com.databricks.spark.csv").save(hdfs_output_loc)
为了将包含 "
的文本写入 csv 文件,您会遇到问题,因为 spark-csv
"
符号定义为默认引用
将默认引号从 "
替换为其他内容(例如 NULL)应该允许您按原样将 "
写入文件。
write.format("com.databricks.spark.csv").option("quote", "\u0000").save(hdfs_output_loc)
说明:
您使用的是默认 spark-csv:
转义值为\
quote 值为"
spark-csv doc
引号:默认情况下引号字符是“,但可以设置为任何字符。引号内的分隔符被忽略 转义:默认转义字符为\,但可以设置为任何字符。转义的引号字符被忽略This answer 建议如下:
关闭双引号字符默认转义的方法 (") 与反斜杠字符 () - 即避免所有人转义 完全字符,您必须添加一个 .option() 方法调用 .write() 方法调用后的正确参数。的目标 option() 方法调用是更改 csv() 方法“查找”的方式 “引号”字符的实例,因为它正在发出内容。到 这样做,您必须更改“引用”实际含义的默认值; 即改变从双引号字符寻找的字符 (") 转换为 Unicode "\u0000" 字符(本质上提供 Unicode NUL 字符假设它永远不会出现在文档中)。
【讨论】:
@SurenderRaja - 太棒了! :-)以上是关于Spark数据框databricks csv附加额外的双引号的主要内容,如果未能解决你的问题,请参考以下文章
无法在 azure databricks 中使用 spark 读取 csv 文件
读取 csv 文件时 MS Databricks Spark 中绝对 URI 中的相对路径
使用pyspark,spark + databricks时如何将完全不相关的列添加到数据框中