从 Python 编写嵌套拼花格式

Posted

技术标签:

【中文标题】从 Python 编写嵌套拼花格式【英文标题】:Write nested parquet format from Python 【发布时间】:2020-10-26 06:05:45 【问题描述】:

首先免责声明:我对这两个主题(python 和镶木地板)都很陌生,所以如果我的想法很复杂,请与我交流。

我正在寻找有关如何以最有效的方式最好地完成以下转换的一些指导:

我有一个平面 parquet 文件,其中一个 varchar 列将 JSON 数据存储为字符串,我想将此数据转换为嵌套结构,即 JSON 数据变为嵌套 parquet。如果这有任何帮助,我会提前知道 JSON 的架构。

这是我迄今为止“完成”的事情:


构建样本数据

# load packages

import pandas as pd
import json
import pyarrow as pa
import pyarrow.parquet as pq

# Create dummy data

# dummy data with JSON as string
person_data = 'Name':  ['Bob'],
        'Age': [25],
        'languages': "'mother_language': 'English', 'other_languages': ['German', 'French']"     
        

# from dict to panda df
person_df = pd.DataFrame.from_dict(person_data)

# from panda df to pyarrow table
person_pat = pa.Table.from_pandas(person_df)

# save as parquet file
pq.write_table(person_pat, 'output/example.parquet')

脚本提案

# load dummy data
sample = pa.parquet.read_table('output/example.parquet')

# transform to dict
sample_dict = sample.to_pydict()
# print with indent for checking
print(json.dumps(sample_dict, sort_keys=True, indent=4))
# load json from string and replace string
sample_dict['languages'] = json.loads(str(sample_dict['languages']))
print(json.dumps(sample_dict, sort_keys=True, indent=4))
#type(sample_dict['languages'])

# how to keep the nested structure when going from dict —> panda df —> pyarrow table?
# save dict as nested parquet...

所以,我这里是我的具体问题:

    这种方法是可行的方法还是可以以任何方式进行优化? dict、df 和 pa table 之间的所有转换都感觉效率不高,很高兴在这里接受教育。 在执行 dict —> df 转换时如何保留嵌套结构?还是根本不需要? 编写嵌套 parquet 文件的最佳方法是什么?我已经阅读了Nested data in Parquet with Python,这里提到了快速镶木地板用于阅读,但缺乏写作能力——在此期间是否有任何可行的解决方案?

非常感谢 斯蒂芬

【问题讨论】:

你可以使用 PySpark 吗?我认为使用它应该更容易。如果你愿意,我可以使用 PySpark 编写一个解决方案,你可以决定使用它是否是个好主意 似乎不支持在您的情况下写入嵌套数据,您检查过issues.apache.org/jira/browse/ARROW-1644 吗?我建议使用 Pyspark 嗨@OscarLopezM.,对不起,我出去了一段时间。非常感谢使用 PySpark 的解决方案。已经非常感谢了! 【参考方案1】:

如下所示,PySpark 可以通过一种简单的方式来实现。使用 PySpark 的主要好处是随着数据的增长,基础架构的可扩展性,但是使用普通的 Python 可能会出现问题,就好像你不使用像 Dask 这样的框架,你需要更大的机器来运行它。

from pyspark.sql import HiveContext
hc = HiveContext(sc)

# This is a way to create a PySpark dataframe from your sample, but there are others 
nested_df = hc.read.json(sc.parallelize(["""
'Name':  ['Bob'],
        'Age': [25],
        'languages': "'mother_language': 'English', 'other_languages': ['German', 'French']"     
        
"""]))

# You have nested Spark dataframe here. This shows the content of the spark dataframe. 20 is the max number of rows to show on the console and False means don't cut the columns that don't fit on the screen (show all columns content)
nested_df.show(20,False)

# Writes to a location as parquet
nested_df.write.parquet('/path/parquet')

# Reads the file from the previous location
spark.read.parquet('/path/parquet').show(20, False)

这段代码的输出是

+----+-----+-----------------------------------------------------------------------+
|Age |Name |languages                                                              |
+----+-----+-----------------------------------------------------------------------+
|[25]|[Bob]|'mother_language': 'English', 'other_languages': ['German', 'French']|
+----+-----+-----------------------------------------------------------------------+

+----+-----+-----------------------------------------------------------------------+
|Age |Name |languages                                                              |
+----+-----+-----------------------------------------------------------------------+
|[25]|[Bob]|'mother_language': 'English', 'other_languages': ['German', 'French']|
+----+-----+-----------------------------------------------------------------------+

回答你的问题

    我认为这更有效,因为如果您可以在 Spark 中使用更多执行器,那么您拥有多少数据并不重要 您可以看到,加载 parquet 文件时,所有 dict 和列表都被保留 这取决于“最佳”的定义,但我认为这是一个不错的选择;)

【讨论】:

您好,奥斯卡,感谢您的回答。在 Jupyter 中运行此程序时出现错误:NameError: name 'sc' is not defined。任何建议如何解决这个问题? 嗨斯蒂芬,没问题。 sc 指的是 sparkContext 对象。这取决于您使用的 Spark 版本,您可能必须按原样使用它或以不同的方式使用它。实际上那部分只是为了模拟输入,因为我假设你有一个 json 文件而不是这样的字符串。您能试试hc = HiveContext(spark.sparkContext) 看看是否有效吗?请注意,我将 sc 替换为 spark.sparkContext 其实如果你想从一个json文件中读取你应该使用spark.read.json(path_to_your_file)它会直接为你加载数据框,而不是假装有一行。 感谢您的快速回复。我按照你说的替换了,现在得到一个 NameError: name 'spark' is not defined。我在localhost:4040/environment 上运行了 spark,但不确定是否需要导入其他任何东西。 阅读该单行的另一种方法可以看到***.com/questions/49399245/…。如果您对此有更多疑问,请告诉我。谢谢

以上是关于从 Python 编写嵌套拼花格式的主要内容,如果未能解决你的问题,请参考以下文章

如何从复杂的 JSON API 返回嵌套值

python3嵌套字典解包格式字符串

从嵌套元组键入的python字典重建向量场

Python处理yaml和嵌套数据结构的一些技巧

在 Python 中使用 BS4 抓取数据,嵌套表

如何使用python取消嵌套json格式的数据