如何将包含多个键值对的列拆分为pyspark中的不同列
Posted
技术标签:
【中文标题】如何将包含多个键值对的列拆分为pyspark中的不同列【英文标题】:How to split a column that contains multiple key-value pairs into different columns in pyspark 【发布时间】:2019-04-23 03:48:17 【问题描述】:我正在研究一个非常大的数据集,称为 AWS 上的 Reddit。我首先阅读了一个小样本:
file_lzo = sc.newAPIHadoopFile("s3://mv559/reddit/sample-data/",
"com.hadoop.mapreduce.LzoTextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text")
所以我得到了一个名为file_lzo
的rdd。我取了第一个元素,数据看起来像:
[(0,
'"archived":false,"author":"TistedLogic","author_created_utc":1312615878,"author_flair_background_color":null,"author_flair_css_class":null,"author_flair_richtext":[],"author_flair_template_id":null,"author_flair_text":null,"author_flair_text_color":null,"author_flair_type":"text","author_fullname":"t2_5mk6v","author_patreon_flair":false,"body":"Is it still r\\/BoneAppleTea worthy if it\'s the opposite?","can_gild":true,"can_mod_post":false,"collapsed":false,"collapsed_reason":null,"controversiality":0,"created_utc":1538352000,"distinguished":null,"edited":false,"gilded":0,"gildings":"gid_1":0,"gid_2":0,"gid_3":0,"id":"e6xucdd","is_submitter":false,"link_id":"t3_9ka1hp","no_follow":true,"parent_id":"t1_e6xu13x","permalink":"\\/r\\/Unexpected\\/comments\\/9ka1hp\\/jesus_fking_woah\\/e6xucdd\\/","removal_reason":null,"retrieved_on":1539714091,"score":2,"send_replies":true,"stickied":false,"subreddit":"Unexpected","subreddit_id":"t5_2w67q","subreddit_name_prefixed":"r\\/Unexpected","subreddit_type":"public"')]
然后我使用这个 rdd 创建一个数据框
df = spark.createDataFrame(file_lzo,['idx','map_col'])
df.show(4)
看起来像这样
+-----+--------------------+
| idx| map_col|
+-----+--------------------+
| 0|"archived":false...|
|70139|"archived":false...|
|70139|"archived":false...|
|70139|"archived":false...|
+-----+--------------------+
only showing top 4 rows
最后我想以如下所示的数据帧格式获取数据,并将其保存为 S3 中的拼花格式以供将来使用。
我尝试创建一个 Schema,然后使用 read.json
,但是我得到的所有值都是 Null
fields = [StructField("archived", BooleanType(), True),
StructField("author", StringType(), True),
StructField("author_flair_css_class", StringType(), True),
StructField("author_flair_text", StringType(), True),
StructField("body", StringType(), True),
StructField("can_gild", BooleanType(), True),
StructField("controversiality", LongType(), True),
StructField("created_utc", StringType(), True),
StructField("distinguished", StringType(), True),
StructField("edited", StringType(), True),
StructField("gilded", LongType(), True),
StructField("id", StringType(), True),
StructField("is_submitter", StringType(), True),
StructField("link_id", StringType(), True),
StructField("parent_id", StringType(), True),
StructField("permalink", StringType(), True),
StructField("permalink", StringType(), True),
StructField("removal_reason", StringType(), True),
StructField("retrieved_on", LongType(), True),
StructField("score",LongType() , True),
StructField("stickied", BooleanType(), True),
StructField("subreddit", StringType(), True),
StructField("subreddit_id", StringType(), True)]
schema = StructType(fields)
+--------+------+----------------------+-----------------+----+--------+----------------+-----------+-------------+------+------+----+------------+-------+---------+---------+---------+--------------+------------+-----+--------+---------+------------+
|archived|author|author_flair_css_class|author_flair_text|body|can_gild|controversiality|created_utc|distinguished|edited|gilded| id|is_submitter|link_id|parent_id|permalink|permalink|removal_reason|retrieved_on|score|stickied|subreddit|subreddit_id|
+--------+------+----------------------+-----------------+----+--------+----------------+-----------+-------------+------+------+----+------------+-------+---------+---------+---------+--------------+------------+-----+--------+---------+------------+
| null| null| null| null|null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
| null| null| null| null|null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
| null| null| null| null|null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
+--------+------+----------------------+-----------------+----+--------+----------------+-----------+-------------+------+------+----+------------+-------+---------+---------+---------+--------------+------------+-----+--------+---------+------------+
【问题讨论】:
【参考方案1】:查看您想要的输出,您可以将您的 json 视为 MapType() 的列,然后从中提取列。
开始创建数据框:
my_rdd = [(0, "author": "abc", "id": "012", "archived": "False"),
(1, "author": "bcd", "id": "013", "archived": "False"),
(2, "author": "cde", "id": "014", "archived": "True"),
(3, "author": "edf", "id": "015", "archived": "False")]
df = sqlContext.createDataFrame(my_rdd,['idx','map_col'])
df.show()
# +---+--------------------+
# |idx| map_col|
# +---+--------------------+
# | 0|Map(id -> 012, au...|
# | 1|Map(id -> 013, au...|
# | 2|Map(id -> 014, au...|
# | 3|Map(id -> 015, au...|
# +---+--------------------+
然后,如果您事先不知道要提取哪些密钥,则收集一个并获取密钥,例如:
from pyspark.sql import functions as f
one = df.select(f.col('map_col')).rdd.take(1)
my_dict = one[0][0].keys()
my_dict
# dict_keys(['id', 'author', 'archived'])
如果您已经知道密钥列表,请直接使用该列表。
因此,您可以将地图列展平:
keep_cols = [f.col('map_col').getItem(k).alias(k) for k in my_dict]
df.select(keep_cols).show()
#+---+------+--------+
#| id|author|archived|
#+---+------+--------+
#|012| abc| False|
#|013| bcd| False|
#|014| cde| True|
#|015| edf| False|
#+---+------+--------+
方法getItem()
和alias()
正在发挥作用:第一个从映射列中提取选定的键,第二个根据需要重命名获得的列。
【讨论】:
您好,感谢您的回复。我已经尝试过了,并且在获取钥匙的步骤中。我有一个错误说'str'对象没有属性'keys'。所以我拿不到钥匙。顺便说一句,我们可以通过在创建数据框时设置适当的模式直接将这种 rdd 转换为我想要的格式吗?非常感谢 您的数据似乎不是存储为 json 而是存储为字符串,或者您可能只需要my_dict = one[0].keys()
而不是 my_dict = one[0][0].keys()
如果 json 字典被保存为字符串,您可以尝试使用 json.loads
将其更改为字典。还请提供一个新样本,准确再现您的实际数据(看看我如何更改您的样本才能在没有 SyntaxError 的情况下导入 pyspark)。
您可以获得与之前的df
相同的数据帧,也可以使用from pyspark.sql import Row
和sc.parallelize(my_rdd).map(lambda x: Row(x[1])).toDF()
。生成的数据框仍然是 MapType() 并且必须按前面所示进行转换。
您好,感谢您的解释。我想也许我不够清楚,所以会引起一些混乱。我更新了我的问题,这样你就可以看到我的真实数据和我之前的程序。稍后我会尝试您的建议并给您反馈:) 非常感谢!以上是关于如何将包含多个键值对的列拆分为pyspark中的不同列的主要内容,如果未能解决你的问题,请参考以下文章