如何使用 celery worker 从 SQS 轮询消息,消息为 JSON 格式,并且 worker 无法解码该格式

Posted

技术标签:

【中文标题】如何使用 celery worker 从 SQS 轮询消息,消息为 JSON 格式,并且 worker 无法解码该格式【英文标题】:how to poll message from SQS using celery worker, the message is in JSON format and worker not able to decode the format 【发布时间】:2019-12-14 16:05:51 【问题描述】:

如何使用 celery worker 从 SQS 轮询消息,消息是 JSON 格式,worker 无法解码格式

注意:这些消息不是使用celery beat发送到SQS的,这个队列是从SNS订阅的

我的 celery worker 命令是: celery worker -A status_handling -l info -Q es_status_test

Msg in Queue:


  "Type" : "Notification",
  "MessageId" : "f7e40fd9-8f92-59c5-afd9-5a1847aaae57",
  "TopicArn" : "***",
  "Message" : "\"SESResponseStatusCode\": 200, \"Status\": \"Delivered\", \"Message\": \"Email sent successfully.\", \"MessageId\": \"a59e85a2-8b7a-4b49-9354-0a7a4170b0c0\", \"Uuid\": null",
  "Timestamp" : "2019-08-05T06:00:24.943Z",
  "SignatureVersion" : "1",
  "Signature" : "pass",
  "SigningCertURL" : "pass",
  "UnsubscribeURL" : "pass"

错误来了:

[2019-08-04 23:00:25,116: CRITICAL/MainProcess] Unrecoverable error: JSONDecodeError('Expecting value: line 1 column 1 (char 0)')
Traceback (most recent call last):
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/home/vagrant/env/lib/python3.7/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 111, in on_readable
    return self._on_event(fd, _pycurl.CSELECT_IN)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 124, in _on_event
    self._process_pending_requests()
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 130, in _process_pending_requests
    self._process(curl)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/asynchronous/http/curl.py", line 178, in _process
    buffer=buffer, effective_url=effective_url, error=error,
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 177, in __call__
    svpending(*ca, **ck)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 100, in _transback
    return callback(ret)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 98, in _transback
    callback.throw()
  File "/home/vagrant/env/lib/python3.7/site-packages/vine/funtools.py", line 96, in _transback
    ret = filter_(*args + (ret,), **kwargs)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/transport/SQS.py", line 370, in _on_messages_ready
    msg_parsed = self._message_to_python(msg, qname, queue)
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/transport/SQS.py", line 236, in _message_to_python
    payload = loads(bytes_to_str(body))
  File "/home/vagrant/env/lib/python3.7/site-packages/kombu/utils/json.py", line 94, in loads
    return stdjson.loads(s)
  File "/usr/local/lib/python3.7/json/__init__.py", line 348, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.7/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

【问题讨论】:

【参考方案1】:

Celery 中的 SQS 支持仅用作 Celery 特定消息的传输机制。您不能使用 Celery 来消费任意 SQS 消息。

相反,我建议编写一个自定义 Django 管理命令,您可以在其中使用 boto3 库轮询 SQS 队列。

【讨论】:

以上是关于如何使用 celery worker 从 SQS 轮询消息,消息为 JSON 格式,并且 worker 无法解码该格式的主要内容,如果未能解决你的问题,请参考以下文章

弹性 beantalk 中的 celery worker 出错(使用 django 和 SQS)[ImportError:curl 客户端需要 pycurl 库。]

在 Elastic Beanstalk 上启动 SQS celery worker

如何在 SQS 中解码 celery 消息

你如何让亚马逊 SQS 与 Django celery 一起工作

AWS SWF和SQS关系是否与Celery和RabbitMQ类似?

如何在beantalk worker中批量读取sqs消息