使用 Python 将 Pub/Sub 消息加载到 BigQuery

Posted

技术标签:

【中文标题】使用 Python 将 Pub/Sub 消息加载到 BigQuery【英文标题】:Loading Pub/Sub Message to BigQuery with Python 【发布时间】:2021-10-07 17:51:00 【问题描述】:

我正在尝试使用 python 提取 Pub/Sub 消息并将消息加载到 BigQuery。我可以提取消息,但无法将其加载到 BigQuery。这是一个编写的代码示例。你知道如何使用 python 将此消息加载到 BigQuery。

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
import os
import time
import json
import pandas as pd
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="C:/Users/Endo/Desktop/pub-sub-test/eminent-parsec-317508-98e5b51ebde7.json"

# TODO(developer)
project_id = "eminent-parsec-317508"
subscription_id = "my-python-topic-sub"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/project_id/subscriptions/subscription_id`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(f"Received message.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on subscription_path..\n")
print(subscription_path)

dataset_id="message"
table_id="pubsub_message"

def write_messages_to_bq(dataset_id, table_id, subscription_path):
    client = bigquery.Client()
    dataset_ref = client.dataset(dataset_id)
    table_ref = dataset_ref.table(table_id)
    table = client.get_table(table_ref)

    errors = client.insert_rows(table, subscription_path)
    if not errors:
        print('Loaded  row(s) into :'.format(len(subscription_path), dataset_id, table_id))
    else:
        print('Errors:')
        for error in errors:
            print(error)

【问题讨论】:

当您说您无法将它们加载到 BigQuery 中时,您是什么意思?你有错误吗?如果是这样,您能否将该错误添加到您的帖子中? 我没有收到任何错误。但是我无法将消息加载到 BigQuery。如果我将订阅路径更改为消息。这次我得到“名称消息未定义”错误。 【参考方案1】:

您的代码不起作用。您无法提供对 BigQuery API 的 PubSub 订阅来加载数据。你需要做不同的事情。

您需要在 BigQuery 中逐条写入消息(因为您使用流式 API,所以这不是问题)。为此,在您的回调方法中,即在收到消息时处理消息的方法,您将消息写入 BigQuery,如果写入正常,则确认您的消息。

如果我重构您的代码(未经测试,只是为了向您展示要执行的更改)

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
import os
import time
import json
import pandas as pd
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="C:/Users/Endo/Desktop/pub-sub-test/eminent-parsec-317508-98e5b51ebde7.json"

# TODO(developer)
project_id = "eminent-parsec-317508"
subscription_id = "my-python-topic-sub"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/project_id/subscriptions/subscription_id`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on subscription_path..\n")
print(subscription_path)

dataset_id="message"
table_id="pubsub_message"

client = bigquery.Client()
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
table = client.get_table(table_ref)

def callback(message):
    print(f"Received message.")
    errors = client.insert_rows(table, message)
    if not errors:
        print('Loaded  row(s) into :'.format(len(subscription_path), dataset_id, table_id))
    message.ack()
    else:
        print('Errors:')
        for error in errors:
            print(error)

【讨论】:

以上是关于使用 Python 将 Pub/Sub 消息加载到 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Python 中创建从 Pub/Sub 到 GCS 的数据流管道

将 Pub/Sub 连接到 Dataflow Python 管道

如何使用依赖项通过 JAVA 客户端将消息发布到 Cloud Pub/Sub?

无法使用 Apache Beam(Python SDK)读取 Pub/Sub 消息

使用来自 Airflow 的 Python 在 Pub/Sub 中发布消息时遇到问题

Google Pub/Sub - 将消息发布到主题后,未从本地函数中找到事件数据