在数据流管道中动态设置 bigquery 表 id
Posted
技术标签:
【中文标题】在数据流管道中动态设置 bigquery 表 id【英文标题】:Dynamically set bigquery table id in dataflow pipeline 【发布时间】:2021-04-15 11:08:20 【问题描述】:我有数据流管道,它在 Python 中,这就是它正在做的事情:
从 PubSub 读取消息。消息是压缩的协议缓冲区。在 PubSub 上接收的一条消息包含多种类型的消息。请参阅下面的协议父级消息规范:
message BatchEntryPoint
/**
* EntryPoint
*
* Description: Encapsulation message
*/
message EntryPoint
// Proto Message
google.protobuf.Any proto = 1;
// Timestamp
google.protobuf.Timestamp timestamp = 4;
// Array of EntryPoint messages
repeated EntryPoint entrypoints = 1;
所以,为了更好地解释,我有几个 protobuf 消息。每条消息都必须打包在 EntryPoint 消息的 proto 字段中,由于 MQTT 的限制,我们一次发送多条消息,这就是为什么我们在 BatchEntryPoint 上使用指向 EntryPoint 消息的重复字段。
-
解析收到的消息。
这里没什么特别的,只是解压缩和反序列化我们刚刚从 PubSub 读取的消息。获取“人类可读”数据。
-
BatchEntryPoint 上的 For 循环以评估每个 EntryPoint 消息。
由于 BatchEntryPoint 上的每条消息可以有不同的类型,我们需要对它们进行不同的处理
-
解析的消息数据
执行不同的过程来获取我需要的所有信息并将其格式化为 BigQuery 可读格式
-
将数据写入 bigQuery
这就是我的“麻烦”开始的地方,所以我的代码可以工作,但在我看来它很脏而且很难维护。 有两点需要注意。 每个消息的类型可以发送到 3 个不同的数据集,一个研发数据集,一个开发数据集和一个生产数据集。 假设我有一条名为 System. 它可以转到:
我的项目:rd_dataset.system 我的项目:dev_dataset.system 我的项目:prod_dataset.system这就是我现在正在做的事情:
console_records | 'Write to Console BQ' >> beam.io.WriteToBigQuery(
lambda e: 'my-project:rd_dataset.table1' if dataset_is_rd_table1(e) else (
'my-project:dev_dataset.table1' if dataset_is_dev_table1(e) else (
'my-project:prod_dataset.table1' if dataset_is_prod_table1(e) else (
'my-project:rd_dataset.table2' if dataset_is_rd_table2(e) else (
'my-project:dev_dataset.table2' if dataset_is_dev_table2(e) else (
...) else 0
我有 30 多种不同类型的消息,其中 90 多行用于向大查询插入数据。
dataset_is_..._tableX 方法如下所示:
def dataset_is_rd_messagestype(element) -> bool:
""" check if env is rd for message's type message """
valid: bool = False
is_type = check_element_type(element, 'MessagesType')
if is_type:
valid = dataset_is_rd(element)
return valid
check_element_type 检查消息的类型是否正确(例如:系统)。 dataset_is_rd 看起来像这样:
def dataset_is_rd(element) -> bool:
""" Check if dataset should be RD from registry id """
if element['device_registry_id'] == 'rd':
del element['device_registry_id']
del element['bq_type']
return True
return False
作为键的元素指示我们必须在哪个数据集上发送消息。
所以这按预期工作,但我希望我可以编写更简洁的代码,并可能减少在添加或删除某种类型的消息时要更改的代码量。
有什么想法吗?
【问题讨论】:
IMO 您只需要根据WriteToBigQuery's documentation 为您的 lambda 命名 - 我的意思是,就像在 define 中一样,它是一个 function类似get_table_destination_from_element
【参考方案1】:
用TaggedOutput怎么样。
【讨论】:
问题是,我不能使用 Pcollection 来设置我的表,因为我们可以从中提取数据【参考方案2】:你能不能这样写:
def dataset_type(element) -> bool:
""" Check if dataset should be RD from registry id """
dev_registry = element['device_registry_id']
del element['device_registry_id']
del element['bq_type']
table_type = get_element_type(element, 'MessagesType')
return 'my-project:%s_dataset.table%d' % (dev_registry, table_type)
并将其用作传递给 BQ 的 table
lambda?
【讨论】:
在此函数中更改element
是一个好习惯吗?此处的更改将从我刚才测试的内容写入 BigQuery...
您不得修改该元素。甚至更多像这样的简单方法不能处理 PCollection 数据【参考方案3】:
所以我设法通过动态制作表名来创建将数据插入动态表的代码。
这并不完美,因为我必须修改传递给方法的元素,但是我仍然对结果非常满意,它已经从数百行中清理了我的代码。如果我有一个新表,添加它需要在数组中添加一行,而之前在管道中添加 6 行。
这是我的解决方案:
def batch_pipeline(pipeline):
console_message = (
pipeline
| 'Get console\'s message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub1',
with_attributes=True)
)
common_message = (
pipeline
| 'Get common\'s message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub2',
with_attributes=True)
)
jetson_message = (
pipeline
| 'Get jetson\'s message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub3',
with_attributes=True)
)
message = (console_message, common_message, jetson_message) | beam.Flatten()
clear_message = message | beam.ParDo(GetClearMessage())
console_bytes = clear_message | beam.ParDo(SetBytesData())
console_bytes | 'Write to big query back up table' >> beam.io.WriteToBigQuery(
lambda e: write_to_backup(e)
)
records = clear_message | beam.ParDo(GetProtoData())
gps_records = clear_message | 'Get GPS Data' >> beam.ParDo(GetProtoData())
parsed_gps = gps_records | 'Parse GPS Data' >> beam.ParDo(ParseGps())
if parsed_gps:
parsed_gps | 'Write to big query gps table' >> beam.io.WriteToBigQuery(
lambda e: write_gps(e)
)
records | 'Write to big query table' >> beam.io.WriteToBigQuery(
lambda e: write_to_bq(e)
)
因此管道正在从 3 个不同的 pub sub 中读取数据,提取数据并写入大查询。
WriteToBigQuery 使用的元素结构如下所示:
obj =
'data': data_to_write_on_bq,
'registry_id': data_needed_to_craft_table_name,
'gcloud_id': data_to_write_on_bq,
'proto_type': data_needed_to_craft_table_name
然后我在 WriteToBigQuery 的 lambda 上使用的一种方法如下所示:
def write_to_bq(e):
logging.info(e)
element = copy(e)
registry = element['registry_id']
logging.info(registry)
dataset = set_dataset(registry) # set dataset name, knowing the registry, this is to set the environment (dev/prod/rd/...)
proto_type = element['proto_type']
logging.info('Proto Type %s', proto_type)
table_name = reduce(lambda x, y: x + ('_' if y.isupper() else '') + y, proto_type).lower()
full_table_name = f'my_project:dataset.table_name'
logging.info(full_table_name)
del e['registry_id']
del e['proto_type']
return full_table_name
就是这样,经过 3 天的麻烦!
【讨论】:
以上是关于在数据流管道中动态设置 bigquery 表 id的主要内容,如果未能解决你的问题,请参考以下文章
Bigquery - 在 CSV(联合表)中处理双引号和管道字段分隔符
Google Cloud Data Fusion,如何在一个管道中将多个表加载到 bigquery