PubSub topic does not pull data from cloud function

【Title】: PubSub topic does not pull data from cloud function 【Posted】: 2021-10-20 08:16:43 【Question】:

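I am trying to do web scraping through a Cloud Functions -> Pub/Sub -> BigQuery pipeline.

I wrote some Python code and deployed it to Cloud Functions. The function's text result is "OK", and I can see the scraped data in the logs. But when I try to pull messages from the topic, I get no data. When I check the Pub/Sub API metrics, I see 404 responses. How should I write the code that publishes messages to the Pub/Sub topic?

Here is the code I have written so far: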

import base64
from bs4 import BeautifulSoup
import requests
from google.cloud import pubsub_v1

def hello_pubsub(event, context):
    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/project_id/topics/topic_id`
    topic_path = publisher.topic_path("tokyo-ring-<secret>", "webscraping")
    html_text = requests.get('https://www.arabam.com/ikinci-el?take=50').text

    #print(html_text)
    soup = BeautifulSoup(html_text,'lxml')
    models = soup.find_all('tr', class_='listing-list-item pr should-hover bg-white')
    for model in models:
        model_name = model.find('td', class_='listing-modelname pr').text
        title = model.find('td', class_='horizontal-half-padder-minus pr').text
        model_year = model.find('td', class_='listing-text pl8 pr8 tac pr').text
        price = model.find('td', 'pl8 pr8 tac pr').text.replace('TL','').replace(' ','').replace('.','')
        publish_date = model.find('td', class_='listing-text tac pr').text
        location = model.find('div', style='display:flex;justify-content:center;align-items:center;height:81px').text.split(' ', 1)[0]
        data= ""+"\"model_name\":\""+model_name+"\""+","+"\"title\":"+"\""+title+"\",\""+"model_year\""+":\""+model_year+"\""+",\"price\":\""+price+"\""+",\"publish_date\":\""+publish_date+"\","+"\"location\":\""+location+"\""
        #pubsub_message = base64.b64decode(event['data']).decode('utf-8')
        print(data)

【Answer 1】:

I don't see a call to publisher.publish() in your code snippet; that call is how you publish messages to Pub/Sub. Here is a complete example showing how to publish to Pub/Sub:

"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

def get_callback(publish_future, data):
    def callback(publish_future):
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback

for i in range(10):
    data = str(i)
    # When you publish a message, the client returns a future.
    publish_future = publisher.publish(topic_path, data.encode("utf-8"))
    # Non-blocking. Publish failures are handled in the callback function.
    publish_future.add_done_callback(get_callback(publish_future, data))
    publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

Reference: https://cloud.google.com/pubsub/docs/publisher
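
Applied to the function in the question, the fix could look roughly like the sketch below. It is only an illustration under a few assumptions: `build_payload` and `publish_records` are hypothetical helper names, not part of the Pub/Sub client library; each row is serialized with `json.dumps` instead of the manual string concatenation in the question (which has no enclosing braces, so it is not valid JSON); "your-project-id" is a placeholder; and the publisher is passed in as an argument so the helpers can be tried without credentials.

```python
import json


def build_payload(record):
    """Serialize one scraped row to UTF-8 JSON bytes (message data must be bytes)."""
    return json.dumps(record, ensure_ascii=False).encode("utf-8")


def publish_records(publisher, topic_path, records, timeout=60):
    """Publish every record and block until each publish has resolved.

    `publisher` is any object with a `publish(topic, data)` method that
    returns a future, e.g. `pubsub_v1.PublisherClient()`.
    Returns the list of message IDs.
    """
    publish_futures = [
        publisher.publish(topic_path, build_payload(record)) for record in records
    ]
    # Wait for the results before returning: in a Cloud Function, work that is
    # still pending when the function exits may never complete.
    return [future.result(timeout=timeout) for future in publish_futures]


def hello_pubsub(event, context):
    # Imported here so the helpers above stay usable without the client library.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    # Placeholder project ID; "webscraping" is the topic name from the question.
    topic_path = publisher.topic_path("your-project-id", "webscraping")
    # Placeholder row; in the real function these dicts would be built inside
    # the BeautifulSoup loop from model_name, title, price, and so on.
    records = [{"model_name": "example", "price": "100000"}]
    print(publish_records(publisher, topic_path, records))
```

Collecting the futures first and resolving them afterwards keeps the publishes batched by the client while still making sure the function does not return before the messages are sent.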
