如何将 json 对象列表转换为单个 pyspark 数据框?

Posted

技术标签:

【中文标题】如何将 json 对象列表转换为单个 pyspark 数据框?【英文标题】:how to convert list of json object into a single pyspark dataframe? 【发布时间】:2019-05-28 08:00:55 【问题描述】:

我是 pyspark 的新手,我有一个来自 api 的 json 列表,每个 json 对象都有相同的模式(键值对)。像这样

[ 'count': 308,
  'next': 'some_url',
  'previous': None,
  'results': ['assigned_to': 43,
    'category': 'Unused',
    'comments': None,
    'completed_ts': None,
    'created': '2019-05-27T05:14:22.306843Z',
    'description': 'Pollution',
    'display_name': 'admin': False,
     'business_name': 'Test Business',
     'contact_number': 'some_number',
     'dob': None,
     'email': 'some_mail',
     'emp_id': None,
     'first_name': 'Alisha'],
  'count': 309,
  'next': 'some_url',
  'previous': None,
  'results': ['assigned_to': 44,
    'category': 'Unused',
    'comments': None,
    'completed_ts': None,
    'created': '2019-05-27T05:14:22.306843Z',
    'description': 'Pollution',
    'display_name': 'admin': False,
     'business_name': 'Test Business',
     'contact_number': 'some_number',
     'dob': None,
     'email': 'some_mail',
     'emp_id': None,
     'first_name': 'Ali'],......]

如果它是单独的 json 文件。我会使用

创建数据框

df =spark.read.json('myfile.json') 然后将所有数据帧合并为一个。我在直接从列表本身转换 datframe 时遇到问题。我用过这个

from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("Basics").getOrCreate()
sc= spark.sparkContext
df = pyspark.sql.SQLContext(sc.parallelize(data_list))`

它给了我 AttributeError: 'RDD' object has no attribute '_jsc'

【问题讨论】:

你是怎么调用那个 API 的?是否有一个循环或一些基于间隔的守护进程正在运行?所有消息也共享相同的架构? 一个函数里面有循环,如果next key中有url(检查json),那么它会一直获取数据,直到next不为null。 @Rohan Kumar 我有一个类似的问题,我必须批量读取传入的 json 数据并将其转储到某个文件中。因此,输出文件具有 json 对象列表。你能分享一下你是如何循环它们的吗 @Neha0908 不确定我当时是如何做到的,但您可以使用 Apache Kafka 捕获流数据,然后从 Pyspark 中的数据中加载特定变量。 spark.apache.org/docs/2.1.0/… 【参考方案1】:

对于您的问题,我找不到直接的答案。但这个解决方案有效,

import json
import ast

df = sc.wholeTextFiles(path).map(lambda x:ast.literal_eval(x[1]))\
                            .map(lambda x: json.dumps(x))

df = spark.read.json(df)

这会给你输出,

+-----+--------+--------+--------------------+
|count|    next|previous|             results|
+-----+--------+--------+--------------------+
|  308|some_url|    null|[[43,Unused,null,...|
|  309|some_url|    null|[[44,Unused,null,...|
+-----+--------+--------+--------------------+

编辑: 如果它在一个变量中,你所要做的就是,

import json

df = sc.parallelize(data).map(lambda x: json.dumps(x))
df = spark.read.json(df)

【讨论】:

路径??我没有任何文件,只有 json 字符串/对象的列表。你以为我们有这些文件吗?? 是的!你有这个变量吗? 它在一个变量中,而不是一个文件中。 我研究它并意识到还有一些其他问题,比如环境变量 pyspark_python 和 pyspark_driver_python 不一样。我正在运行 sparkContext 的多次迭代。

以上是关于如何将 json 对象列表转换为单个 pyspark 数据框?的主要内容,如果未能解决你的问题,请参考以下文章

C#:如何将对象列表转换为该对象的单个属性列表?

如何将数据从多个输入字段转换为单个 JSON 对象以进一步将其插入单个 mysql 字段

如何使用 linq 将列表中的列表转换为单个列表 [重复]

如何在不使用 C# 中的 T 对象的情况下将 Json 数组转换为单个 JSON 对象?

如何将字符串数组转换为 JSON 对象?

如何将嵌套的 JSON 对象转换为数组 Spring Boot?