使用 Struct 列类型读/写 Parquet
Posted
技术标签:
【中文标题】使用 Struct 列类型读/写 Parquet【英文标题】:Read/Write Parquet with Struct column type 【发布时间】:2020-05-30 07:12:15 【问题描述】:我正在尝试将这样的 Dataframe 写入 Parquet:
| foo | bar |
|-----|-------------------|
| 1 | "a": 1, "b": 10 |
| 2 | "a": 2, "b": 20 |
| 3 | "a": 3, "b": 30 |
我正在使用 Pandas 和 Fastparquet:
df = pd.DataFrame(
"foo": [1, 2, 3],
"bar": ["a": 1, "b": 10, "a": 2, "b": 20, "a": 3, "b": 30]
)
import fastparquet
fastparquet.write('/my/parquet/location/toy-fastparquet.parq', df)
我想在 (py)Spark 中加载 Parquet,并使用 Spark SQL 查询数据,例如:
df = spark.read.parquet("/my/parquet/location/")
df.registerTempTable('my_toy_table')
result = spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 15")
我的问题是,即使fastparquet
可以正确读取其 Parquet 文件(bar
字段被正确反序列化为结构),在 Spark 中,bar
被读取为String 类型的列,仅包含原始结构的 JSON 表示:
In [2]: df.head()
Out[2]: Row(foo=1, bar='"a": 1, "b": 10')
我尝试从 PyArrow 编写 Parquet,但没有运气:ArrowNotImplementedError: Level generation for Struct not supported yet
。我也尝试将file_scheme='hive'
传递给 Fastparquet,但我得到了相同的结果。将 Fastparquet 序列化更改为 BSON (object_encoding='bson'
) 会产生不可读的二进制字段。
[编辑]我看到了以下方法:
[answered] 从 Spark 编写 Parquet [open] 查找实现Parquet's specification for nested types 的 Python 库,并且与 Spark 读取它们的方式兼容 [answered] 使用特定的 JSON 反序列化读取 Spark 中的 Fastparquet 文件(我想这会对性能产生影响) 不要完全使用嵌套结构【问题讨论】:
这确实是Arrow目前的局限,见issues.apache.org/jira/browse/ARROW-1644 谢谢@joris,我的 DF 不包含列表和结构的混合,只是一个结构字段(我使描述更清楚)。但是,目前似乎也不支持这种情况。 您是否尝试在加载数据时传递schema
?
@cesar-a-mostacero 我试过了,但没有成功,因为我错过了 Alexandros 在下面的答案中解释的 JSON 解码
【参考方案1】:
这里至少有 3 个选项:
选项 1:
您不需要使用任何额外的库,例如 fastparquet
,因为 Spark 已经提供了该功能:
pdf = pd.DataFrame(
"foo": [1, 2, 3],
"bar": ["a": 1, "b": 10, "a": 2, "b": 20, "a": 3, "b": 30]
)
df = spark.createDataFrame(pdf)
df.write.mode("overwrite").parquet("/tmp/parquet1")
如果尝试使用 df = spark.read.parquet("/tmp/parquet1")
加载您的数据,架构将是:
StructType([
StructField("foo", LongType(), True),
StructField("bar",MapType(StringType(), LongType(), True), True)])
正如您在这种情况下所看到的,Spark 将保留正确的架构。
选项 2:
如果出于任何原因仍需要使用fastparquet
,那么bar
将被视为字符串,因此您可以将bar
作为字符串加载,然后使用from_json 函数将其转换为JSON。在您的情况下,我们会将 json 作为 Map(string, int) 的字典来处理。这是为了我们自己的方便,因为数据似乎是可以用字典完美表示的键/值序列:
from pyspark.sql.types import StringType, MapType,LongType
from pyspark.sql.functions import from_json
df = spark.read.parquet("/tmp/parquet1")
# schema should be a Map(string, string)
df.withColumn("bar", from_json("bar", MapType(StringType(), LongType()))).show()
# +---+-----------------+
# |foo| bar|
# +---+-----------------+
# | 1|[a -> 1, b -> 10]|
# | 2|[a -> 2, b -> 20]|
# | 3|[a -> 3, b -> 30]|
# +---+-----------------+
选项 3:
如果您的架构没有改变,并且您知道 bar 的每个值将始终具有相同的字段组合(a、b),您还可以将 bar
转换为结构:
schema = StructType([
StructField("a", LongType(), True),
StructField("b", LongType(), True)
])
df = df.withColumn("bar", from_json("bar", schema))
df.printSchema()
# root
# |-- foo: long (nullable = true)
# |-- bar: struct (nullable = true)
# | |-- a: long (nullable = true)
# | |-- b: long (nullable = true)
示例:
然后你可以运行你的代码:
df.registerTempTable('my_toy_table')
spark.sql("SELECT * FROM my_toy_table WHERE bar.b > 20").show()
# or spark.sql("SELECT * FROM my_toy_table WHERE bar['b'] > 20")
# +---+-----------------+
# |foo| bar|
# +---+-----------------+
# | 3|[a -> 3, b -> 30]|
# +---+-----------------+
【讨论】:
id 可能与选项 2 一起使用,因为它符合 OP 尝试使用 pyspark 函数 from_json 完成的任务 感谢@alexandros-biratsis 的出色回答!选项 1 将是完美的,但不幸的是 Parquet 的生产者不运行 Spark。所以看起来 Fastparquet 没有实现 Parquet 的原生 Map 规范(github.com/apache/parquet-format/blob/master/…);相反,它们序列化为 JSON,我必须在 Spark 中反序列化......我想知道是否存在另一个库可以做到这一点。如果不是,我同意选项 2 是要走的路 嗨达里奥,不幸的是我真的不知道是否有这样的库支持结构写作。以上是关于使用 Struct 列类型读/写 Parquet的主要内容,如果未能解决你的问题,请参考以下文章
使用嵌套行(类型 STRUCT)对表 SQL 进行重复数据删除