Pyspark 将 json 数组转换为数据帧行

Posted

技术标签:

【中文标题】Pyspark 将 json 数组转换为数据帧行【英文标题】:Pyspark convert json array to dataframe rows 【发布时间】:2020-01-23 19:02:39 【问题描述】:

pyspark 初学者在这里 - 我有一个 spark 数据框,其中每一行都是 s3 上的 url。 每个 url 都是 JSON 数组的 GZIP 文件,我可以将数据框中的每一行(链接)解析为 python 列表,但我不知道如何从这个 JSON 列表中创建多行。

这是我使用的返回 json 列表的函数:

def distributed_read_file(url):
    s3_client = boto3.client('s3')
    result = s3_client.get_object(Bucket=raw_data_bucket_name, Key=url)
    bytestream = BytesIO(result['Body'].read())
    string_json = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
    list_of_jsons = json.loads(string_json) 

例如,如果这些是列表中的 JSON 对象:

["a": 99, "b": 102, "a": 43, "b": 87]

我想在 URLS 数据帧上运行一个函数,例如:

result_df = urls_rdd.map(distributed_read_file)

并获得包含以下列的数据框:ab(JSON 键)。 当我尝试这样做时,我将每个 json 对象作为 MapType 列取回,我很难使用它。

非常感谢,我希望它清楚!

【问题讨论】:

你能显示你得到的当前输出吗? 【参考方案1】:

因此,如果它对某人有所帮助,我找到了一个非常简单的解决方案:

def distributed_read_gzip(url):
    s3_client = boto3.client('s3')
    result = s3_client.get_object(Bucket=raw_data_bucket_name, Key=url)
    bytestream = BytesIO(result['Body'].read())
    string_json = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')
    for json_obj in json.loads(string_json):
        yield Row(**json_obj)

虽然调用函数是使用平面地图完成的,因为每个 URL 都会返回几行:

new_rdd = urls_rdd.flatMap(distributed_read_gzip)

【讨论】:

以上是关于Pyspark 将 json 数组转换为数据帧行的主要内容,如果未能解决你的问题,请参考以下文章

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

在 Pyspark 中将复杂的数据帧行划分为简单的行

尝试使用 JSON 结构将时间箱转换为 Pyspark 中的分钟和小时数组

Pyspark 将结构数组转换为字符串

将 CountVectorizer 和 TfidfTransformer 稀疏矩阵转换为单独的 Pandas 数据帧行

在 Pyspark 中将结构转换为数组