在pyspark中的一行中解析多个json
Posted
技术标签:
【中文标题】在pyspark中的一行中解析多个json【英文标题】:Parse multiple json in one line in pyspark 【发布时间】:2021-11-19 21:24:08 【问题描述】:我有一个json行如下:
"test":"valid2","workflowId":79370,"email":"d1@example.com""email":"d1@example.com","eventName":"emailOpen","dataFields":"campaignId":1125010,"ip":"100.100.200.243","userAgentDevice":"Gmail","messageId":"be4e071c11594bb0b4ee3c444fd08b99","emailId":"d1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"DWH TEST 06042020 WITH CALL","locale":null,"templateId":1576122,"emailSubject":"DWH TEST","labels":[],"createdAt":"2020-04-06 15:06:16 +00:00","templateName":"DWH TEST","messageTypeId":27043,"experimentId":79413,"campaignName":"DWH Test Automation","workflowId":79370,"email":"d1@example.com","channelId":24365"email":"dd1@example.com","eventName":"emailOpen","dataFields":"campaignId":1100,"ip":"50.100.200.243","userAgentDevice":"Gmail","messageId":"zz4e071c11594bb0b4ee3c444fd08b99","emailId":"dd1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"TEST","locale":null,"templateId":1576122,"emailSubject":"TEST","labels":"Cambbridge test","createdAt":"2020-04-10 15:06:16 +00:00","templateName":"TEST","messageTypeId":27043,"experimentId":89413,"campaignName":"Cambridge Test","workflowId":18370,"email":"dd1@example.com","channelId":1111"email":"dd1@example.com","eventName":"emailClick","dataFields":"campaignId":1100,"ip":"50.100.200.243","userAgentDevice":"Gmail","messageId":"zzee071c11594bb0b4ee3c444fd08b99","emailId":"dd1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"TEST","locale":null,"templateId":1576122,"emailSubject":"TEST","labels":"Cambbridge test","createdAt":"2020-04-10 15:08:16 +00:00","templateName":"TEST","messageTypeId":27043,"experimentId":89413,"campaignName":"Cambridge Test","workflowId":18370,"email":"dd1@example.com","channelId":1111"test":"valid2","workflowId":79370,"email":"d1@example.com""email":"d1@example.com","eventName":"emailOpen","dataFields":"campaignId":1125010,"ip":"100.100.200.243","userAgentDevice":"Gmail","messageId":"be4e071c11594bb0b4ee3c444fd08b99","emailId":"d1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"DWH TEST 06042020 WITH CALL","locale":null,"templateId":1576122,"emailSubject":"DWH TEST","labels":[],"createdAt":"2020-04-06 15:06:16 +00:00","templateName":"DWH TEST","messageTypeId":27043,"experimentId":79413,"campaignName":"DWH Test Automation","workflowId":79370,"email":"d1@example.com","channelId":24365
如您所见,一行中有多个 json。 我需要将额外的 json json 对象 ""test":"valid2","workflowId":79370,"email":"d1@example.com"" 与所有/任何事件 json 相关联因为额外 json 的 workflowId 和 email 与事件的 workflowId 和 email 匹配。
可以有多个这样的额外 json,然后是一个事件。 我不知道如何使用 python 和 pyspark 的组合来准备这样的文件。 使用 pyspark 是强制性的。 我试过了:
df = sql_context.read.json('test.json')
df.show()
但输出只是额外的 json :
+--------------+------+----------+
| email| test|workflowId|
+--------------+------+----------+
|c1@example.com|valid1| 79370|
+--------------+------+----------+
我希望输出看起来像:
id email event workflow_id custom createdatdate createdattime
0 be4e071c11594bb0b4ee3c444fd08b99 d1@example.com emailOpen 79370 valid2 2020414 154248
1 be4e071c11594bb0b4ee3c444fd08b99 d1@example.com emailOpen 79370 valid2 2020414 154248
谁能指导我如何使用 pyspark 处理这样的文件并获得结果 df
【问题讨论】:
那不是合法的 JSON。如果您可以假设每个 JSON 文档都是一个对象,您可以将
替换为 \n
,然后使用 .splitlines()
。
【参考方案1】:
因为这是格式错误的 JSON,我建议您运行一个预处理步骤来修复文件。这可以使用jq
命令行实用程序轻松完成。见here。
-c
标志用于紧凑输出,将生成换行符分隔的 JSON,而不是漂亮的打印。
jq -c . test.json > test_repaired.json
然后您可以使用 Spark 读取该文件,如下所示:
>>> spark \
... .read \
... .json('test_repaired.json') \
... .show()
+--------------------+---------------+----------+------+----------+
| dataFields| email| eventName| test|workflowId|
+--------------------+---------------+----------+------+----------+
| null| d1@example.com| null|valid2| 79370|
|1125010, DWH Tes...| d1@example.com| emailOpen| null| null|
|1100, Cambridge ...|dd1@example.com| emailOpen| null| null|
|1100, Cambridge ...|dd1@example.com|emailClick| null| null|
| null| d1@example.com| null|valid2| 79370|
|1125010, DWH Tes...| d1@example.com| emailOpen| null| null|
+--------------------+---------------+----------+------+----------+
【讨论】:
这对于读取数据框中的 json 非常有用。但我不必为那个额外的 json 设置一行。我需要将它与基于电子邮件和 worflowid 的事件进行映射。因此,理想情况下,df 中应该只有额外 json 中的电子邮件和工作流 id 与事件匹配的行。数据字段|电子邮件|事件名称|测试|工作流标识| +--------------------+--------------+----------+- -----+----------+ |1125010, DWH 测试...| d1@example.com|电子邮件打开|有效2| 79370| |1125010, DWH 测试...| d1@example.com|电子邮件打开|有效2| 79370| 我明白了。在这种情况下,您可以以编程方式修复 JSON 并丢弃不需要的数据。或者您可以将其全部读入 Dataframe,然后使用df.where
子句删除 email
为空的条目。
关于如何在 python 中修复 jsons 的任何建议
Tim Roberts 建议用换行符替换“”字符对此很有用,或者您可以创建一个 JSON 数组:j: str = "[" + my_json.replace("", ",") + "]"
,然后可以使用 @ 反序列化987654329@。然后,您可以过滤掉没有您需要的属性的项目。但是,如果您的数据很大,我建议您在 Spark 中尽可能多地尝试。以上是关于在pyspark中的一行中解析多个json的主要内容,如果未能解决你的问题,请参考以下文章
为 pyspark 数据帧的每一行评估多个 if elif 条件
org.apache.spark.sql.AnalysisException:给定pyspark中的输入列,无法解析'sub_tot`'