如何在pyspark中解析csv格式的元组数据?

Posted

技术标签:

【中文标题】如何在pyspark中解析csv格式的元组数据?【英文标题】:How to parse tuple data from csv format in pyspark? 【发布时间】:2018-03-15 17:42:33 【问题描述】:

数据集为 CSV 格式。该文件中的每一行都包含一个元组,其中第一个元素是植物的名称,其余元素是找到植物的状态。

例子:

abelia,fl,nc
abelia x grandiflora,fl,nc
abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi

如何解析并创建一个包含植物和状态列的数据框(这将包含除植物之外的所有数据,如列表)?

【问题讨论】:

你试过什么?你在哪里卡住?你能和我们分享一些代码吗?您想要的输出是什么样的? How to make good reproducible apache spark dataframe examples. 我被困在解析中。 Map 在这里没用,因为它会产生不同长度的输出。我将它加载为 rdd 然后尝试使用 map 解析。 期望的输出是什么?你想要一个包含 51 列(植物名称加每个州 1 列)还是 2 列的数据框?在这两种情况下,尤其是后者,地图都不是没用的。也请分享您尝试过的代码。 只需要 2 列。一个用于工厂,另一个用于州 【参考方案1】:

您可以使用rdd.map() 或使用DataFrames 和udf()

RDD

首先创建一个示例数据集:

text = """abelia,fl,nc
abelia x grandiflora,fl,nc
abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi"""

rdd = sc.parallelize(map(lambda x: (x,), text.split("\n")))
rdd.toDF(["rawText"]).show(truncate=False)
#+--------------------------------------------------------+
#|rawText                                                 |
#+--------------------------------------------------------+
#|abelia,fl,nc                                            |
#|abelia x grandiflora,fl,nc                              |
#|abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi|
#+--------------------------------------------------------+

现在使用map() 两次。首先通过拆分, 将每条记录映射到一个列表。第二个将拆分字符串映射为(x[0], x[1:]) 形式的元组:

rdd.map(lambda x: x[0].split(','))\
    .map(lambda x: (x[0], x[1:]))\
    .toDF(["plant", "states"])\
    .show(truncate=False)
#+--------------------+------------------------------------------------------------+
#|plant               |states                                                      |
#+--------------------+------------------------------------------------------------+
#|abelia              |[fl, nc]                                                    |
#|abelia x grandiflora|[fl, nc]                                                    |
#|abelmoschus         |[ct, dc, fl, hi, il, ky, la, md, mi, ms, nc, sc, va, pr, vi]|
#+--------------------+------------------------------------------------------------+

您也可以在一次调用 map() 时完成此操作,但为了便于阅读,我将其分成两部分。

数据框

import pyspark.sql.functions as f
df = sqlCtx.createDataFrame(map(lambda x: (x,), text.split("\n")), ["rawText"])

# define udf to split a string on comma and return all
# of the elements except the first one
get_states = f.udf(lambda x: x.split(',')[1:], ArrayType(StringType()))

df.withColumn('plant', f.split('rawText', ',')[0])\
    .withColumn('states', get_states('rawText'))\
    .select('plant', 'states')\
    .show(truncate=False)
#+--------------------+------------------------------------------------------------+
#|plant               |states                                                      |
#+--------------------+------------------------------------------------------------+
#|abelia              |[fl, nc]                                                    |
#|abelia x grandiflora|[fl, nc]                                                    |
#|abelmoschus         |[ct, dc, fl, hi, il, ky, la, md, mi, ms, nc, sc, va, pr, vi]|
#+--------------------+------------------------------------------------------------+

【讨论】:

以上是关于如何在pyspark中解析csv格式的元组数据?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark Dataframe 将两列转换为基于第三列值的元组新列

PySpark UDF 返回可变大小的元组

将日期和列表的元组转换为 Pandas 数据框

如何从 pyspark 数据框中更快地保存 csv 文件?

将有序元组列表保存为 CSV [重复]

如何解析具有不同键值数量的元组列表