在数据流管道中动态设置 bigquery 表 id

Posted

技术标签:

【中文标题】在数据流管道中动态设置 bigquery 表 id【英文标题】:Dynamically set bigquery table id in dataflow pipeline 【发布时间】:2021-04-15 11:08:20 【问题描述】:

我有数据流管道,它在 Python 中,这就是它正在做的事情:

    从 PubSub 读取消息。消息是压缩的协议缓冲区。在 PubSub 上接收的一条消息包含多种类型的消息。请参阅下面的协议父级消息规范:

    message BatchEntryPoint 
    
     /**
     * EntryPoint
     * 
     * Description: Encapsulation message
     */
    message EntryPoint 
    // Proto Message
    google.protobuf.Any proto = 1;
    
    // Timestamp
    google.protobuf.Timestamp timestamp = 4;
    
    
    // Array of EntryPoint messages
    repeated EntryPoint entrypoints = 1;
    
    

所以,为了更好地解释,我有几个 protobuf 消息。每条消息都必须打包在 EntryPoint 消息的 proto 字段中,由于 MQTT 的限制,我们一次发送多条消息,这就是为什么我们在 BatchEntryPoint 上使用指向 EntryPoint 消息的重复字段。

    解析收到的消息。

这里没什么特别的,只是解压缩和反序列化我们刚刚从 PubSub 读取的消息。获取“人类可读”数据。

    BatchEntryPoint 上的 For 循环以评估每个 EntryPoint 消息。

由于 BatchEntryPoint 上的每条消息可以有不同的类型,我们需要对它们进行不同的处理

    解析的消息数据

执行不同的过程来获取我需要的所有信息并将其格式化为 BigQuery 可读格式

    将数据写入 bigQuery

这就是我的“麻烦”开始的地方,所以我的代码可以工作,但在我看来它很脏而且很难维护。 有两点需要注意。 每个消息的类型可以发送到 3 个不同的数据集,一个研发数据集,一个开发数据集和一个生产数据集。 假设我有一条名为 System. 它可以转到:

我的项目:rd_dataset.system 我的项目:dev_dataset.system 我的项目:prod_dataset.system

这就是我现在正在做的事情:

console_records | 'Write to Console BQ' >> beam.io.WriteToBigQuery(
    lambda e: 'my-project:rd_dataset.table1' if dataset_is_rd_table1(e) else (
        'my-project:dev_dataset.table1' if dataset_is_dev_table1(e) else (
        'my-project:prod_dataset.table1' if dataset_is_prod_table1(e) else (
        'my-project:rd_dataset.table2' if dataset_is_rd_table2(e) else (
        'my-project:dev_dataset.table2' if dataset_is_dev_table2(e) else (
        ...) else 0

我有 30 多种不同类型的消息,其中 90 多行用于向大查询插入数据。

dataset_is_..._tableX 方法如下所示:

def dataset_is_rd_messagestype(element) -> bool:
""" check if env is rd for message's type message """
valid: bool = False
is_type = check_element_type(element, 'MessagesType')
if is_type:
    valid = dataset_is_rd(element)
return valid

check_element_type 检查消息的类型是否正确(例如:系统)。 dataset_is_rd 看起来像这样:

def dataset_is_rd(element) -> bool:
    """ Check if dataset should be RD from registry id """
    if element['device_registry_id'] == 'rd':
        del element['device_registry_id']
        del element['bq_type']
        return True
    return False

作为键的元素指示我们必须在哪个数据集上发送消息。

所以这按预期工作,但我希望我可以编写更简洁的代码,并可能减少在添加或删除某种类型的消息时要更改的代码量。

有什么想法吗?

【问题讨论】:

IMO 您只需要根据WriteToBigQuery's documentation 为您的 lambda 命名 - 我的意思是,就像在 define 中一样,它是一个 function类似get_table_destination_from_element 【参考方案1】:

用TaggedOutput怎么样。

【讨论】:

问题是,我不能使用 Pcollection 来设置我的表,因为我们可以从中提取数据【参考方案2】:

你能不能这样写:

def dataset_type(element) -> bool:
    """ Check if dataset should be RD from registry id """
    dev_registry = element['device_registry_id']
    del element['device_registry_id']
    del element['bq_type']
    table_type = get_element_type(element, 'MessagesType')
    return 'my-project:%s_dataset.table%d' % (dev_registry, table_type)

并将其用作传递给 BQ 的 table lambda?

【讨论】:

在此函数中更改element 是一个好习惯吗?此处的更改将从我刚才测试的内容写入 BigQuery... 您不得修改该元素。甚至更多像这样的简单方法不能处理 PCollection 数据【参考方案3】:

所以我设法通过动态制作表名来创建将数据插入动态表的代码。

这并不完美,因为我必须修改传递给方法的元素,但是我仍然对结果非常满意,它已经从数百行中清理了我的代码。如果我有一个新表,添加它需要在数组中添加一行,而之前在管道中添加 6 行。

这是我的解决方案:

def batch_pipeline(pipeline):
    console_message = (
            pipeline
            | 'Get console\'s message from pub/sub' >> beam.io.ReadFromPubSub(
        subscription='sub1',
        with_attributes=True)
    )
    common_message = (
            pipeline
            | 'Get common\'s message from pub/sub' >> beam.io.ReadFromPubSub(
        subscription='sub2',
        with_attributes=True)
    )
    jetson_message = (
            pipeline
            | 'Get jetson\'s message from pub/sub' >> beam.io.ReadFromPubSub(
        subscription='sub3',
        with_attributes=True)
    )

 

message = (console_message, common_message, jetson_message) | beam.Flatten()
clear_message = message | beam.ParDo(GetClearMessage())
console_bytes = clear_message | beam.ParDo(SetBytesData())
console_bytes | 'Write to big query back up table' >> beam.io.WriteToBigQuery(
    lambda e: write_to_backup(e)
)
records = clear_message | beam.ParDo(GetProtoData())
gps_records = clear_message | 'Get GPS Data' >> beam.ParDo(GetProtoData())
parsed_gps = gps_records | 'Parse GPS Data' >> beam.ParDo(ParseGps())
if parsed_gps:
    parsed_gps | 'Write to big query gps table' >> beam.io.WriteToBigQuery(
        lambda e: write_gps(e)
    )
records | 'Write to big query table' >> beam.io.WriteToBigQuery(
    lambda e: write_to_bq(e)
)

因此管道正在从 3 个不同的 pub sub 中读取数据,提取数据并写入大查询。

WriteToBigQuery 使用的元素结构如下所示:

  obj = 
        'data': data_to_write_on_bq,
        'registry_id': data_needed_to_craft_table_name,
        'gcloud_id': data_to_write_on_bq,
        'proto_type': data_needed_to_craft_table_name
  

然后我在 WriteToBigQuery 的 lambda 上使用的一种方法如下所示:

def write_to_bq(e):
    logging.info(e)
    element = copy(e)
    registry = element['registry_id']
    logging.info(registry)
    dataset = set_dataset(registry) # set dataset name, knowing the registry, this is to set the environment (dev/prod/rd/...)
    proto_type = element['proto_type']
    logging.info('Proto Type %s', proto_type)
    table_name = reduce(lambda x, y: x + ('_' if y.isupper() else '') + y, proto_type).lower()
    full_table_name = f'my_project:dataset.table_name'
    logging.info(full_table_name)
    del e['registry_id']
    del e['proto_type']

    return full_table_name

就是这样,经过 3 天的麻烦!

【讨论】:

以上是关于在数据流管道中动态设置 bigquery 表 id的主要内容,如果未能解决你的问题,请参考以下文章

BigQuery 代码段中的错误

Bigquery - 在 CSV(联合表)中处理双引号和管道字段分隔符

带有 BigQuery 的动态表后缀不限制处理的数据

Google Cloud Data Fusion,如何在一个管道中将多个表加载到 bigquery

流插入,然后定期合并到 Dataflow 管道中的 BigQuery [关闭]

在 Dataflow 管道中写入 BigQuery 表失败