如何编写云函数来接收、解析和发布 PubSub 消息?

Posted

技术标签:

【中文标题】如何编写云函数来接收、解析和发布 PubSub 消息?【英文标题】:How do I write a Cloud Function to receive, parse, and publish PubSub messages? 【发布时间】:2019-05-30 18:28:24 【问题描述】:

这可以被视为this thread 的后续行动,但我需要更多帮助来推动事情的发展。希望有人可以在下面查看我的尝试并提供进一步的指导。

总而言之,我需要一个云功能

    由主题 A 中发布的 PubSub 消息触发(这可以在 UI 中完成)。 在“推送”PubSub 主题 A 中读取凌乱的对象更改通知消息。 “解析”它 在 PubSub 主题 B 中发布消息,以原始消息 ID 作为数据,其他元数据(例如文件名、大小、时间)作为属性。

。 1:

杂乱对象更改通知示例:

\n "kind": "storage#object",\n "id": "bucketcfpubsub/test.txt/1544681756538155",\n "selfLink": "https://www.googleapis.com/storage/v1/b/bucketcfpubsub/o/test.txt",\n "name": " test.txt",\n "bucket": "bucketcfpubsub",\n "generation": "1544681756538155",\n "metageneration": "1",\n "contentType": "text/plain",\n " timeCreated": "2018-12-13T06:15:56.537Z",\n "更新": "2018-12-13T06:15:56.537Z",\n "storageClass": "STANDARD",\n "timeStorageClassUpdated" : "2018-12-13T06:15:56.537Z",\n "大小": "1938",\n "md5Hash": "sDSXIvkR/PBg4mHyIUIvww==",\n "mediaLink": "https://www.googleapis.com/download/storage/v1/b/bucketcfpubsub/o/test.txt?generation=1544681756538155&alt=media", \n "crc32c": "UDhyzw==",\n "etag": "CKvqjvuTnN8CEAE="\n\n

为了澄清,这是一条带有空白“数据”字段的消息,并且上面的所有信息都在属性对中(如“属性名称”:“属性数据”)?还是只是塞进“数据”字段的长字符串,没有“属性”?

。 2:

在上面的线程中,使用了“拉”订阅。它比使用“推送”订阅更好吗?推送示例如下:

def create_push_subscription(project_id,
                             topic_name,
                             subscription_name,
                             endpoint):
    """Create a new push subscription on the given topic."""
    # [START pubsub_create_push_subscription]
    from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"
    # TODO subscription_name = "Your Pub/Sub subscription name"
    # TODO endpoint = "https://my-test-project.appspot.com/push"

    subscriber = pubsub_v1.SubscriberClient()
    topic_path = subscriber.topic_path(project_id, topic_name)
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    push_config = pubsub_v1.types.PushConfig(
        push_endpoint=endpoint)

    subscription = subscriber.create_subscription(
        subscription_path, topic_path, push_config)

    print('Push subscription created: '.format(subscription))
    print('Endpoint for subscription is: '.format(endpoint))
    # [END pubsub_create_push_subscription]

或者在此之后我需要更多代码来接收消息吗?

另外,每次发布的 pubsub 消息触发 Cloud Function 时,这不是都会创建一个新订阅者吗?我应该在 CF 末尾添加订阅删除代码,还是有更有效的方法来执行此操作?

。 3:

接下来,解析代码,这个示例代码做了如下几个属性:

def summarize(message):
    # [START parse_message]
    data = message.data
    attributes = message.attributes

    event_type = attributes['eventType']
    bucket_id = attributes['bucketId']
    object_id = attributes['objectId']

这是否适用于我在 1: 中的上述通知?

。 4:

如何分隔 topic_name?第 1 步和第 2 步使用主题 A,而这一步是发布到主题 B。是不是就像在下面的代码示例中重写 topic_name 一样简单?

# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number '.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # Add two attributes, origin and username, to the message
    publisher.publish(
        topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')

我获得大部分示例代码的来源(除了上面的线程):python-docs-samples。将上述代码示例改编和串在一起会产生有用的代码吗?或者我还会缺少“import ****”之类的东西吗?

【问题讨论】:

【参考方案1】:

您不应尝试手动创建在 Cloud Functions 中运行的订阅者。相反,请按照文档here 设置一个云函数,该函数将通过传递--trigger-topic 命令行参数来调用发送到给定主题的所有消息。

解决您的其他一些问题:

“我是否应该在 CF 末尾添加订阅删除代码”- 订阅是对应于特定积压消息的长期资源。如果在云函数结束时创建和删除订阅,则不会收到不存在时发送的消息。

“我如何分隔主题名称”- 本示例中的“主题名称”指的是格式为 projects/project_id/topics/topic_name 的字符串的最后一部分,它将出现在您的主题的云控制台中的 this page 上已创建。

【讨论】:

以上是关于如何编写云函数来接收、解析和发布 PubSub 消息?的主要内容,如果未能解决你的问题,请参考以下文章

具有 CustomAttributes 的 PubSub - 未调用接收器函数,但出现 json 解析异常

是否可以有 2 个不同的 pubsub 指向单个云功能?

GCP 云功能未正确接收/确认 PubSub 消息

从 Pubsub 在 BigQuery 中编写查询

如何使用 Node.js 控制 Cloud PubSub 中的确认

Google Pubsub - 接收推送订阅的传递尝试