使用 Pyspark 解析 JSON 字符串以查找列表中每个值的出现情况

Posted

技术标签:

【中文标题】使用 Pyspark 解析 JSON 字符串以查找列表中每个值的出现情况【英文标题】:Using Pyspark to parse JSON strings for occurrences of each value in the list 【发布时间】:2021-12-01 08:52:53 【问题描述】:

我是 PySpark 的新手,我正在努力找出每个 IP 地址在以下列表中出现的次数:

sampleJson = [('"user":100, "ips" : ["191.168.192.101", "191.168.192.103", "191.168.192.96", "191.168.192.99"]',), ('"user":101, "ips" : ["191.168.192.102", "191.168.192.105", "191.168.192.103", "191.168.192.107"]',), ('"user":102, "ips" : ["191.168.192.105", "191.168.192.101", "191.168.192.105", "191.168.192.107"]',), ('"user":103, "ips" : ["191.168.192.96", "191.168.192.100", "191.168.192.107", "191.168.192.101"]',), ('"user":104, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.102", "191.168.192.99"]',),('"user":105, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.100", "191.168.192.96"]',),]

理想情况下,我需要这样的结果:

ip count
191.168.192.96 3
191.168.192.99 6
191.168.192.100 2
191.168.192.101 3
191.168.192.102 2
191.168.192.103 2
191.168.192.105 3
191.168.192.107 3

我已执行以下代码,将结果放入包含用户的列和显示其 ips 但现在无法将每个 ip 的计数提取到所需输出的另一列中。有人可以帮忙吗?

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import json

json_df = spark.createDataFrame(sampleJson)

sch=StructType([StructField('user', StringType(), 
False),StructField('ips',ArrayType(StringType()))])

json_df = json_df.withColumn("n",from_json(col("_1"),sch)).select("n.*").show(10,False)

|user|ips                                                                 |
|:---|-------------------------------------------------------------------:|
|100 |[191.168.192.101, 191.168.192.103, 191.168.192.96, 191.168.192.99]  |
|101 |[191.168.192.102, 191.168.192.105, 191.168.192.103, 191.168.192.107]|
|102 |[191.168.192.105, 191.168.192.101, 191.168.192.105, 191.168.192.107]|
|103 |[191.168.192.96, 191.168.192.100, 191.168.192.107, 191.168.192.101] |
|104 |[191.168.192.99, 191.168.192.99, 191.168.192.102, 191.168.192.99]   |
|105 |[191.168.192.99, 191.168.192.99, 191.168.192.100, 191.168.192.96]   |

【问题讨论】:

【参考方案1】:

您可以使用explode 函数将数组元素转换为行:

json_df = spark.createDataFrame(sampleJson)

sch=StructType([StructField('user', StringType(), 
False),StructField('ips',ArrayType(StringType()))])

json_df = json_df.withColumn("n",from_json(col("_1"),sch)).select("n.*")

json_df = json_df \
    .withColumn('ip', explode("ips")) \
    .groupby('ip') \
    .agg(count('*').alias('count'))


json_df.show()

【讨论】:

不幸的是,我在运行您的代码时遇到了以下问题:AttributeError: 'NoneType' object has no attribute 'withColumn' 这可能是因为您的代码中已经有一个操作,例如.show(10,False),我已经更新了示例并将您的代码包含在其中。请再试一次

以上是关于使用 Pyspark 解析 JSON 字符串以查找列表中每个值的出现情况的主要内容,如果未能解决你的问题,请参考以下文章

解析 Json 字符串以查找相同元素的所有值

Spark:如何解析嵌套列表的 JSON 字符串以触发数据框?

如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框

查找 JSON 对象大小而不将其解析为字符串

Pyspark 中的 JSON 文件解析

PySpark 使用 RDD 和 json.load 解析 Json