在 Golang 中使用 BigQuery Write API

Posted

技术标签:

【中文标题】在 Golang 中使用 BigQuery Write API【英文标题】:Using BigQuery Write API in Golang 【发布时间】:2021-12-08 17:28:29 【问题描述】:

我正在尝试使用新的 Bigquery Storage API 从 Golang 进行流式插入。我根据this page 了解到,此 API 取代了旧的流式插入 bigquery API。

但是,examples in the docs 都没有显示如何实际插入行。为了创建 AppendRowsRequest,我得到了以下结果:

&storagepb.AppendRowsRequest
    WriteStream: resp.Name,
    Rows: &storagepb.AppendRowsRequest_ProtoRows
        ProtoRows: &storagepb.AppendRowsRequest_ProtoData
            WriterSchema: nil, // protobuf schema??
            Rows: &storagepb.ProtoRows
                SerializedRows: [][]byte, // serialized protocol buffer data??
            ,
        ,
    ,

我应该在上面的 SerializedRows 字段中输入什么数据?

上面的storagepb.ProtoRows 结构记录在here。不幸的是,给出的只是协议缓冲区主概述页面的链接。

谁能给我一个使用新的 Bigquery Storage API 将行从 Golang 流式传输到 bigquery 的示例?

【问题讨论】:

【参考方案1】:

我发现了一些关于将流写入表的文档 [1][2],但我不确定这就是您要查找的内容。请记住, storage/apiv1beta2 当前处于 beta 状态,因此可能尚未实现或缺少有关它的文档。如果我附加的文档对您没有帮助,我们可以打开一个公共问题跟踪器来正确记录或实施行流式处理。

【讨论】:

是的,这些是我在代码仓库中找到的示例以及我在那里找到的文档。尽管如此,还是感谢您的指点。 您是否正在寻找类似 @​​987654323@ 但在 Golang 中的东西? 是的,看起来很有帮助。我想在 Golang 中还没有写过这样的例子。 Python一仍然有帮助。非常感谢。 嗨,亚历克斯!抱歉回复晚了,我刚刚打开了public feature request。请随时添加您认为相关的任何信息 另外,目前的‘managedwriter’客户端是here,也许你会觉得有用【参考方案2】:

通过从上面的答案我是来工作的例子,这是可以在GitHub上有很大帮助: https://github.com/alexflint/bigquery-storage-api-example

主要代码:

const (
    project = "myproject"
    dataset = "mydataset"
    table   = "mytable"
    trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging
)

// the data we will stream to bigquery
var rows = []*Row
    Name: "John Doe", Age: 104,
    Name: "Jane Doe", Age: 69,
    Name: "Adam Smith", Age: 33,


func main() 
    ctx := context.Background()

    // create the bigquery client
    client, err := storage.NewBigQueryWriteClient(ctx)
    if err != nil 
        log.Fatal(err)
    
    defer client.Close()

    // create the write stream
    // a COMMITTED write stream inserts data immediately into bigquery
    resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest
        Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
        WriteStream: &storagepb.WriteStream
            Type: storagepb.WriteStream_COMMITTED,
        ,
    )
    if err != nil 
        log.Fatal("CreateWriteStream: ", err)
    

    // get the stream by calling AppendRows
    stream, err := client.AppendRows(ctx)
    if err != nil 
        log.Fatal("AppendRows: ", err)
    

    // get the protobuf descriptor for our row type
    var row Row
    descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
    if err != nil 
        log.Fatal("NormalizeDescriptor: ", err)
    

    // serialize the rows
    var opts proto.MarshalOptions
    var data [][]byte
    for _, row := range rows 
        buf, err := opts.Marshal(row)
        if err != nil 
            log.Fatal("protobuf.Marshal: ", err)
        
        data = append(data, buf)
    

    // send the rows to bigquery
    err = stream.Send(&storagepb.AppendRowsRequest
        WriteStream: resp.Name,
        TraceId:     trace, // identifies this client
        Rows: &storagepb.AppendRowsRequest_ProtoRows
            ProtoRows: &storagepb.AppendRowsRequest_ProtoData
                // protocol buffer schema
                WriterSchema: &storagepb.ProtoSchema
                    ProtoDescriptor: descriptor,
                ,
                // protocol buffer data
                Rows: &storagepb.ProtoRows
                    SerializedRows: data, // serialized protocol buffer data
                ,
            ,
        ,
    )
    if err != nil 
        log.Fatal("AppendRows.Send: ", err)
    

    // get the response, which will tell us whether it worked
    _, err = stream.Recv()
    if err != nil 
        log.Fatal("AppendRows.Recv: ", err)
    

    log.Println("done")

和为“行”结构中的协议缓存定义以上是:

syntax = "proto3";

package tutorial;

option go_package = ".;main";

message Row 
    string Name = 1;
    int32 Age = 2;

您需要先创建一个BigQuery资料集和表与模式对应于所述协议缓冲器。见上文对于如何做到这一点链接库中的自述。 P>

运行上面的代码中,数据显示了在大量查询这样后:

$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE   
+------------+-----+
|    name    | age |
+------------+-----+
| John Doe   | 104 |
| Jane Doe   |  69 |
| Adam Smith |  33 |
+------------+-----+

谢谢大家的帮助! P>

【讨论】:

以上是关于在 Golang 中使用 BigQuery Write API的主要内容,如果未能解决你的问题,请参考以下文章

BigQuery - 获取 1000000 条记录并使用 goLang 对数据进行一些处理

BIGQUERY Golang客户获取项目列表

尝试从 golang 广告读取/运行对 bigquery 的查询被拒绝访问:BigQuery BigQuery:未找到具有 Google Drive 范围的 OAuth 令牌

Go 库的 BigQuery 行插入失败

Go Bigquery 上的 DEADLINE_EXCEEDED

使用参数从 Golang 调用 Apps Script API 函数?