如何以编程方式生成基于数据框的创建表语句
Posted
技术标签:
【中文标题】如何以编程方式生成基于数据框的创建表语句【英文标题】:How to programmatically generate create table statement based on data frame 【发布时间】:2021-10-12 05:23:47 【问题描述】:我正在尝试基于 avro 数据的 data
部分创建 Hive 表的功能。源数据的架构如下所示。目标表需要通过源数据中的partition
字段进行分区,有name
和description
两列。我可以通过df.select('data.*')
获得源data
部分,通过df.select('data.*').schema
获得表模式,但partition
列不在其中。我的目标是创建一个表子句create table mytable (name string, description string) partitioned by (partition integer) store as parquet
我该怎么做?我需要先将df.select('partition.*')
附加到 df.select('data.*') 吗?非常感谢您的帮助。
已编辑:目标是您不需要指定列的级别,例如 data.name 和分区,而只需传入“columns”和“partition column”(可以是任何嵌套级别,并且然后生成创建表语句。
root
|--metadata: struct
| |---id: string
| |---time : string
|--data:struct
| |---name : string
| |---description : string
|--partition:integer
【问题讨论】:
【参考方案1】:以下独立示例向您展示了如何创建和编写您指定的表。您需要提供自己的path_for_saving
。
import pyspark.sql.functions as F
import pyspark.sql.types as T
schema = T.StructType([
T.StructField('metadata', T.StructType([
T.StructField("id",T.StringType()),
T.StructField("time",T.StringType())])),
T.StructField('data', T.StructType([
T.StructField("name",T.StringType()),
T.StructField("description",T.StringType()),
])),
T.StructField("partition", T.IntegerType()),
T.StructField("Level1", T.StructType([
T.StructField("Level2",T.StructType([
T.StructField("Level3", T.StructType([
T.StructField("partition_alt", T.IntegerType())]))]))]))
])
df_sample_data = spark.createDataFrame([(("id1", "time1"), ("name1", "desc1"), 1, (((3,),),)), (("id2", "time2"), ("name2", "desc2"), 2, (((4,),),)) ], schema)
df_sample_data.printSchema()
df_sample_data.show()
def parse_fields(schema, path=""):
collect = []
for struct_field in schema:
this_field_name = struct_field.name
if type(struct_field.dataType) == T.StructType:
collect = collect + parse_fields(struct_field.dataType, path + this_field_name + ".")
else:
collect = collect + [path + this_field_name]
return collect
parsed_fields = parse_fields(schema) # Find all leaf fields in the schema and return as '.' seperated path
print("Parsed fields:" + str(parsed_fields))
def get_column(col_name):
for field in parsed_fields:
if col_name in field:
return F.col(field).alias(col_name)
name_col = "name"
description_col = "description"
partition_col = "partition_alt"
df_mytable = df_sample_data.select(get_column(name_col), get_column(description_col), get_column(partition_col))
df_mytable.show()
df_mytable.write.partitionBy(partition_col).format("parquet").save(path_for_saving)
输出:
root
|-- metadata: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- time: string (nullable = true)
|-- data: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- description: string (nullable = true)
|-- partition: integer (nullable = true)
|-- Level1: struct (nullable = true)
| |-- Level2: struct (nullable = true)
| | |-- Level3: struct (nullable = true)
| | | |-- partition_alt: integer (nullable = true)
+------------+--------------+---------+-------+
| metadata| data|partition| Level1|
+------------+--------------+---------+-------+
|id1, time1|name1, desc1| 1|3|
|id2, time2|name2, desc2| 2|4|
+------------+--------------+---------+-------+
Parsed fields:['metadata.id', 'metadata.time', 'data.name', 'data.description', 'partition', 'Level1.Level2.Level3.partition_alt']
+-----+-----------+-------------+
| name|description|partition_alt|
+-----+-----------+-------------+
|name1| desc1| 3|
|name2| desc2| 4|
+-----+-----------+-------------+
该示例演示了如何查找深度嵌套的字段。您需要使用自己的标准重写get_column
,以将字段名称与完整的列名称匹配。在这里,get_column
只返回名称中包含col_name
的第一个字段。
【讨论】:
感谢您的回答。抱歉,我可能没有很好地解释它。我的观点是您不需要指定列的级别,例如data.name
和partition
,而只需传入“列”和“分区列”(可以是任何嵌套级别,然后生成一个创建表语句。
我已更新答案以反映您的编辑以上是关于如何以编程方式生成基于数据框的创建表语句的主要内容,如果未能解决你的问题,请参考以下文章