GCP Pub/Sub - 如何从 BQ 计划查询中检索状态

Posted

技术标签:

【中文标题】GCP Pub/Sub - 如何从 BQ 计划查询中检索状态【英文标题】:GCP Pub/Sub - How to retrieve state from BQ scheduled query 【发布时间】:2020-06-24 23:41:37 【问题描述】:

我有一个通过 pub/sub 触发云功能的 Big Query 预定查询。

我希望函数从发布/订阅消息中读取“状态”值,以便查看它是否成功完成。

以下将始终触发 else 语句。如果 if 语句被删除,它会返回一个 KeyError。

import base64

def hello_pubsub(event, context):
    data = base64.b64decode(event['data']).decode('utf-8')

    if 'state' in data:
        state = data['state']
        print("returned state: " + state)
    else:
        print ("No state attribute found")

这是函数应该接收的 pubsub 消息:


"data":
"dataSourceId": "scheduled_query", 
"destinationDatasetId": "xxxxxxxxxx", 
"emailPreferences":  , 
"endTime": "2020-03-12T20:40:13.627285Z", 
"errorStatus":  ,
"name": "xxxxxxxxxx", "notificationPubsubTopic": "projects/xxxxxxxxxx/topics/xxxxxxxxxx", 
"params":  "destination_table_name_template": "xxxxxxxxxx", "query": "xxxxxxxxxx", "write_disposition": "WRITE_TRUNCATE" , 
"runTime": "2020-03-05T10:00:00Z", 
"scheduleTime": "2020-03-12T20:37:13.17166Z", 
"startTime": "2020-03-12T20:37:13.328479Z", 
"state": "SUCCEEDED", 
"updateTime": "2020-03-12T20:40:13.627307Z", 
"userId": "xxxxxxxxxx"


【问题讨论】:

在您的问题中显示您的函数收到的event 数据。 “不起作用”是什么意思?错误或其中一条消息? 嗨约翰,很抱歉没有更清楚。我已经编辑了我的帖子以包含该信息。 对您问题的更新是实际收到的数据还是您认为应该是的数据? 实际收到,打印到log获取。 【参考方案1】:

我想通了。

data = base64.b64decode(event['data']).decode('utf-8')

这将返回一个 json 格式的字符串,而不是一个字典对象。您需要通过以下方式转换为 dict:

data_dict = json.loads(data)

为了能够像字典一样访问它。

【讨论】:

【参考方案2】:

你可以看看python库here

你还有the documentation

最后你可以查看additional fields added in the JSON notification message

【讨论】:

以上是关于GCP Pub/Sub - 如何从 BQ 计划查询中检索状态的主要内容,如果未能解决你的问题,请参考以下文章

GCP 数据流:从 Pub/Sub IO 流式传输的系统滞后

如何将 blob 文件发布到 GCP Pub/Sub?

如何在 GCP Pub Sub 中配置非持久消息?

GCP Pub/Sub 无法确认消息

为啥 GCP Pub/Sub 订阅无法确认消息?

如何在 GCP Pub/sub 中修复来自推送订阅的多条消息