在pyspark中的一行中解析多个json

Posted

技术标签:

【中文标题】在pyspark中的一行中解析多个json【英文标题】:Parse multiple json in one line in pyspark 【发布时间】:2021-11-19 21:24:08 【问题描述】:

我有一个json行如下:

"test":"valid2","workflowId":79370,"email":"d1@example.com""email":"d1@example.com","eventName":"emailOpen","dataFields":"campaignId":1125010,"ip":"100.100.200.243","userAgentDevice":"Gmail","messageId":"be4e071c11594bb0b4ee3c444fd08b99","emailId":"d1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"DWH TEST 06042020 WITH CALL","locale":null,"templateId":1576122,"emailSubject":"DWH TEST","labels":[],"createdAt":"2020-04-06 15:06:16 +00:00","templateName":"DWH TEST","messageTypeId":27043,"experimentId":79413,"campaignName":"DWH Test Automation","workflowId":79370,"email":"d1@example.com","channelId":24365"email":"dd1@example.com","eventName":"emailOpen","dataFields":"campaignId":1100,"ip":"50.100.200.243","userAgentDevice":"Gmail","messageId":"zz4e071c11594bb0b4ee3c444fd08b99","emailId":"dd1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"TEST","locale":null,"templateId":1576122,"emailSubject":"TEST","labels":"Cambbridge test","createdAt":"2020-04-10 15:06:16 +00:00","templateName":"TEST","messageTypeId":27043,"experimentId":89413,"campaignName":"Cambridge Test","workflowId":18370,"email":"dd1@example.com","channelId":1111"email":"dd1@example.com","eventName":"emailClick","dataFields":"campaignId":1100,"ip":"50.100.200.243","userAgentDevice":"Gmail","messageId":"zzee071c11594bb0b4ee3c444fd08b99","emailId":"dd1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"TEST","locale":null,"templateId":1576122,"emailSubject":"TEST","labels":"Cambbridge test","createdAt":"2020-04-10 15:08:16 +00:00","templateName":"TEST","messageTypeId":27043,"experimentId":89413,"campaignName":"Cambridge Test","workflowId":18370,"email":"dd1@example.com","channelId":1111"test":"valid2","workflowId":79370,"email":"d1@example.com""email":"d1@example.com","eventName":"emailOpen","dataFields":"campaignId":1125010,"ip":"100.100.200.243","userAgentDevice":"Gmail","messageId":"be4e071c11594bb0b4ee3c444fd08b99","emailId":"d1@example.com","userAgent":"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko Firefox/11.0 (via ggpht.com GoogleImageProxy)","workflowName":"DWH TEST 06042020 WITH CALL","locale":null,"templateId":1576122,"emailSubject":"DWH TEST","labels":[],"createdAt":"2020-04-06 15:06:16 +00:00","templateName":"DWH TEST","messageTypeId":27043,"experimentId":79413,"campaignName":"DWH Test Automation","workflowId":79370,"email":"d1@example.com","channelId":24365

如您所见,一行中有多个 json。 我需要将额外的 json json 对象 ""test":"valid2","workflowId":79370,"email":"d1@example.com"" 与所有/任何事件 json 相关联因为额外 json 的 workflowId 和 email 与事件的 workflowId 和 email 匹配。

可以有多个这样的额外 json,然后是一个事件。 我不知道如何使用 python 和 pyspark 的组合来准备这样的文件。 使用 pyspark 是强制性的。 我试过了:

df = sql_context.read.json('test.json')
df.show() 

但输出只是额外的 json :

+--------------+------+----------+
|         email|  test|workflowId|
+--------------+------+----------+
|c1@example.com|valid1|     79370|
+--------------+------+----------+

我希望输出看起来像:

        id                              email           event     workflow_id   custom  createdatdate   createdattime
0   be4e071c11594bb0b4ee3c444fd08b99    d1@example.com  emailOpen   79370       valid2  2020414         154248
1   be4e071c11594bb0b4ee3c444fd08b99    d1@example.com  emailOpen   79370       valid2  2020414         154248

谁能指导我如何使用 pyspark 处理这样的文件并获得结果 df

【问题讨论】:

那不是合法的 JSON。如果您可以假设每个 JSON 文档都是一个对象,您可以将 替换为 \n,然后使用 .splitlines() 【参考方案1】:

因为这是格式错误的 JSON,我建议您运行一个预处理步骤来修复文件。这可以使用jq 命令行实用程序轻松完成。见here。

-c 标志用于紧凑输出,将生成换行符分隔的 JSON,而不是漂亮的打印。

jq -c . test.json > test_repaired.json

然后您可以使用 Spark 读取该文件,如下所示:

>>> spark \
...     .read \
...     .json('test_repaired.json') \
...     .show()
+--------------------+---------------+----------+------+----------+
|          dataFields|          email| eventName|  test|workflowId|
+--------------------+---------------+----------+------+----------+
|                null| d1@example.com|      null|valid2|     79370|
|1125010, DWH Tes...| d1@example.com| emailOpen|  null|      null|
|1100, Cambridge ...|dd1@example.com| emailOpen|  null|      null|
|1100, Cambridge ...|dd1@example.com|emailClick|  null|      null|
|                null| d1@example.com|      null|valid2|     79370|
|1125010, DWH Tes...| d1@example.com| emailOpen|  null|      null|
+--------------------+---------------+----------+------+----------+

【讨论】:

这对于读取数据框中的 json 非常有用。但我不必为那个额外的 json 设置一行。我需要将它与基于电子邮件和 worflowid 的事件进行映射。因此,理想情况下,df 中应该只有额外 json 中的电子邮件和工作流 id 与事件匹配的行。数据字段|电子邮件|事件名称|测试|工作流标识| +--------------------+--------------+----------+- -----+----------+ |1125010, DWH 测试...| d1@example.com|电子邮件打开|有效2| 79370| |1125010, DWH 测试...| d1@example.com|电子邮件打开|有效2| 79370| 我明白了。在这种情况下,您可以以编程方式修复 JSON 并丢弃不需要的数据。或者您可以将其全部读入 Dataframe,然后使用 df.where 子句删除 email 为空的条目。 关于如何在 python 中修复 jsons 的任何建议 Tim Roberts 建议用换行符替换“”字符对此很有用,或者您可以创建一个 JSON 数组:j: str = "[" + my_json.replace("", ",") + "]",然后可以使用 @ 反序列化987654329@。然后,您可以过滤掉没有您需要的属性的项目。但是,如果您的数据很大,我建议您在 Spark 中尽可能多地尝试。

以上是关于在pyspark中的一行中解析多个json的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark中解析csv格式的元组数据?

我想用 Pyspark 中的最后一行值填充缺失值:

为 pyspark 数据帧的每一行评估多个 if elif 条件

从pyspark中的文本文件中删除第一行和最后一行

PYSPARK:根据条件用另一个行值更新一行中的值?

org.apache.spark.sql.AnalysisException:给定pyspark中的输入列,无法解析'sub_tot`'