Pyspark - 将结构列合并到数组中

Posted

技术标签:

【中文标题】Pyspark - 将结构列合并到数组中【英文标题】:Pyspark - Merge struct columns into array 【发布时间】:2019-08-29 19:00:19 【问题描述】:

我想将多个结构列合并到一个数组中。

我尝试了 array(col1, col2) from .. 但它最终导致数据类型不匹配,即使所有 struct 列都是相同的类型。

查询 ->

select array(struct(f_name_add, True as is_data_found),struct(l_name_add, True as is_data_found)) as tag from (select array(map('value',f_name),map('value',f_add)) as f_name_add, array(map('value',l_name),map('value',l_add)) as l_name_add from (select distinct f_name, f_add, l_name, l_add from db.tabl1 where id = 'aaa')

输入

Sample Table

 id   f_name  f_add  l_name  l_add

 aaa  tom    in     nats    in

预期输出:

"tag":
 [
       
            "f_name_add": [
                
                    "value":"tom"
                ,
                
                    "value": "in"
                
            ],
            "is_data_found": true
        ,
        
         "l_name_add": [
                
                    "value":"nats"
                ,
                
                    "value": "in"
                
            ],
            "is_data_found": true
        
]

错误:

cannot resolve 'array(named_struct('f_name_add', __auto_generated_subquery_name.f_name_add, 'is_data_found', true), named_struct('l_name_add', __auto_generated_subquery_name.l_name_add, 'is_data_found', true))' due to data type mismatch: input to function array should all be the same type, but it's [struct<f_name_add:array<map<string,string>>,is_data_found:boolean>, struct<l_name_add:array<map<string,string>>,is_data_found:boolean>]

【问题讨论】:

【参考方案1】:

看起来错误消息说有两个structs: struct<f_name_add:array<map<string,string>>struct<l_name_add:array<map<string,string>>

由于f_name_addl_name_add,它们之间的比较并不相同。

在这里,我将尝试创建一个 json 字符串数组,但我不确定这是否正是您想要的 :(

    设置一个简单的例子
a = [('aaa', 'tom', 'in', 'nats', 'in'),('bbb', 'tom1', 'on', 'nats1', 'on'),]
df = spark.createDataFrame(a, ['id', 'f_name',  'f_add',  'l_name', 'l_add'])
df.show()


+---+------+-----+------+-----+
| id|f_name|f_add|l_name|l_add|
+---+------+-----+------+-----+
|aaa|   tom|   in|  nats|   in|
|bbb|  tom1|   on| nats1|   on|
+---+------+-----+------+-----+

    算法如下:
df.registerTempTable("tabl1")
df = spark.sql("select array(to_json(struct(f_name_add, True as is_data_found)), \
                             to_json(struct(l_name_add, True as is_data_found))) as tag \
                from (select array(map('value',f_name),map('value',f_add)) as f_name_add, \
                       array(map('value',l_name),map('value',l_add)) as l_name_add \
                from (select distinct f_name, f_add, l_name, l_add from tabl1 where id = 'aaa'))")

df.show(truncate=False)

+------------------------+
|tag                                                                                                                                          |
+------------------------+
|["f_name_add":["value":"tom","value":"in"],"is_data_found":true, "l_name_add":["value":"nats","value":"in"],"is_data_found":true]|
+------------------------+

如果你运行df.dtypes,我们就会有,你可以看到我们这里实际上有一个 json 字符串数组。

[('tag', 'array<string>')]

【讨论】:

是的,有什么想法可以克服这个问题吗?可能是 UDF 或任何提示可能会有所帮助。 你有一些样本数据吗?最好的样本输出。人们在这里会更容易提供帮助:) 添加样本输入和预期输出 你有多少个像‘f_name_add’和‘l_name_add’这样的唯一值? 它可以是任何数字,因为它是动态的并且取决于输入

以上是关于Pyspark - 将结构列合并到数组中的主要内容,如果未能解决你的问题,请参考以下文章

在 Pyspark 中爆炸不是数组的结构列

如何使用给定的reduce函数基于pyspark中的字段合并多个JSON数据行

pyspark - 将两个数据帧与目标中的额外列合并

将重复记录合并到 pyspark 数据框中的单个记录中

pyspark列合并为一行

nTypeError:无法合并类型 <class \'pyspark.sql.types.DoubleType\'> 和 <class \'pyspark.sql.types.Str