JSON 表架构到 bigquery.TableSchema 用于 BigQuerySink

Posted

技术标签:

【中文标题】JSON 表架构到 bigquery.TableSchema 用于 BigQuerySink【英文标题】:JSON table schema to bigquery.TableSchema for BigQuerySink 【发布时间】:2016-03-21 09:43:37 【问题描述】:

我有一个以 JSON 格式(具有名称、类型、模式属性)定义并存储在文件中的非平凡表架构(涉及嵌套和重复字段)。它已成功用于使用 bq load 命令填充 bigquery 表。

但是当我尝试使用 Dataflow Python SDK 和 BigQuerySink 做同样的事情时,schema 参数需要是'name':'type' 元素的逗号分隔列表,或者是bigquery.TableSchema 对象。

有什么方便的方法可以将我的 JSON 架构转换为 bigquery.TableSchema,还是必须将其转换为 name:value 列表?

【问题讨论】:

【参考方案1】:

Andrea Pierleoni 发布的上述 sn-p 适用于旧版本的 google-cloud-bigquery python 客户端,例如 google-cloud-bigquery 的版本 0.25.0 恰好通过 pip install apache-beam[gcp] 安装。

但是,BigQuery Python 客户端 API 在最新版本的 google-cloud-bigquery 中发生了巨大变化,例如在我当前使用的版本 1.8.0 中,bigquery.TableFieldSchema()bigquery.TableSchema() 不起作用。

如果您使用的是更新版本的 google-cloud-bigquery 软件包,以下是如何从 JSON 文件中获取所需的 SchemaField 列表(例如,创建表所必需的)。这是 Andrea Pierleoni 上面发布的代码的改编版(谢谢!)

def _get_field_schema(field):
    name = field['name']
    field_type = field.get('type', 'STRING')
    mode = field.get('mode', 'NULLABLE')
    fields = field.get('fields', [])

    if fields:
        subschema = []
        for f in fields:
            fields_res = _get_field_schema(f)
            subschema.append(fields_res)
    else:
        subschema = []

    field_schema = bigquery.SchemaField(name=name, 
        field_type=field_type,
        mode=mode,
        fields=subschema
    )
    return field_schema


def parse_bq_json_schema(schema_filename):
    schema = []
    with open(schema_filename, 'r') as infile:
        jsonschema = json.load(infile)

    for field in jsonschema:
        schema.append(_get_field_schema(field))

    return schema

现在,假设您有一张桌子的schema already defined in JSON。假设您有this particular "schema.json" file,然后使用上述辅助方法,您可以获得 Python 客户端所需的SchemaField 表示,如下所示:

>>> res_schema = parse_bq_json_schema("schema.json")

>>> print(res_schema)

[SchemaField(u'event_id', u'INTEGER', u'REQUIRED', None, ()), SchemaField(u'event_name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'event_types', u'STRING', u'REPEATED', None, ()), SchemaField(u'product_code', u'STRING', u'REQUIRED', None, ()), SchemaField(u'product_sub_code', u'STRING', u'REPEATED', None, ()), SchemaField(u'source', u'RECORD', u'REQUIRED', None, (SchemaField(u'internal', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))), SchemaField(u'external', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))))), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()), SchemaField(u'user_key', u'RECORD', u'REQUIRED', None, (SchemaField(u'device_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'cookie_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'profile_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'best_id', u'STRING', u'REQUIRED', None, ()))), SchemaField(u'message_id', u'STRING', u'REQUIRED', None, ()), SchemaField(u'message_type', u'STRING', u'REQUIRED', None, ()), SchemaField(u'tracking_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'funnel_stage', u'STRING', u'NULLABLE', None, ()), SchemaField(u'location', u'RECORD', u'NULLABLE', None, (SchemaField(u'latitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'longitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'geo_region_id', u'INTEGER', u'NULLABLE', None, ()))), SchemaField(u'campaign_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'topic', u'STRING', u'REQUIRED', None, ())]

现在到create a table having the above schema using the Python SDK,你会这样做:

dataset_ref = bqclient.dataset('YOUR_DATASET')
table_ref = dataset_ref.table('YOUR_TABLE')
table = bigquery.Table(table_ref, schema=res_schema)

您可以选择设置基于时间的分区(如果需要),如下所示:

table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field='timestamp'  # name of column to use for partitioning
)

这最终创建了表格:

table = bqclient.create_table(table)

print('Created table , partitioned on column '.format(
    table.table_id, table.time_partitioning.field))

【讨论】:

这种样本应该在google文档或源代码中。谢谢你们。【参考方案2】:

目前您不能直接指定 JSON 架构。您必须将架构指定为包含逗号分隔的字段列表的字符串或bigquery.TableSchema 对象。

如果架构很复杂并且包含嵌套和/或重复的字段,我们建议构建一个bigquery.TableSchema 对象。

这是一个示例 bigquery.TableSchema 具有嵌套和重复字段的对象。

from apitools.clients import bigquery

table_schema = bigquery.TableSchema()

# ‘string’ field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'fullName'
field_schema.type = 'string'
field_schema.mode = 'required'
table_schema.fields.append(field_schema)

# ‘integer’ field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'age'
field_schema.type = 'integer'
field_schema.mode = 'nullable'
table_schema.fields.append(field_schema)

# nested field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'phoneNumber'
field_schema.type = 'record'
field_schema.mode = 'nullable'

area_code = bigquery.TableFieldSchema()
area_code.name = 'areaCode'
area_code.type = 'integer'
area_code.mode = 'nullable'
field_schema.fields.append(area_code)

number = bigquery.TableFieldSchema()
number.name = 'number'
number.type = 'integer'
number.mode = 'nullable'
field_schema.fields.append(number)
table_schema.fields.append(field_schema)

# repeated field
field_schema = bigquery.TableFieldSchema()
field_schema.name = 'children'
field_schema.type = 'string'
field_schema.mode = 'repeated'
table_schema.fields.append(field_schema)

【讨论】:

谢谢!我刚刚意识到 Python SDK 是 alpha 版,所以我会将进一步的问题定向到 github,直到它成熟一点。【参考方案3】:

我遇到了同样的问题。就我而言,我已经在 bigquery 中加载了一些 json,并自动生成了一个模式。

所以我能够使用以下命令获取自动生成的架构:

bq show --format prettyjson my-gcp-project:my-bq-table |jq .schema > my-bq-table.json

然后可以使用这个 sn-p 将架构转换为 bigquery.TableSchema

from apache_beam.io.gcp.internal.clients import bigquery


def _get_field_schema(**kwargs):
    field_schema = bigquery.TableFieldSchema()
    field_schema.name = kwargs['name']
    field_schema.type = kwargs.get('type', 'STRING')
    field_schema.mode = kwargs.get('mode', 'NULLABLE')
    fields = kwargs.get('fields')
    if fields:
        for field in fields:
            field_schema.fields.append(_get_field_schema(**field))
    return field_schema


def _inject_fields(fields, table_schema):
    for field in fields:
        table_schema.fields.append(_get_field_schema(**field))


def parse_bq_json_schema(schema):
    table_schema = bigquery.TableSchema()
    _inject_fields(schema['fields'], table_schema)
    return table_schema

它将适用于 bigquery json 架构规范,如果您像我一样懒惰,如果您对默认为可为空的字符串的字段感到满意,则可以避免指定 typemode

【讨论】:

谢谢你!如果您需要在 BIgQuery UI 中创建表的架构(例如,必须手动创建分区表),请不要忘记在 schema 字段中选择 fields 键,即:bq show --format prettyjson my-gcp-project:my-bq-table | jq '.schema.fields' > my-bq-table.json 【参考方案4】:

BigQuery 库中有一个内置的转换器函数:

from google.cloud import bigquery
...
client = bigquery.Client()
client.schema_from_json('path/to/schema.json`)

【讨论】:

这个答案真的应该推到最上面。虽然之前的所有答案在某种程度上都是正确的,但它们都已过时,客户端对象的这一方法取代了它们。【参考方案5】:

这是一个可以帮助你的简单程序。

import json
from apache_beam.io.gcp.internal.clients import bigquery


def bq_schema(json_schema):
    table_schema = bigquery.TableSchema()
    with open(json_schema) as json_file:
        data = json.load(json_file)
        for p in data:
            field = bigquery.TableFieldSchema()
            field.name = p['name']
            field.type = p['type']
            field.mode = p['mode']
            table_schema.fields.append(field)
    return table_schema

【讨论】:

【参考方案6】:

现在,您可以使用内置的parse_table_schema_from_json 函数:

from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

with open('schema.json') as f:
    schema_string = f.read()
table_schema = parse_table_schema_from_json(schema_string)

【讨论】:

以上是关于JSON 表架构到 bigquery.TableSchema 用于 BigQuerySink的主要内容,如果未能解决你的问题,请参考以下文章

JSON 表架构到 bigquery.TableSchema 用于 BigQuerySink

如何在将JSON文件加载到BigQuery表中时管理/处理架构更改

使用 bigquery 中的 bigquery select 语句的 JSON 格式的现有表的架构

单独 JSON 文件中的 Terraform Bigquery 表架构

使用 Python 将 BigQuery 架构表转换为 json

有没有办法将架构的内容创建到 BigQuery 中的表中?