使用 pyspark 处理 csv 文件中字段中的逗号
Posted
技术标签:
【中文标题】使用 pyspark 处理 csv 文件中字段中的逗号【英文标题】:Dealing with commas within a field in a csv file using pyspark 【发布时间】:2016-02-23 06:44:50 【问题描述】:我有一个 csv 数据文件,其中包含列值中的逗号。例如,
value_1,value_2,value_3
AAA_A,BBB,B,CCC_C
这里的值为“AAA_A”、“BBB,B”、“CCC_C”。但是,当尝试用逗号分割行时,它给了我 4 个值,即“AAA_A”、“BBB”、“B”、“CCC_C”。
如何在 PySpark 中用逗号分割行后得到正确的值?
【问题讨论】:
你怎么知道BBB、B B会去哪一边? 【参考方案1】:使用 databriks 中的 spark-csv 类。
引号之间的分隔符,默认情况下 ("),被忽略。
例子:
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.load("cars.csv")
欲了解更多信息,请查看https://github.com/databricks/spark-csv
如果您的引用是 (") 的 (') 实例,则可以使用此类进行配置。
编辑:
对于python API:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('cars.csv')
最好的问候。
【讨论】:
字段值实际上不在引号内。因此,分隔符不在引号内。这是我没有以正确格式获得输出的主要原因。每当我尝试使用.map(lambda l: l.split(","))
拆分它时,它会在找到分隔符的任何地方拆分。
我不明白你的问题:S 你如何识别一个值到另一个值? (BBB,B) 都是一个值,还是两个 (BBB, B) ?
BBB,B 都是一个值。【参考方案2】:
如果你不介意额外的包依赖,你可以使用 Pandas 来解析 CSV 文件。它可以很好地处理内部逗号。
依赖关系:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
一次将整个文件读入 Spark DataFrame:
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# If no header:
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2'])
s_df = sql_sc.createDataFrame(pandas_df)
或者,更注重数据,您可以将数据分块为 Spark RDD,然后是 DF:
chunk_100k = pd.read_csv('file.csv', chunksize=100000)
for chunky in chunk_100k:
Spark_temp_rdd = sc.parallelize(chunky.values.tolist())
try:
Spark_full_rdd += Spark_temp_rdd
except NameError:
Spark_full_rdd = Spark_temp_rdd
del Spark_temp_rdd
Spark_DF = Spark_full_rdd.toDF(['column 1','column 2'])
【讨论】:
【参考方案3】:我(真的)是 Pyspark 的新手,但过去几年一直在使用 Pandas。我要放在这里的内容最终可能不是最好的解决方案,但它对我有用,所以我认为值得在这里发布。
我在加载 CSV 文件时遇到了同样的问题,该文件在一个特殊字段中嵌入了额外的逗号,如果使用 Pyspark 会触发错误,但如果使用 Pandas 则没有问题。所以我四处寻找处理这个额外分隔符的解决方案,以下代码解决了我的问题:
df = sqlContext.read.format('csv').option('header','true').option('maxColumns','3').option('escape','"').load('cars.csv')
我个人喜欢强制 'maxColumns' 参数只允许特定数量的列。因此,如果“BBB,B”以某种方式被解析为两个字符串,spark 将给出一条错误消息并为您打印整行。 'escape' 选项是真正解决我问题的选项。我不知道这是否有帮助,但希望这是可以进行实验的东西。
【讨论】:
以上是关于使用 pyspark 处理 csv 文件中字段中的逗号的主要内容,如果未能解决你的问题,请参考以下文章