PySpark 使用 RDD 和 json.load 解析 Json

Posted

技术标签:

【中文标题】PySpark 使用 RDD 和 json.load 解析 Json【英文标题】:PySpark parse Json using RDD and json.load 【发布时间】:2018-02-08 04:34:51 【问题描述】:


  "city": "Tempe",
  "state": "AZ",
  ...
  "attributes": [
    "BikeParking: True",
    "BusinessAcceptsBitcoin: False",
    "BusinessAcceptsCreditCards: True",
    "BusinessParking: 'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False",
    "DogsAllowed: False",
    "RestaurantsPriceRange2: 2",
    "WheelchairAccessible: True"
  ],
  ...

您好,我正在使用 PySpark,我正在尝试输出 (state, BusinessAcceptsBitcoin) 的元组,目前我正在做:

csr = (dataset
        .filter(lambda e:"city" in e and "BusinessAcceptsBitcoin" in e)
        .map(lambda e: (e["city"],e["BusinessAcceptsBitcoin"]))
        .collect()
        )

但是这个命令失败了。如何获取“BusinessAcceptsBitcoin”和“city”字段?

【问题讨论】:

How to make good reproducible Apache Spark Dataframe examples 最好猜它是Read multiline JSON in Apache Spark的副本 抱歉,不能使用数据框。它只能是 RDD! 【参考方案1】:

您可以使用 Dataframe 和 UDF 来解析“属性”字符串。

根据您提供的示例数据,“属性”似乎不是正确的 JSON 或字典。

假设 'attributes' 只是一个字符串,这里是一个使用数据框和 Udf 的示例代码。

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
            .builder \
            .appName("test") \
            .getOrCreate()

#sample data
data=[

  "city": "Tempe",
  "state": "AZ",
  "attributes": [
    "BikeParking: True",
    "BusinessAcceptsBitcoin: False",
    "BusinessAcceptsCreditCards: True",
    "BusinessParking: 'garage': False, 'street': False, 'validated': False, 'lot': True, 'valet': False",
    "DogsAllowed: False",
    "RestaurantsPriceRange2: 2",
    "WheelchairAccessible: True"
  ]
]
df=spark.sparkContext.parallelize(data).toDF()

用于解析字符串的用户定义函数

def get_attribute(data,attribute):
    return [list_item for list_item in data if attribute in list_item][0]

注册 udf

udf_get_attribute=udf(get_attribute, StringType

数据框

df.withColumn("BusinessAcceptsBitcoin",udf_get_attribute("attributes",lit("BusinessAcceptsBitcoin"))).select("city","BusinessAcceptsBitcoin").show(truncate=False)

样本输出

+-----+-----------------------------+
|city |BusinessAcceptsBitcoin       |
+-----+-----------------------------+
|Tempe|BusinessAcceptsBitcoin: False|
+-----+-----------------------------+

您也可以使用相同的 udf 查询任何其他字段,例如

df.withColumn("DogsAllowed",udf_get_attribute("attributes",lit("DogsAllowed"))).select("city","DogsAllowed").show(truncate=False)

【讨论】:

抱歉,我不能为此使用数据框!!只能是 RDD!

以上是关于PySpark 使用 RDD 和 json.load 解析 Json的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 使用 RDD 和 json.load 解析 Json

我应该在 PySpark 中选择 RDD 还是 DataFrame 之一?

使用 pyspark 过滤数组中基于 RDD 的值

pyspark - 使用 RDD 进行聚合比 DataFrame 快得多

使用 pyspark 交叉组合两个 RDD

如何通过 pyspark 正确使用 rdd.map 中的模块