如何使用 python 快速将消息发送到 Azure 队列存储?

Posted

技术标签:

【中文标题】如何使用 python 快速将消息发送到 Azure 队列存储?【英文标题】:How to quickly send messages to azure queue storage using python? 【发布时间】:2021-02-22 07:27:57 【问题描述】:

我正在尝试使用 python azure.storage.queue 库向 azure 发送大量消息(数千万),但是这样做需要很长时间。我使用的代码如下:

from azure.storage.queue import (
    QueueClient,
    BinaryBase64EncodePolicy,
    BinaryBase64DecodePolicy
)

messages = [example list of messages]
connectionString = "example connection string"
queueName = "example-queue-name"

queueClient = QueueClient.from_connection_string(connectionString, queueName)
for message in messages:
    queueClient.send_message(message)

目前,发送大约 70,000 条消息需要大约 3 个小时,考虑到需要发送的潜在消息数量,这显然太慢了。

我查看了文档以尝试找到批处理选项,但似乎不存在:https://docs.microsoft.com/en-us/python/api/azure-storage-queue/azure.storage.queue.queueclient?view=azure-python

我还想知道是否有人有任何使用 asynchio 库来加快此过程的经验并可以建议如何使用它?

【问题讨论】:

怎么样?我的帖子有用吗? 【参考方案1】:

试试这个:

from azure.storage.queue import (
    QueueClient,
    BinaryBase64EncodePolicy,
    BinaryBase64DecodePolicy
)
from concurrent.futures import ProcessPoolExecutor
import time

messages = []

messagesP1 = messages[:len(messages)//2] 
messagesP2 = messages[len(messages)//2:] 

print(len(messagesP1))
print(len(messagesP2))

connectionString = "<conn str>"
queueName = "<queue name>"

queueClient = QueueClient.from_connection_string(connectionString, queueName)

def pushThread(messages):
   for message in messages:
       queueClient.send_message(message)



def callback_function(future):
    print('Callback with the following result', future.result())

tic = time.perf_counter()

def main():
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(pushThread, messagesP1)
        future.add_done_callback(callback_function)
        future2 = executor.submit(pushThread, messagesP2)
        while True:
            if(future.running()):
                print("Task 1 running")
            if(future2.running()):
                print("Task 2 running")

            if(future.done() and future2.done()):
                print(future.result(), future2.result())
                break

if __name__ == '__main__':
    main()


toc = time.perf_counter()
    
print(f"spent toc - tic:0.4f seconds")

如您所见,我将消息数组拆分为 2 个部分,并使用 2 个任务同时将数据推送到队列中。根据我的测试,我有大约 800 条消息,我花了 94 秒来推送所有消息:

但是使用上面的方式,我花了48s:

【讨论】:

嘿,Stanley,非常感谢,它看起来和我想要的完全一样。我一直很忙,但计划在今天或明天尝试实施,届时会向您提供反馈! @petgeo,你好,现在怎么样了?我的帖子有用吗? 嘿,斯坦利,我尝试实施此解决方案,但运行时失败,说未来没有结果。然后我尝试在没有回调功能的情况下运行它,它运行没有错误,但不幸的是实际上没有生成任何消息。我必须重置为旧版本,但如果有帮助,我可以重新运行并为您获取更多信息 @wwnde,肯定愿意尝试和帮助:) @wwnde,我的朋友 Hury 正在研究这个。他很擅长,不用担心。

以上是关于如何使用 python 快速将消息发送到 Azure 队列存储?的主要内容,如果未能解决你的问题,请参考以下文章

使用电报机器人 api、python 3 和 JobQueue 将消息发送到通道的方法

如何在 Linux 上快速将 LARGE 数据从 c++ 发送到 python?

你如何将消息从 Flask 服务器(Python)发送到 HTML 客户端?

如何从android发送消息到我的笔记本电脑python程序?

如何使用 Python 将 pdf 文件发送到多个电子邮件?

python: 如何使用 TO、CC 和 BCC 发送邮件?