如何在pyspark中解析csv格式的元组数据?
Posted
技术标签:
【中文标题】如何在pyspark中解析csv格式的元组数据?【英文标题】:How to parse tuple data from csv format in pyspark? 【发布时间】:2018-03-15 17:42:33 【问题描述】:数据集为 CSV 格式。该文件中的每一行都包含一个元组,其中第一个元素是植物的名称,其余元素是找到植物的状态。
例子:
abelia,fl,nc
abelia x grandiflora,fl,nc
abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi
如何解析并创建一个包含植物和状态列的数据框(这将包含除植物之外的所有数据,如列表)?
【问题讨论】:
你试过什么?你在哪里卡住?你能和我们分享一些代码吗?您想要的输出是什么样的? How to make good reproducible apache spark dataframe examples. 我被困在解析中。 Map 在这里没用,因为它会产生不同长度的输出。我将它加载为 rdd 然后尝试使用 map 解析。 期望的输出是什么?你想要一个包含 51 列(植物名称加每个州 1 列)还是 2 列的数据框?在这两种情况下,尤其是后者,地图都不是没用的。也请分享您尝试过的代码。 只需要 2 列。一个用于工厂,另一个用于州 【参考方案1】:
您可以使用rdd.map()
或使用DataFrames 和udf()
:
RDD
首先创建一个示例数据集:
text = """abelia,fl,nc
abelia x grandiflora,fl,nc
abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi"""
rdd = sc.parallelize(map(lambda x: (x,), text.split("\n")))
rdd.toDF(["rawText"]).show(truncate=False)
#+--------------------------------------------------------+
#|rawText |
#+--------------------------------------------------------+
#|abelia,fl,nc |
#|abelia x grandiflora,fl,nc |
#|abelmoschus,ct,dc,fl,hi,il,ky,la,md,mi,ms,nc,sc,va,pr,vi|
#+--------------------------------------------------------+
现在使用map()
两次。首先通过拆分,
将每条记录映射到一个列表。第二个将拆分字符串映射为(x[0], x[1:])
形式的元组:
rdd.map(lambda x: x[0].split(','))\
.map(lambda x: (x[0], x[1:]))\
.toDF(["plant", "states"])\
.show(truncate=False)
#+--------------------+------------------------------------------------------------+
#|plant |states |
#+--------------------+------------------------------------------------------------+
#|abelia |[fl, nc] |
#|abelia x grandiflora|[fl, nc] |
#|abelmoschus |[ct, dc, fl, hi, il, ky, la, md, mi, ms, nc, sc, va, pr, vi]|
#+--------------------+------------------------------------------------------------+
您也可以在一次调用 map()
时完成此操作,但为了便于阅读,我将其分成两部分。
数据框
import pyspark.sql.functions as f
df = sqlCtx.createDataFrame(map(lambda x: (x,), text.split("\n")), ["rawText"])
# define udf to split a string on comma and return all
# of the elements except the first one
get_states = f.udf(lambda x: x.split(',')[1:], ArrayType(StringType()))
df.withColumn('plant', f.split('rawText', ',')[0])\
.withColumn('states', get_states('rawText'))\
.select('plant', 'states')\
.show(truncate=False)
#+--------------------+------------------------------------------------------------+
#|plant |states |
#+--------------------+------------------------------------------------------------+
#|abelia |[fl, nc] |
#|abelia x grandiflora|[fl, nc] |
#|abelmoschus |[ct, dc, fl, hi, il, ky, la, md, mi, ms, nc, sc, va, pr, vi]|
#+--------------------+------------------------------------------------------------+
【讨论】:
以上是关于如何在pyspark中解析csv格式的元组数据?的主要内容,如果未能解决你的问题,请参考以下文章