从配置单元表中的 json 字符串中提取值

Posted

技术标签:

【中文标题】从配置单元表中的 json 字符串中提取值【英文标题】:Extract value from a json string in a hive table 【发布时间】:2021-03-23 02:48:51 【问题描述】:

我有如下数据

+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|asset_id  |chg_log                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|4455628986|['oldValues': [], 'newValues': ['COMPO_MAIL'], 'fieldName': 'Communication Type', 'fieldType': 'TEXT', 'lookupInfo': 'fieldType': 'TEXT', 'additional': 'GROUP_QUALIFIER': ['qualifiers': ['lookupKey': 'Communication_Type', 'value': 'COMPO_MAIL']], 'oldValues': [], 'lookupInfo': 'isClientLevel': False, 'fieldType': 'DATE', 'fieldName': 'Delivery Due Date', 'fieldType': 'DATE', 'newValues': ['1601771520000'], 'oldValues': [], 'lookupInfo': 'lookupType': 'CUST_ID', 'fieldType': 'CUST_ID', 'fieldName': 'Customer Id', 'fieldType': 'CUST_ID', 'newValues': ['10486'], 'oldValues': [], 'lookupInfo': 'isClientLevel': False, 'fieldType': 'DROPDOWN', 'fieldName': 'Process_Status', 'fieldType': 'PICKLIST', 'newValues': ['Request Review']]  |
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

chg_log 列的 Json 字符串中,我想提取 fieldName - Process_Status 的值,它的 newValues 作为请求审查。预期结果如下。

+----------+---------------+
|asset_id  |Process_Status |
+----------+---------------+
|4455628986|Request Review |
+----------+---------------+

json 每次的顺序都不一样。有时 Process_Status 首先出现在 json 字符串中,然后是 Communication Type 等等...... 我尝试使用函数 json_extract 但无法获得它。 如何在 spark Sql 或 Pyspark 或 Hive 中实现这一点。有人可以帮我吗?

提前致谢

【问题讨论】:

【参考方案1】:

您的专栏chg_log 似乎是一个字符串化 Python 字典,而不是一个有效的JSON 字符串。

在 Pyspark 中,您可以使用 UDF 将 dict 转换为 json,然后使用 from_json 将其转换为结构数组,最后过滤数组以找到字段 Process_Status

import ast
from pyspark.sql import functions as F

dict_to_json = F.udf(lambda x: json.dumps(ast.literal_eval(x)))

df = df.withColumn("chg_log", dict_to_json(F.col("chg_log")))

df1 = df.withColumn(
    "chg_log",
    F.from_json("chg_log", F.schema_of_json(df.select("chg_log").head()[0]))
).withColumn(
    "chg_log",
    F.expr("filter(chg_log, x -> x.fieldName = 'Process_Status')")[0]
).select(
    F.col("asset_id"), F.col("chg_log.newValues").alias("Process_Status")
)

df1.show()
# +----------+----------------+
# |  asset_id|  Process_Status|
# +----------+----------------+
# |4455628986|[Request Review]|
# +----------+----------------+

另一种方法是直接在 UDF 中进行查找:

parse_status = F.udf(
    lambda x: next(i["newValues"] for i in ast.literal_eval(x) if i["fieldName"] == "Process_Status"),
    ArrayType(StringType())
)

df1 = df.select(F.col("asset_id"), parse_status(F.col("chg_log")).alias("Process_Status"))

【讨论】:

嗨@blackbishop 谢谢你的回复。我的实际数据中有几个asset_id,并且遇到了异常RuntimeError:(“从用户代码中捕获的StopIteration;任务失败”,StopIteration()) 在您提供的UDF中处理运行时异常的最佳方法是什么?? 嗨@bunnylorr,哪个给你这个错误,第一种还是第二种方法?我怀疑这是第二个,您可以在 UDF 中添加一个 try/catch(对于 next(....) 部分),如果有异常则返回 None。【参考方案2】:

在 Hive 中,对 JSON 处理的支持非常有限:get_json_object 返回字符串,即使它是 JSON 数组或映射,也是 JSONPath filtering does not work。这就是为什么您需要提取、拆分、分解和过滤查询中的所有内容。

例如这个get_json_object(chg_log,"$.[].fieldName")返回:

["Communication Type","Delivery Due Date","Customer Id","Process_Status"]

看起来像数组,但它是一个字符串,你需要将它拆分才能得到数组。

您的 JSON 也是无效的。它应该是双引号而不是单引号,False 应该小写:"isClientLevel": false,而不是 'isClientLevel': False

使用您的示例进行演示(参见代码中的 cmets):

with mytable as(
 select 4455628986 asset_id , "['oldValues': [], 'newValues': ['COMPO_MAIL'], 'fieldName': 'Communication Type', 'fieldType': 'TEXT', 'lookupInfo': 'fieldType': 'TEXT', 'additional': 'GROUP_QUALIFIER': ['qualifiers': ['lookupKey': 'Communication_Type', 'value': 'COMPO_MAIL']], 'oldValues': [], 'lookupInfo': 'isClientLevel': False, 'fieldType': 'DATE', 'fieldName': 'Delivery Due Date', 'fieldType': 'DATE', 'newValues': ['1601771520000'], 'oldValues': [], 'lookupInfo': 'lookupType': 'CUST_ID', 'fieldType': 'CUST_ID', 'fieldName': 'Customer Id', 'fieldType': 'CUST_ID', 'newValues': ['10486'], 'oldValues': [], 'lookupInfo': 'isClientLevel': False, 'fieldType': 'DROPDOWN', 'fieldName': 'Process_Status', 'fieldType': 'PICKLIST', 'newValues': ['Request Review']]" chg_log 
)

select s.asset_id, split(s.newValues,'","')[f.pos] as Process_Status
from
(
select asset_id, 
--extract array of fieldName and newValues 
--returned as string, not array
--this is why we need to split and explode it later
--remove [" and "]
regexp_replace(get_json_object(chg_log,"$.[].fieldName"),'^\\["|"\\]$','') fieldName,
regexp_replace(get_json_object(chg_log,"$.[].newValues"),'^\\["|"\\]$','') newValues
from
(
--Fix invalid JSON
--replace single-quotes with double-quotes, convert False to false, etc, add more fixes if necessary here
select asset_id, regexp_replace(regexp_replace(regexp_replace(chg_log, "'",'"'),'False','false'),'True','true') chg_log from mytable
)s 
)s lateral view outer posexplode(split(fieldName,'","'))f as pos, field_name
where f.field_name='Process_Status' --filter

结果:

asset_id    process_status
4455628986  Request Review

【讨论】:

以上是关于从配置单元表中的 json 字符串中提取值的主要内容,如果未能解决你的问题,请参考以下文章

从配置单元中的 JSON 数组获取 JSON 密钥时面临的问题

从 BigQuery 中的 JSON 数组中提取多个值

如何从json字符串中提取值?

使用 Python 从 JSON 嵌套列表和字符串数组中提取值

如何将 json 字符串数据类型列转换为配置单元中的映射数据类型列?

有条件地在其他列上从配置单元表中获取最新列值