AWS Glue: How to expand nested Hive struct to Dict?
Posted: 2019-08-01 16:50:31

Question: I'm trying to expand the field mapping of a table mapped by my AWS Glue crawler into a nested dictionary in Python. However, I can't find any Spark/Hive parser to deserialize the
var_type = 'struct<loc_lat:double,service_handler:string,ip_address:string,device:bigint,source:struct<id:string,contacts:struct<admin:struct<email:string,name:string>>,name:string>,loc_name:string>'
string found in table_schema['Table']['StorageDescriptor']['Columns'] into a Python dict.
How to dump the table definition in Glue:
import boto3
client = boto3.client('glue')
client.get_table(DatabaseName=selected_db, Name=selected_table)
Response:
table_schema = {'Table': {'Name': 'asdfasdf',
    'DatabaseName': 'asdfasdf',
    'Owner': 'owner',
    'CreateTime': datetime.datetime(2019, 7, 29, 13, 20, 13, tzinfo=tzlocal()),
    'UpdateTime': datetime.datetime(2019, 7, 29, 13, 20, 13, tzinfo=tzlocal()),
    'LastAccessTime': datetime.datetime(2019, 7, 29, 13, 20, 13, tzinfo=tzlocal()),
    'Retention': 0,
    'StorageDescriptor': {'Columns': [{'Name': 'version', 'Type': 'int'},
            {'Name': 'payload',
             'Type': 'struct<loc_lat:double,service_handler:string,ip_address:string,device:bigint,source:struct<id:string,contacts:struct<admin:struct<email:string,name:string>>,name:string>,loc_name:string>'},
            {'Name': 'origin', 'Type': 'string'}],
        'Location': 's3://asdfasdf/',
        'InputFormat': 'org.apache.hadoop.mapred.TextInputFormat',
        'OutputFormat': 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat',
        'Compressed': False,
        'NumberOfBuckets': -1,
        'SerdeInfo': {'SerializationLibrary': 'org.openx.data.jsonserde.JsonSerDe',
            'Parameters': {'paths': 'origin,payload,version'}},
        'BucketColumns': [],
        'SortColumns': [],
        'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0',
            'CrawlerSchemaSerializerVersion': '1.0',
            'UPDATED_BY_CRAWLER': 'asdfasdf',
            'averageRecordSize': '799',
            'classification': 'json',
            'compressionType': 'none',
            'objectCount': '94',
            'recordCount': '92171',
            'sizeKey': '74221058',
            'typeOfData': 'file'},
        'StoredAsSubDirectories': False},
    'PartitionKeys': [{'Name': 'partition_0', 'Type': 'string'},
        {'Name': 'partition_1', 'Type': 'string'},
        {'Name': 'partition_2', 'Type': 'string'}],
    'TableType': 'EXTERNAL_TABLE',
    'Parameters': {'CrawlerSchemaDeserializerVersion': '1.0',
        'CrawlerSchemaSerializerVersion': '1.0',
        'UPDATED_BY_CRAWLER': 'asdfasdf',
        'averageRecordSize': '799',
        'classification': 'json',
        'compressionType': 'none',
        'objectCount': '94',
        'recordCount': '92171',
        'sizeKey': '74221058',
        'typeOfData': 'file'},
    'CreatedBy': 'arn:aws:sts::asdfasdf'},
 'ResponseMetadata': {'RequestId': 'asdfasdf',
    'HTTPStatusCode': 200,
    'HTTPHeaders': {'date': 'Thu, 01 Aug 2019 16:23:06 GMT',
        'content-type': 'application/x-amz-json-1.1',
        'content-length': '3471',
        'connection': 'keep-alive',
        'x-amzn-requestid': 'asdfasdf'},
    'RetryAttempts': 0}}
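As a side note, the struct string can be pulled out of a `get_table` response shaped like the one above with a small helper (a sketch; `get_column_type` is a hypothetical name, not part of boto3):

```python
def get_column_type(table_schema, column_name):
    """Return the Hive type string for a named column in a Glue get_table response."""
    columns = table_schema['Table']['StorageDescriptor']['Columns']
    for col in columns:
        if col['Name'] == column_name:
            return col['Type']
    raise KeyError('column not found: %s' % column_name)
```

With the response above, `get_column_type(table_schema, 'payload')` returns the `struct<...>` string to feed into a parser.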
The goal is a Python dict with a value for each field's type, rather than the embedded string. E.g.

expand_function('struct<loc_lat:double,service_handler:string,ip_address:string,device:bigint,source:struct<id:string,contacts:struct<admin:struct<email:string,name:string>>,name:string>,loc_name:string>')

would return

{
    'loc_lat': 'double',
    'service_handler': 'string',
    'ip_address': 'string',
    'device': 'bigint',
    'source': {
        'id': 'string',
        'contacts': {
            'admin': {
                'email': 'string',
                'name': 'string'
            }
        },
        'name': 'string'
    },
    'loc_name': 'string'
}

Thanks!
Comments:

Is pyspark.sql.functions.to_json what you're looking for? Take a look at this answer of mine: ***.com/a/57261068/9534390

@pythonic833 Not quite - pyspark.sql.functions.to_json converts a column containing a StructType, ArrayType, or MapType into a JSON string. I need something that parses the DDL-format string "struct<...>" above into an object I can work with.

Same question here - did you find a solution?

@RicardoEdo - see the examples below. Hope it helps!
Answer 1:

The accepted answer doesn't handle arrays. This solution does:
import json
import re
def _hive_struct_to_json(hive_str):
    """
    Expands embedded Hive struct strings to Python dictionaries
    Args:
        Hive struct format as string
    Returns
        JSON object
    """
    r = re.compile(r'(.*?)(struct<|array<|[:,>])(.*)')
    root = dict()

    to_parse = hive_str
    parents = []
    curr_elem = root

    key = None
    while to_parse:
        left, operator, to_parse = r.match(to_parse).groups()

        if operator == 'struct<' or operator == 'array<':
            parents.append(curr_elem)
            new_elem = dict() if operator == 'struct<' else list()
            if key:
                curr_elem[key] = new_elem
                curr_elem = new_elem
            elif isinstance(curr_elem, list):
                curr_elem.append(new_elem)
                curr_elem = new_elem
            key = None
        elif operator == ':':
            key = left
        elif operator == ',' or operator == '>':
            if left:
                if isinstance(curr_elem, dict):
                    curr_elem[key] = left
                elif isinstance(curr_elem, list):
                    curr_elem.append(left)
            if operator == '>':
                curr_elem = parents.pop()

    return root
hive_str = '''
struct<
    loc_lat:double,
    service_handler:string,
    ip_address:string,
    device:bigint,
    source:struct<
        id:string,
        contacts:struct<
            admin:struct<
                email:string,
                name:array<string>
            >
        >,
        name:string
    >,
    loc_name:string,
    tags:array<
        struct<
            key:string,
            value:string
        >
    >
>
'''
hive_str = re.sub(r'[\s]+', '', hive_str).strip()
print(hive_str)
print(json.dumps(_hive_struct_to_json(hive_str), indent=2))
Prints:
struct<loc_lat:double,service_handler:string,ip_address:string,device:bigint,source:struct<id:string,contacts:struct<admin:struct<email:string,name:array<string>>>,name:string>,loc_name:string,tags:array<struct<key:string,value:string>>>
{
  "loc_lat": "double",
  "service_handler": "string",
  "ip_address": "string",
  "device": "bigint",
  "source": {
    "id": "string",
    "contacts": {
      "admin": {
        "email": "string",
        "name": [
          "string"
        ]
      }
    },
    "name": "string"
  },
  "loc_name": "string",
  "tags": [
    {
      "key": "string",
      "value": "string"
    }
  ]
}
Comments:

Nice! Thanks! Is there a library that implements this? I couldn't find one.

@DiegoMoraCespedes No, there isn't. Feel free to use the code above in your project.

Answer 2:

Here is a function that works on the Hive struct string embedded above.
def _hive_struct_to_json(hive_struct):
    """
    Expands embedded Hive struct strings to Python dictionaries
    Args:
        Hive struct format as string
    Returns
        JSON object
    """
    # Convert embedded hive type definition string to JSON
    hive_struct = hive_struct.replace(':', '":"')
    hive_struct = hive_struct.replace(',', '","')
    hive_struct = hive_struct.replace('struct<', '{"')
    hive_struct = hive_struct.replace('"{"', '{"')
    hive_struct = hive_struct.replace('>', '"}')
    hive_struct = hive_struct.replace('}"', '}')
    return json.loads(hive_struct)
hive_str = 'struct<loc_lat:double,service_handler:string,ip_address:string,device:bigint,source:struct<id:string,contacts:struct<admin:struct<email:string,name:string>>,name:string>,loc_name:string>'
print(json.dumps(_hive_struct_to_json(hive_str),indent=2))
Returns:
{
  "loc_lat": "double",
  "service_handler": "string",
  "ip_address": "string",
  "device": "bigint",
  "source": {
    "id": "string",
    "contacts": {
      "admin": {
        "email": "string",
        "name": "string"
      }
    },
    "name": "string"
  },
  "loc_name": "string"
}
Answer 3:

I tried looking through some existing approaches and found a few helper functions in pyspark.
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("tmp").getOrCreate()
struct_map = T._parse_datatype_string("MAP < STRING, STRUCT < year: INT, place: STRING, details: STRING >>")
struct_map is a pyspark type, which in turn has nested fields to iterate over. Once you have an object like the above, it should be easy to perform a recursive call to flatten it. I'm open to hearing others' opinions on this approach.