使用 pyspark 处理 csv 文件中字段中的逗号

Posted

技术标签:

【中文标题】使用 pyspark 处理 csv 文件中字段中的逗号【英文标题】:Dealing with commas within a field in a csv file using pyspark 【发布时间】:2016-02-23 06:44:50 【问题描述】:

我有一个 csv 数据文件,其中包含列值中的逗号。例如,

value_1,value_2,value_3  
AAA_A,BBB,B,CCC_C  

这里的值为“AAA_A”、“BBB,B”、“CCC_C”。但是,当尝试用逗号分割行时,它给了我 4 个值,即“AAA_A”、“BBB”、“B”、“CCC_C”。

如何在 PySpark 中用逗号分割行后得到正确的值?

【问题讨论】:

你怎么知道BBB、B B会去哪一边? 【参考方案1】:

使用 databriks 中的 spark-csv 类。

引号之间的分隔符,默认情况下 ("),被忽略。

例子:

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .load("cars.csv")

欲了解更多信息,请查看https://github.com/databricks/spark-csv

如果您的引用是 (") 的 (') 实例,则可以使用此类进行配置。

编辑:

对于python API:

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('cars.csv')

最好的问候。

【讨论】:

字段值实际上不在引号内。因此,分隔符不在引号内。这是我没有以正确格式获得输出的主要原因。每当我尝试使用.map(lambda l: l.split(",")) 拆分它时,它会在找到分隔符的任何地方拆分。 我不明白你的问题:S 你如何识别一个值到另一个值? (BBB,B) 都是一个值,还是两个 (BBB, B) ? BBB,B 都是一个值。【参考方案2】:

如果你不介意额外的包依赖,你可以使用 Pandas 来解析 CSV 文件。它可以很好地处理内部逗号。

依赖关系:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

一次将整个文件读入 Spark DataFrame:

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# If no header:
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) 
s_df = sql_sc.createDataFrame(pandas_df)

或者,更注重数据,您可以将数据分块为 Spark RDD,然后是 DF:

chunk_100k = pd.read_csv('file.csv', chunksize=100000)

for chunky in chunk_100k:
    Spark_temp_rdd = sc.parallelize(chunky.values.tolist())
    try:
        Spark_full_rdd += Spark_temp_rdd
    except NameError:
        Spark_full_rdd = Spark_temp_rdd
    del Spark_temp_rdd

Spark_DF = Spark_full_rdd.toDF(['column 1','column 2'])

【讨论】:

【参考方案3】:

我(真的)是 Pyspark 的新手,但过去几年一直在使用 Pandas。我要放在这里的内容最终可能不是最好的解决方案,但它对我有用,所以我认为值得在这里发布。

我在加载 CSV 文件时遇到了同样的问题,该文件在一个特殊字段中嵌入了额外的逗号,如果使用 Pyspark 会触发错误,但如果使用 Pandas 则没有问题。所以我四处寻找处理这个额外分隔符的解决方案,以下代码解决了我的问题:

df = sqlContext.read.format('csv').option('header','true').option('maxColumns','3').option('escape','"').load('cars.csv')

我个人喜欢强制 'maxColumns' 参数只允许特定数量的列。因此,如果“BBB,B”以某种方式被解析为两个字符串,spark 将给出一条错误消息并为您打印整行。 'escape' 选项是真正解决我问题的选项。我不知道这是否有帮助,但希望这是可以进行实验的东西。

【讨论】:

以上是关于使用 pyspark 处理 csv 文件中字段中的逗号的主要内容,如果未能解决你的问题,请参考以下文章

处理 CSV 文件中的 JSON 对象并保存到 PySpark DataFrame

读取包含嵌入逗号的引用字段的 csv 文件

Pyspark:读取带有双引号和逗号的字段的csv文件

在 pyspark 中处理大数字的数据类型

如何使用pyspark流计算csv文件中的条目数

混合模式 CSV 导入 Pyspark