Load an Avro file in BigQuery - Unexpected type for default value. Expected null, but found string: "null"

Posted: 2019-11-26 13:03:27

Question:

I need to transfer the result of this query to BigQuery. As you can see, I decode the data I get from Cloud Storage and create an Avro file to load it into a BigQuery table, but I get this error:

BadRequest                                Traceback (most recent call last)
<ipython-input-8-78860f4800c4> in <module>
    110 bucket_name1 = 'gs://new_bucket/insert_transfer/*.avro'
    111 
--> 112 insert_bigquery_avro(bucket_name1, dataset1, tabela1)

<ipython-input-8-78860f4800c4> in insert_bigquery_avro(target_uri, dataset_id, table_id)
    103         )
    104     print('Starting job '.format(load_job.job_id))
--> 105     load_job.result()
    106     print('Job finished.')
    107 

c:\users\me\appdata\local\programs\python\python37\lib\site-packages\google\cloud\bigquery\job.py in result(self, timeout)
    695             self._begin()
    696         # TODO: modify PollingFuture so it can pass a retry argument to done().
--> 697         return super(_AsyncJob, self).result(timeout=timeout)
    698 
    699     def cancelled(self):

c:\users\me\appdata\local\programs\python\python37\lib\site-packages\google\api_core\future\polling.py in result(self, timeout)
    125             # pylint: disable=raising-bad-type
    126             # Pylint doesn't recognize that this is valid in this case.
--> 127             raise self._exception
    128 
    129         return self._result

BadRequest: 400 Error while reading data, error message: The Apache Avro library failed to parse the header with the following error: Unexpected type for default value. Expected null, but found string: "null"

This is the script:

import csv
import base64
import json
import io
import avro.schema
import avro.io
from avro.datafile import DataFileReader, DataFileWriter
import math
import os
import gcloud
from gcloud import storage
from google.cloud import bigquery
from oauth2client.client import GoogleCredentials
from datetime import datetime, timedelta
import numpy as np

try:
    script_path = os.path.dirname(os.path.abspath(__file__)) + "/"
except:
    script_path = "C:\\Users\\me\\Documents\\Keys\\key.json"

#Bigquery Credentials and settings
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = script_path

folder = str((datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'))
bucket_name = 'gs://new_bucket/table/*.csv'
dataset = 'dataset'
tabela = 'table'

schema = avro.schema.Parse(open("C:\\Users\\me\\schema_table.avsc", "rb").read())  
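# The DataFileWriter below embeds this parsed schema in the header of the .avro
# container file; BigQuery parses that header when loading, which is where the
# "Unexpected type for default value" error is raised.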

writer = DataFileWriter(open("C:\\Users\\me\\table_register.avro", "wb"), avro.io.DatumWriter(), schema)


def insert_bigquery(target_uri, dataset_id, table_id):
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('id','STRING',mode='REQUIRED')
    ]
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.field_delimiter = ";"
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
        )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

#insert_bigquery(bucket_name, dataset, tabela)

def get_data_from_bigquery():
    """query bigquery to get data to import to PSQL"""
    bq = bigquery.Client()
    #Busca IDs
    query = """SELECT id FROM dataset.base64_data"""
    query_job = bq.query(query)
    data = query_job.result()
    rows = list(data)
    return rows

a = get_data_from_bigquery()
length = len(a) 
line_count = 0

for row in range(length):
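    # Decode the base64 payload and drop the first 5 bytes before Avro-decoding
    # (presumably a framing header such as the Confluent wire-format prefix:
    # 1 magic byte + 4-byte schema id).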
    bytes = base64.b64decode(str(a[row][0]))
    bytes = bytes[5:]
    buf = io.BytesIO(bytes)
    decoder = avro.io.BinaryDecoder(buf)
    rec_reader = avro.io.DatumReader(avro.schema.Parse(open("C:\\Users\\me\\schema_table.avsc").read()))
    out=rec_reader.read(decoder)
    writer.append(out)
writer.close()

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob("insert_transfer/" + destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print('File {} uploaded to {}'.format(
        source_file_name,
        destination_blob_name
    ))

upload_blob('new_bucket', 'C:\\Users\\me\\table_register.avro', 'table_register.avro')

def insert_bigquery_avro(target_uri, dataset_id, table_id):
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.AVRO
    time_partitioning = bigquery.table.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field="date")
    job_config.time_partitioning = time_partitioning
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
        )
    print('Starting job {}'.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

dataset1 = 'dataset'
tabela1 = 'table'    
bucket_name1 = 'gs://new_bucket/insert_transfer/*.avro'

insert_bigquery_avro(bucket_name1, dataset1, tabela1)

I receive a CSV file in Cloud Storage that looks like this:

The script decodes the records like this:

I want to create a routine that loads the decoded information into BigQuery.

Schema file:


    "namespace": "transfers",
    "type": "record",
    "name": "Transfer",
    "doc": "Represents the The transfer request",
    "fields": [
        
            "name": "id",
            "type": "string",
            "doc": "the transfer request id"
        ,
        
            "name": "date",
            "type": 
                "type": "long",
                "logicalType": "timestamp-millis"
            ,
            "doc": "the date where the transaction happend"
        ,
        
            "name": "merchant",
            "type": "string",
            "doc": "the merchant who owns the payment"
        ,
        
            "name": "amount",
            "type": ["null", 
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 4,
                "scale": 2
            ],
            "default": "null",
            "doc": "the foreign amount for the payment"
        ,
        
            "name": "status",
            "type": 
                "type": "enum",
                "name": "transfer_status",
                "symbols": [
                    "RECEIVED",
                    "WAITING_TRANSFER",
                    "ON_PROCESSING",
                    "EXECUTED",
                    "DENIED"
                ]
            ,
            "default": "DENIED"
        ,
        
            "name": "correlation_id",
            "type": ["null", "string"],
            "default": "null",
            "doc": "the correlation id of the request"
        ,
        
            "name": "transfer_period",
            "type": ["null", "string"],
            "default": "null",
            "doc": "The transfer period spec"
        ,
        
            "name": "payments",
            "type": 
                "type": "array",
                "items": "string"
            
        ,
        
            "name": "metadata",
            "type": 
                "type": "map",
                "values": "string"
            
        ,
        
            "name": "events",
            "type": 
                "type": "array",
                "items": 
                    "name": "event",
                    "type": "record",
                    "fields": [
                        
                            "name": "id",
                            "type": "string"
                        ,
                        
                            "name": "type",
                            "type": 
                                "type": "enum",
                                "name": "event_type",
                                "symbols": [
                                    "REQUEST",
                                    "VALIDATION",
                                    "TRANSFER_SCHEDULE",
                                    "TRANSFERENCE"
                                ]
                            
                        ,
                        
                            "name": "amount",
                            "type": ["null", 
                                "type": "bytes",
                                "logicalType": "decimal",
                                "precision": 4,
                                "scale": 2
                            ],
                            "doc": "the original currency amount",
                            "default": "null"
                        ,
                        
                            "name": "date",
                            "type": 
                                "type": "long",
                                "logicalType": "timestamp-millis"
                            ,
                            "doc": "the moment where this request was received by the platform"
                        ,
                        
                            "name": "status",
                            "type": 
                                "type": "enum",
                                "name": "event_status",
                                "symbols": [
                                    "SUCCESS",
                                    "DENIED",
                                    "ERROR",
                                    "TIMEOUT",
                                    "PENDING"
                                ]
                            
                        ,

                        
                            "name": "metadata",
                            "type": 
                                "type": "map",
                                "values": "string"
                            
                        ,
                        
                            "name": "internal_metadata",
                            "type": 
                                "type": "map",
                                "values": "string"
                            
                        ,
                        
                            "name": "error",
                            "type": 
                                "type": "record",
                                "name": "Error",
                                "fields": [
                                    
                                        "name": "code",
                                        "type": ["null", "string"],
                                        "default": "null"
                                    ,
                                    
                                        "name": "message",
                                        "type": ["null", "string"],
                                        "default": "null"
                                    
                                ]
                            
                        ,
                        
                            "name": "message",
                            "type": ["null", "string"],
                            "default": "null"
                        
                    ]
                
            
        
    ]

Comments:

I tried including the following command to create an Avro file and append the decoded records, then uploading that file to BigQuery, but it doesn't work: writer = DataFileWriter(open("C:\\Users\\me\\table_register.avsc", "wb"), avro.io.DatumWriter(), avro.schema.Parse(open("C:\\Users\\me\\schema_table.avsc").read())). Any suggestions?

The challenge is interesting, but why don't you simply load the CSV with a schema? Do you have repeated fields? If so, why not use JSON?

@guillaumeblaquiere Thanks for your feedback. This is a nested table that I receive from the data provider. My task is to decode the information against the .avsc schema and then load it into BigQuery. I tried converting the schema to JSON, and the script output as well, but without success. What would you do in this script?

Yes, I have repeated fields in this table.

Do you mean you received an AVRO file? Can you share an example so I can see if I can help you?

Answer 1:

Try changing the value of "default" from "null" to null.

Reference.
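
In Avro, the default value of a union field must match the first branch of the union, so for a ["null", ...] union the default has to be the JSON literal null, not the string "null". As a sketch, the correlation_id field from the schema above would become (the same change applies to every field whose default is currently the string "null"):

    {
        "name": "correlation_id",
        "type": ["null", "string"],
        "default": null,
        "doc": "the correlation id of the request"
    }

After updating the .avsc file, regenerate the .avro file with the corrected schema before loading it into BigQuery, since the schema is embedded in the Avro file header that BigQuery parses.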

Comments:

Can you help me with this question: ***.com/questions/59090735/…
