如何在 SQS 中解码 celery 消息

Posted

技术标签:

【中文标题】如何在 SQS 中解码 celery 消息【英文标题】:How to decode celery message in SQS 【发布时间】:2019-01-02 02:00:37 【问题描述】:

sqs 中的一些 celery 任务永远挂起,我想在删除之前阅读这些消息(任务)。 在进入 sqs 控制台时,我可以看到我尝试解码的编码消息

value = base64.b64decode(value.encode('utf-8')).decode('utf-8')

这给了我带有键的字典转储

['body', 'headers', 'content-type', 'properties', 'content-encoding']

在这个 dict 正文中看起来像已编码 我试图用相同的方式对其进行解码

value = base64.b64decode(value.encode('utf-8')).decode('utf-8')

但它给出了错误的说法 UnicodeDecodeError:“utf8”编解码器无法解码位置 1 中的字节 0x87:无效的起始字节

我错过了什么吗? 如何解码此消息?有什么办法可以解码吗?

【问题讨论】:

【参考方案1】:

似乎“Celery”使用“pickle.dump”将任务的有效负载转换为字节,然后编码为base64。做相反的操作,我们再次得到有效载荷。

import base64
import boto3
import pickle

queue_name = 'your-queue-name'
sqsr = boto3.resource('sqs')
queue = sqsr.get_queue_by_name(QueueName=queue_name)

for message in queue.receive_messages(MaxNumberOfMessages=10):
    print(f'message.message_id >>> message.receipt_handle'
          f' >>> message.body >>> message.message_attributes')
    body_dict = json.loads(base64.b64decode(message.body))
    celery_payload = pickle.loads(base64.b64decode(body_dict.get('body')))
    print(celery_payload)

【讨论】:

以上是关于如何在 SQS 中解码 celery 消息的主要内容,如果未能解决你的问题,请参考以下文章

如何在不重复的情况下重试芹菜任务 - SQS

Amazon-SQS + Django-Celery 创建了数千个队列(每条消息一个队列)

你如何让亚马逊 SQS 与 Django celery 一起工作

Django 中带有 Celery 的 AWS SQS

如何在beantalk worker中批量读取sqs消息

如何在 Django 和 Celery 中配置多个代理?