Building a Spark schema from a JSON schema
I'm trying to build a Spark schema that I want to provide explicitly when creating a DataFrame. I can generate the JSON schema with the following:
from pyspark.sql.types import StructType
# Save schema from the original DataFrame into json:
schema_json = df.schema.json()
This gives me:
{"fields":[{"metadata":{},"name":"cloud_events_version","nullable":true,"type":"string"},{"metadata":{},"name":"data","nullable":true,"type":{"fields":[{"metadata":{},"name":"email","nullable":true,"type":"string"},{"metadata":{},"name":"member_role","nullable":true,"type":"string"},{"metadata":{},"name":"reg_source_product","nullable":true,"type":"string"},{"metadata":{},"name":"school_type","nullable":true,"type":"string"},{"metadata":{},"name":"year_in_college","nullable":true,"type":"long"}],"type":"struct"}},{"metadata":{},"name":"event_time","nullable":true,"type":"string"},{"metadata":{},"name":"event_type","nullable":true,"type":"string"},{"metadata":{},"name":"event_type_version","nullable":true,"type":"string"},{"metadata":{},"name":"event_validated_ts","nullable":true,"type":"string"},{"metadata":{},"name":"event_validation_status","nullable":true,"type":"string"},{"metadata":{},"name":"extensions","nullable":true,"type":{"fields":[{"metadata":{},"name":"client_common","nullable":true,"type":{"fields":[{"metadata":{},"name":"adobe_mcid","nullable":true,"type":"string"},{"metadata":{},"name":"adobe_sdid","nullable":true,"type":"string"},{"metadata":{},"name":"auth_state","nullable":true,"type":"string"},{"metadata":{},"name":"uuid","nullable":true,"type":"string"},{"metadata":{},"name":"client_experiments","nullable":true,"type":"string"},{"metadata":{},"name":"client_ip_address","nullable":true,"type":"string"},{"metadata":{},"name":"device_id","nullable":true,"type":"string"},{"metadata":{},"name":"page_name","nullable":true,"type":"string"},{"metadata":{},"name":"referral_url","nullable":true,"type":"string"},{"metadata":{},"name":"url","nullable":true,"type":"string"},{"metadata":{},"name":"user_agent","nullable":true,"type":"string"},{"metadata":{},"name":"uvn","nullable":true,"type":"string"}],"type":"struct"}}],"type":"struct"}},{"metadata":{},"name":"source","nullable":true,"type":"string"},{"metadata":{},"name":"validated_message","nullable":true,"type":"string"},{"metadata":{},"name":"year","nullable":true,"type":"integer"},{"metadata":{},"name":"mon","nullable":true,"type":"integer"},{"metadata":{},"name":"day","nullable":true,"type":"integer"},{"metadata":{},"name":"hour","nullable":true,"type":"integer"}],"type":"struct"}
But to produce that JSON I need to read the DataFrame first, which takes some time that I'm trying to avoid.
One thing I can do is fetch the required schema from our internal catalog. That gives me something like this:
[{u'Name': u'cloud_events_version', u'Type': u'string'},
{u'Name': u'event_type', u'Type': u'string'},
{u'Name': u'event_time', u'Type': u'string'},
{u'Name': u'data', u'Type': u'struct<school_type:string,reg_source_product:string,member_role:string,email:string,year_in_college:int>'},
{u'Name': u'source', u'Type': u'string'},
{u'Name': u'extensions', u'Type': u'struct<client_common:struct<auth_state:string,client_ip_address:string,client_experiments:string,uvn:string,device_id:string,adobe_sdid:string,url:string,page_name:string,user_agent:string,uuid:string,adobe_mcid:string,referral_url:string>>'},
{u'Name': u'event_type_version', u'Type': u'string'},
{u'Name': u'event_validation_status', u'Type': u'string'},
{u'Name': u'event_validated_ts', u'Type': u'string'},
{u'Name': u'validated_message', u'Type': u'string'}]
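(For context: the question doesn't name the catalog, but if it were, say, the AWS Glue Data Catalog, a list in exactly this Name/Type shape can be fetched with boto3. Database and table names below are hypothetical:)

import boto3

# Glue returns columns as [{'Name': ..., 'Type': ...}, ...],
# matching the list shown above.
glue = boto3.client('glue')
table = glue.get_table(DatabaseName='events_db', Name='validated_events')
ListOfDict = table['Table']['StorageDescriptor']['Columns']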
I'm trying to write a recursive Python function that generates the JSON above. The logic is to loop through this list of dicts, and whenever the type is a plain string, emit a dict with the name and type:
{"metadata" : {},"name" : columnName,"nullable" : True,"type" : columnType}
But when the type is a struct, it should build a list of dicts for all of the struct's elements, assign that list to type, and recurse until no struct is left.
The best I've been able to come up with is:
def structRecursive(columnName, columnType):
    if "struct" not in columnType:
        ColumnDict = {"metadata": {}, "name": columnName, "nullable": True, "type": columnType}
    else:
        structColumnList = []
        structColumnDict = {
            'metadata': {},
            'name': columnName,
            'nullable': True,
            'type': {'fields': structColumnList, 'type': 'struct'}
        }
        if columnType.count('struct<') == 1:
            structCol = columnName
            structColList = columnType.encode('utf-8').replace('struct<', '').replace('>', '').split(',')
            for item in structColList:
                fieldName = item.split(':')[0]
                dataType = item.split(':')[1]
                nodeDict = {}
                nodeDict['metadata'] = {}
                nodeDict['name'] = '{}'.format(fieldName)
                nodeDict['nullable'] = True
                nodeDict['type'] = '{}'.format(dataType)
                structColumnList.append(nodeDict)
        else:
            columnName = columnType.replace('struct<', '', 1).replace('>', '').split(':')[0]
            columnType = columnType.split("{}:".format(columnName), 1)[1].replace('>', '', 1)
    return structColumnDict

MainStructList = []
MainStructDict = {'fields': MainStructList, 'type': 'struct'}
for item in ListOfDict:
    columnName = item['Name'].encode('utf-8')
    columnType = item['Type'].encode('utf-8')
    MainStructList.append(structRecursive(columnName, columnType))
Of course, this doesn't produce the desired result. Would love to get some suggestions here.
If I understand your question correctly, you want to parse a list of columns and convert it into a dictionary describing a schema with complex types. The hard part is parsing the strings that represent complex types. First, we need a method that extracts a struct entry from a column definition:
def extract_struct(text):
    # len("struct<") == 7: skip the prefix, then track the nesting
    # depth of "<"/">" until the matching ">" closes the struct.
    stop = 7
    flag = 1
    for c in text[7:]:
        stop += 1
        if c == "<":
            flag += 1
        if c == ">":
            flag -= 1
        if flag == 0:
            return text[:stop], text[stop:]
This returns the extracted struct and the remaining text of the column definition. For example,
extract_struct("struct<a:int,b:double>,c:string")
will return
("struct<a:int,b:double>", "c:string")
Second, we need to walk over each column type and fetch the definitions of struct entries:
def parse(s, node):
    while s != '':
        # Strip column name
        col_name = s.partition(':')[0]
        s = s.partition(':')[2]
        # If column type is struct, parse it as well
        if s.startswith('struct'):
            col_type, s = extract_struct(s)
            node[col_name] = {}
            parse(col_type[7:-1], node[col_name])
        else:
            # Just add column definition
            col_type = s.partition(',')[0]
            node[col_name] = {
                "metadata": {},
                "name": col_name,
                "nullable": True,
                "type": col_type
            }
        # Go to next entry
        s = s.partition(',')[2]
If a column type is simple, the method above just adds a new column to the current node of the schema tree; otherwise, it extracts the name and the struct body and recursively walks through the struct's sub-entries. Now we only need to iterate over every column and parse each one. So, after wrapping the above in a single method:
def build(columns):
    def extract_struct(text):
        stop = 7
        flag = 1
        for c in text[7:]:
            stop += 1
            if c == '<':
                flag += 1
            if c == '>':
                flag -= 1
            if flag == 0:
                return text[:stop], text[stop:]

    def parse(s, node):
        while s != '':
            # Strip column name
            col_name = s.partition(':')[0]
            s = s.partition(':')[2]
            # If column type is struct, parse it as well
            if s.startswith('struct'):
                col_type, s = extract_struct(s)
                node[col_name] = {}
                parse(col_type[7:-1], node[col_name])
            else:
                # Just add column definition
                col_type = s.partition(',')[0]
                node[col_name] = {
                    "metadata": {},
                    "name": col_name,
                    "nullable": True,
                    "type": col_type
                }
            # Go to next entry
            s = s.partition(',')[2]

    schema = {}
    for column in columns:
        parse("{}:{}".format(column['Name'], column['Type']), schema)
    return schema
Now, if you run it on your sample list, you get the following dictionary (it's easy to convert it into a list of columns, and the order doesn't matter):
{
"cloud_events_version": {
"nullable": true,
"type": "string",
"name": "cloud_events_version",
"metadata": {}
},
"event_type": {
"nullable": true,
"type": "string",
"name": "event_type",
"metadata": {}
},
"event_time": {
"nullable": true,
"type": "string",
"name": "event_time",
"metadata": {}
},
"event_validated_ts": {
"nullable": true,
"type": "string",
"name": "event_validated_ts",
"metadata": {}
},
"event_type_version": {
"nullable": true,
"type": "string",
"name": "event_type_version",
"metadata": {}
},
"source": {
"nullable": true,
"type": "string",
"name": "source",
"metadata": {}
},
"extensions": {
"client_common": {
"adobe_sdid": {
"nullable": true,
"type": "string",
"name": "adobe_sdid",
"metadata": {}
},
"auth_state": {
"nullable": true,
"type": "string",
"name": "auth_state",
"metadata": {}
},
"client_ip_address": {
"nullable": true,
"type": "string",
"name": "client_ip_address",
"metadata": {}
},
"url": {
"nullable": true,
"type": "string",
"name": "url",
"metadata": {}
},
"client_experiments": {
"nullable": true,
"type": "string",
"name": "client_experiments",
"metadata": {}
},
"referral_url": {
"nullable": true,
"type": "string",
"name": "referral_url",
"metadata": {}
},
"page_name": {
"nullable": true,
"type": "string",
"name": "page_name",
"metadata": {}
},
"user_agent": {
"nullable": true,
"type": "string",
"name": "user_agent",
"metadata": {}
},
"uvn": {
"nullable": true,
"type": "string",
"name": "uvn",
"metadata": {}
},
"chegg_uuid": {
"nullable": true,
"type": "string",
"name": "chegg_uuid",
"metadata": {}
},
"adobe_mcid": {
"nullable": true,
"type": "string",
"name": "adobe_mcid",
"metadata": {}
},
"device_id": {
"nullable": true,
"type": "string",
"name": "device_id",
"metadata": {}
}
}
},
"validated_message": {
"nullable": true,
"type": "string",
"name": "validated_message",
"metadata": {}
},
"event_validation_status": {
"nullable": true,
"type": "string",
"name": "event_validation_status",
"metadata": {}
},
"data": {
"school_type": {
"nullable": true,
"type": "string",
"name": "school_type",
"metadata": {}
},
"reg_source_product": {
"nullable": true,
"type": "string",
"name": "reg_source_product",
"metadata": {}
},
"member_role": {
"nullable": true,
"type": "string",
"name": "member_role",
"metadata": {}
},
"email": {
"nullable": true,
"type": "string",
"name": "email",
"metadata": {}
},
"year_in_college": {
"nullable": true,
"type": "int",
"name": "year_in_college",
"metadata": {}
}
}
}
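To actually hand this to Spark, the nested dictionary still has to be massaged into the {"fields": [...], "type": "struct"} layout that StructType.fromJson expects. A rough sketch, under two assumptions: no struct field is itself named exactly metadata/name/nullable/type, and catalog spellings like "int" are mapped to Spark's "integer" (columns is the catalog list from the question):

from pyspark.sql.types import StructType

# Catalog type spellings that differ from Spark's JSON type names.
TYPE_ALIASES = {'int': 'integer'}

def to_spark_fields(node):
    # Leaves produced by parse() carry exactly these four keys;
    # anything else is a nested struct mapping child name -> node.
    fields = []
    for name, value in node.items():
        if set(value) == {'metadata', 'name', 'nullable', 'type'}:
            value['type'] = TYPE_ALIASES.get(value['type'], value['type'])
            fields.append(value)
        else:
            fields.append({
                'metadata': {},
                'name': name,
                'nullable': True,
                'type': {'fields': to_spark_fields(value), 'type': 'struct'},
            })
    return fields

schema = StructType.fromJson({'fields': to_spark_fields(build(columns)), 'type': 'struct'})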
Finally, note that this only works for simple types and struct (not for array or map types). It's fairly easy to extend it to other complex types, though.
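As one possible first step toward such an extension (a sketch, not part of the original answer): extract_struct can be generalized to split off any prefix<...> type, so that parse could branch on array and map as well. The extra branching inside parse is not shown here.

def extract_complex(text):
    # Works for any "name<...>" type, e.g. "array<int>,c:string"
    # or "map<string,int>,c:string": find the opening "<", then
    # track nesting depth until the matching ">" closes it.
    start = text.index('<') + 1
    depth = 1
    for stop, c in enumerate(text[start:], start + 1):
        if c == '<':
            depth += 1
        elif c == '>':
            depth -= 1
        if depth == 0:
            return text[:stop], text[stop:]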
Why not use standard JSON Schema (https://json-schema.org/understanding-json-schema/about.html) and then convert it to a Spark schema with https://github.com/zalando-incubator/spark-json-schema#quickstart?
I was finally able to solve this:
def struct_definition(column_name, column_type):
    column_dict = {"metadata": {}, "name": column_name, "nullable": True, "type": column_type}
    return column_dict


def convert_to_json_array(struct_def):
    striped = struct_def.lstrip('struct')
    striped = striped.lstrip('<')
    striped = striped.rstrip('>')
    main_struct_list = []
    if striped.__contains__('struct'):
        name = striped.split(':')[0]
        json = {'Name': name, 'Type': striped.lstrip(name + ':') + '>'}
        main_struct_list.append(json)
    else:
        for i in striped.split(','):
            key_value = i.split(':')
            normalized_json = {'Name': key_value[0], 'Type': key_value[1]}
            main_struct_list.append(normalized_json)
    return main_struct_list


def to_json(input_list):
    main_struct_list = []
    for x in input_list:
        column_name = x['Name']
        column_type = x['Type']
        if column_type.startswith('struct'):
            main_struct_list.append(
                struct_definition(column_name,
                                  {'fields': to_json(convert_to_json_array(column_type)), 'type': 'struct'}))
        else:
            main_struct_list.append(struct_definition(column_name, column_type))
    return main_struct_list
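The output of to_json then only needs the top-level struct wrapper before Spark can use it. A usage sketch, assuming ListOfDict is the catalog list from the question and that the "int" vs "integer" type spelling has been reconciled:

from pyspark.sql.types import StructType

schema = StructType.fromJson({'fields': to_json(ListOfDict), 'type': 'struct'})
df = spark.read.schema(schema).json('s3://bucket/events/')  # hypothetical path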