BigQuery - use_avro_logical_types 在 python 脚本中不起作用

Posted

技术标签:

【中文标题】BigQuery - use_avro_logical_types 在 python 脚本中不起作用【英文标题】:BigQuery - use_avro_logical_types doesn´t work in a python script 【发布时间】:2019-12-04 16:28:43 【问题描述】:

疑问与问题类似:BigQuery use_avro_logical_types ignored in Python script 但是我已经更新了我使用谷歌的库但没有成功。所以我想了解我的情况。我将发送 avro 文件的摄入脚本部分。注意:该问题出现在 BigQuery 的多个 avro 文件提取管道中。

编辑:上面提到的更改架构类型的解决方案不起作用,它给出了另一个错误。

BQ 中的架构:

BQ 中的数据:

import csv
import base64
import json
import io
import avro.schema
import avro.io
from avro.datafile import DataFileReader, DataFileWriter
import math
import os
import gcloud
from gcloud import storage
from google.cloud import bigquery
from oauth2client.client import GoogleCredentials
from datetime import datetime, timedelta, date
import numpy as np

try:
    script_path = os.path.dirname(os.path.abspath(__file__)) + "/"
except:
    script_path = "C:\\Users\\me\\key.json"

#Bigquery Credentials and settings
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = script_path

folder = str((datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'))
data_folder = str((datetime.now() - timedelta(days=1)).strftime('%Y%m%d'))
bucket_name = 'gs://bucket/*.csv'
dataset = 'dataset'
tabela = 'table_ids'

new_file = 'C:\\Users\\me\\register_' + data_folder + '.avro'
file_schema = 'C:\\Users\\me\\schema.avsc'
new_filename = 'register_' + data_folder + '.avro'

bq1 = bigquery.Client()
#Deleta IDs
query1 = """DELETE FROM dataset.table_ids WHERE ID IS NOT NULL""" 
query_job1 = bq1.query(query1)

def insert_bigquery(target_uri, dataset_id, table_id):
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('id','STRING',mode='REQUIRED')
    ]
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.field_delimiter = ";"
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
        )
    print('Starting job '.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

insert_bigquery(bucket_name, dataset, tabela)

def get_data_from_bigquery():
    """query bigquery to get data to import to PSQL"""
    bq = bigquery.Client()
    #Busca IDs
    query = """SELECT id FROM dataset.table_ids"""
    query_job = bq.query(query)
    data = query_job.result()
    rows = list(data)
    return rows

a = get_data_from_bigquery()
length = len(a) 
line_count = 0
schema = avro.schema.Parse(open(file_schema, "rb").read())  # need to know the schema to write. According to 1.8.2 of Apache Avro
writer = DataFileWriter(open(new_file, "wb"), avro.io.DatumWriter(), schema)

for row in range(length):
    bytes = base64.b64decode(str(a[row][0]))
    bytes = bytes[5:]
    buf = io.BytesIO(bytes)
    decoder = avro.io.BinaryDecoder(buf)
    rec_reader = avro.io.DatumReader(avro.schema.Parse(open(file_schema).read()))
    out=rec_reader.read(decoder)
    writer.append(out)
writer.close()

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob("insert/" + destination_blob_name)
    blob.upload_from_filename(source_file_name)
    print('File  uploaded to '.format(
        source_file_name,
        destination_blob_name
    ))

upload_blob('bucket', new_file, new_filename)

def insert_bigquery_avro(target_uri, dataset_id, table_id):
    bigquery_client = bigquery.Client()
    dataset_ref = bigquery_client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.autodetect = True
    job_config.source_format = bigquery.SourceFormat.AVRO
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
    job_config.use_avro_logical_types = True
    time_partitioning = bigquery.table.TimePartitioning()
    job_config.time_partitioning = time_partitioning
    uri = target_uri
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table(table_id),
        job_config=job_config
        )
    print('Starting job '.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

Avro 架构:

"fields": [
    
      "name": "id",
      "type": 
        "type": "string",
        "avro.java.string": "String"
      ,
      "doc": "the payment id"
    ,
    
      "name": "merchant",
      "type": 
        "type": "string",
        "avro.java.string": "String"
      ,
      "doc": "the merchant who owns the payment"
    ,
    
      "name": "date",
      "type": 
        "type": "long",
        "logicalType": "timestamp-millis"
      ,
      "doc": "the date where the transaction happend"
    ,
    
      "name": "amount",
      "type": 
        "type": "record",
        "name": "amount",
        "fields": [
          
            "name": "amount",
            "type": [
              "null",
              
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 5,
                "scale": 5
              
            ],
            "doc": "the original currency amount",
            "default": null
          ,
          
            "name": "foreignAmount",
            "type": [
              "null",
              
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 5,
                "scale": 5
              
            ],
            "doc": "the foreign amount for the payment",
            "default": null
          ,
          
            "name": "code",
            "type": 
              "type": "string",
              "avro.java.string": "String"
            ,
            "doc": "the destination currency code"
          
        ],
        "default": null
      
    ,
    
      "name": "exchange_rate",
      "type": 
        "type": "record",
        "name": "code",
        "fields": [
          
            "name": "currency_code",
            "type": [
              "null",
              
                "type": "string",
                "avro.java.string": "String"
              
            ],
            "doc": "the exchange rate currency code",
            "default": null
          ,
          
            "name": "rate",
            "type": [
              "null",
              
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 5,
                "scale": 5
              
            ],
            "default": null
          ,
          
            "name": "online_rate",
            "type": [
              "null",
              
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 5,
                "scale": 5
              
            ],
            "default": null
          
        ]
      ,
      "doc": "The transaction exchange rate"
    ,
    
      "name": "consumer",
      "type": 
        "type": "record",
        "name": "Consumer",
        "fields": [
          
            "name": "name",
            "type": [
              "null",
              
                "type": "string",
                "avro.java.string": "String"
              
            ],
            "doc": "the consumer's name",
            "default": null
          ,
          
            "name": "email",
            "type": [
              "null",
              
                "type": "string",
                "avro.java.string": "String"
              
            ],
            "doc": "the consumer's email address",
            "default": null
          ,
          
            "name": "external_id",
            "type": [
              "null",
              
                "type": "string",
                "avro.java.string": "String"
              
            ],
            "doc": "the consumer's external id when needed",
            "default": null
          ,
          
            "name": "national_id",
            "type": 
              "type": "string",
              "avro.java.string": "String"
            ,
            "doc": "the national id"
          ,
          
            "name": "phone",
            "type": 
              "type": "string",
              "avro.java.string": "String"
            ,
            "doc": "the consumer's phone number",
            "default": ""
          
        ]
      
    ,
    
      "name": "soft_descriptor",
      "type": [
        "null",
        
          "type": "string",
          "avro.java.string": "String"
        
      ],
      "doc": "the description as it will be shown at the customer's invoice",
      "default": null
    ,
    
      "name": "merchant_contract",
      "type": 
        "type": "enum",
        "name": "merchant_contract_type",
        "symbols": [
          "PAY",
          "BANK"
        ]
      ,
      "default": "PAY"
    ,
    
      "name": "type",
      "type": 
        "type": "enum",
        "name": "payment_type",
        "symbols": [
          "INITIAL",
          "CREDIT_CARD",
          "DEBIT_CARD",
          "ONLINE_DEBIT",
          "BANK_SLIP",
          "DIGITAL_WALLET",
          "ELECTRONIC_BANK_TRANSFER"
        ]
      ,
      "default": "INITIAL"
    ,
    
      "name": "card",
      "type": 
        "type": "record",
        "name": "card",
        "fields": [
          
            "name": "type",
            "type": [
              "null",
              
                "type": "enum",
                "name": "card_type",
                "symbols": [
                  "CARD",
                  "TOKEN"
                ]
              
            ],
            "default": null
          ,
          
            "name": "mask_number",
            "type": [
              "null",
              
                "type": "string",
                "avro.java.string": "String"
              
            ],
            "default": null
          ,
          
            "name": "card_holder",
            "type": [
              "null",
              
                "type": "string",
                "avro.java.string": "String"
              
            ],
            "default": null
          ,
          
            "name": "brand",
            "type": [
              "null",
              
                "type": "string",
                "avro.java.string": "String"
              
            ],
            "default": null
          
        ]
      
    ,
    
      "name": "confirm",
      "type": "boolean",
      "doc": "indicates whether is self confirmed",
      "default": false
    ,
    
      "name": "installments",
      "type": "int",
      "doc": "Number of installments for the payment",
      "default": 1
    ,
    
      "name": "due_date",
      "type": [
        "null",
        
          "type": "int",
          "logicalType": "date"
        
      ],
      "default": null
    ,
    
      "name": "correlation_id",
      "type": [
        "null",
        
          "type": "string",
          "avro.java.string": "String"
        
      ],
      "doc": "the external customer correlationid",
      "default": null
    ,
    
      "name": "billing",
      "type": 
        "type": "record",
        "name": "Billing",
        "fields": [
          
            "name": "national_id",
            "type": [
              "null",
              
                "type": "string",
                "avro.java.string": "String"
              
            ],
            "doc": "billing info",
            "default": null
          ,
          
            "name": "name",
            "type": [
              "null",
              
                "type": "string",
                "avro.java.string": "String"
              
            ],
            "doc": "the consumer address name",
            "default": null
          
        ]
      
    ,
    
      "name": "status",
      "type": 
        "type": "enum",
        "name": "payment_status",
        "symbols": [
          "INITIAL",
          "CONSUMER",
          "AUTHORIZED",
          "WAITING_CONFIRMATION",
          "CANCELED",
          "WAITING_CLEARING",
          "CLEARED",
          "TRANSFERENCE",
          "DECLINED_BY_ISSUER",
          "DECLINED_BY_BUSINESS_RULES",
          "CONFIRMED",
          "WAITING_CANCELING",
          "WAITING_CONSUMER",
          "TRANSFER_REQUESTED"
        ],
        "default": "INITIAL"
      
    ,
    
      "name": "metadata",
      "type": 
        "type": "map",
        "values": 
          "type": "string",
          "avro.java.string": "String"
        ,
        "avro.java.string": "String"
      
    ,
    
      "name": "events",
      "type": 
        "type": "array",
        "items": 
          "type": "record",
          "name": "event",
          "fields": [
            
              "name": "id",
              "type": 
                "type": "string",
                "avro.java.string": "String"
              ,
              "default": "0"
            ,
            
              "name": "type",
              "type": 
                "type": "enum",
                "name": "event_type",
                "symbols": [
                  "AUTHORIZATION",
                  "AUTHENTICATION",
                  "CONFIRMATION",
                  "CANCELATION",
                  "CHECKOUT_CREATION",
                  "SETTLEMENT",
                  "TRANSFER_VALIDATION",
                  "TRANSFER_SCHEDULE",
                  "TRANSFERRED"
                ]
              
            ,
            
              "name": "gateway",
              "type": [
                "null",
                
                  "type": "string",
                  "avro.java.string": "String"
                
              ],
              "default": null
            ,
            
              "name": "breadcrumb_id",
              "type": [
                "null",
                
                  "type": "string",
                  "avro.java.string": "String"
                
              ],
              "default": null
            ,
            
              "name": "request_time",
              "type": 
                "type": "long",
                "logicalType": "timestamp-millis"
              ,
              "doc": "the moment where this request was received by the platform"
            ,
            
              "name": "response_time",
              "type": 
                "type": "long",
                "logicalType": "timestamp-millis"
              ,
              "doc": "the moment where this request was returned by the platform"
            ,
            
              "name": "status",
              "type": 
                "type": "enum",
                "name": "event_status",
                "symbols": [
                  "SUCCESS",
                  "DENIED",
                  "ERROR",
                  "TIMEOUT",
                  "PENDING"
                ]
              
            ,
            
              "name": "actor",
              "type": 
                "type": "enum",
                "name": "actor",
                "symbols": [
                  "AQ",
                  "GTW",
                  "CONCIL"
                ],
                "default": "GTW"
              ,
              "default": "GTW"
            ,
            
              "name": "amount",
              "type": [
                "null",
                
                  "type": "bytes",
                  "logicalType": "decimal",
                  "precision": 5,
                  "scale": 5
                
              ],
              "doc": "the original currency amount",
              "default": null
            ,
            
              "name": "foreign_amount",
              "type": [
                "null",
                
                  "type": "bytes",
                  "logicalType": "decimal",
                  "precision": 5,
                  "scale": 5
                
              ],
              "doc": "the foreign amount for the payment",
              "default": null
            ,
            
              "name": "error",
              "type": 
                "type": "record",
                "name": "Error",
                "fields": [
                  
                    "name": "code",
                    "type": [
                      "null",
                      
                        "type": "string",
                        "avro.java.string": "String"
                      
                    ],
                    "default": null
                  ,
                  
                    "name": "message",
                    "type": [
                      "null",
                      
                        "type": "string",
                        "avro.java.string": "String"
                      
                    ],
                    "default": null
                  
                ]
              
            ,
            
              "name": "message",
              "type": [
                "null",
                
                  "type": "string",
                  "avro.java.string": "String"
                
              ],
              "default": null
            ,
            
              "name": "fee_amount",
              "type": [
                "null",
                
                  "type": "bytes",
                  "logicalType": "decimal",
                  "precision": 5,
                  "scale": 5
                
              ],
              "doc": "the fee amount",
              "default": null
            ,
            
              "name": "net_amount",
              "type": [
                "null",
                
                  "type": "bytes",
                  "logicalType": "decimal",
                  "precision": 5,
                  "scale": 5
                
              ],
              "doc": "the net amount",
              "default": null
            ,
            
              "name": "metadata",
              "type": 
                "type": "map",
                "values": 
                  "type": "string",
                  "avro.java.string": "String"
                ,
                "avro.java.string": "String"
              
            ,
            
              "name": "internal_metadata",
              "type": 
                "type": "map",
                "values": 
                  "type": "string",
                  "avro.java.string": "String"
                ,
                "avro.java.string": "String"
              
            
          ]
        
      
    ,
    
      "name": "bank_account",
      "type": [
        "null",
        
          "type": "record",
          "name": "bank_account",
          "fields": [
            
              "name": "name",
              "type": 
                "type": "string",
                "avro.java.string": "String"
              ,
              "doc": "the bank name",
              "default": ""
            ,
            
              "name": "code",
              "type": 
                "type": "string",
                "avro.java.string": "String"
              ,
              "doc": "the bank code",
              "default": ""
            ,
            
              "name": "agency",
              "type": 
                "type": "string",
                "avro.java.string": "String"
              ,
              "doc": "the bank agency",
              "default": ""
            ,
            
              "name": "account",
              "type": 
                "type": "string",
                "avro.java.string": "String"
              ,
              "doc": "the bank account",
              "default": ""
            ,
            
              "name": "document_number",
              "type": 
                "type": "string",
                "avro.java.string": "String"
              ,
              "doc": "the bank document number (CNPJ)",
              "default": ""
            
          ]
        
      ],
      "doc": "The bank account values",
      "default": null
    
  ]

【问题讨论】:

1.您能否在加载作业完成后发布 BigQuery 表的结果架构? 2. 您是尝试加载到现有表(然后您使用哪种写入模式,WRITE_APPEND 或 WRITE_TRUNCATE),还是加载到新表。 3. 哪一列给了你意想不到的类型? @NhanNguyen 你好,我刚刚包含了所要求的信息。如果表不存在,想法是它生成一个表,但它都在 WRITE_APPEND 之上。需要更改的列:date(时间戳)、due_date(日期)、response_time(时间戳)、request_time(时间戳)。 @NhanNguyen 你有什么建议吗? 【参考方案1】:

尝试将此设置为 avro 架构上所有时间戳列的“类型”:

"type": ["null", "type": "long", "logicalType": "timestamp-millis"]

【讨论】:

我试过了,但是,我收到了这条消息:Can't access branch index 1574092545874 for union with 2 个分支 Writer's Schema: [ "null", "type": "long", "logicalType": "timestamp-millis" ] 读者架构:[ "null", "type": "long", "logicalType": "timestamp-millis" ] 您能提供您的架构吗? 我看到你也有日期类型,所以我之前的建议不起作用。请参考这个public issue和this

以上是关于BigQuery - use_avro_logical_types 在 python 脚本中不起作用的主要内容,如果未能解决你的问题,请参考以下文章

数据处理 - BigQuery 与 Data Proc+BigQuery

BigQuery:写入查询结果时使用 bigquery 作业的意外行为

Google BigQuery - 将数据流式传输到 BigQuery

Python BigQuery 脚本 bigquery.jobs.create 错误

阻止用户删除 BigQuery 表

Bigquery:在 Bigquery 中计算余额或重写 SQL 脚本