将 Celery 与 SQS 一起使用时 Errno 111 连接被拒绝

Posted

技术标签:

【中文标题】将 Celery 与 SQS 一起使用时 Errno 111 连接被拒绝【英文标题】:Errno 111 Connection refused when using Celery with SQS 【发布时间】:2014-08-25 05:52:18 【问题描述】:

我在运行celery statuscelery purge 命令时遇到问题。

 File "/usr/bin/celery", line 11, in <module>
    sys.exit(main())
  File "/usr/lib/python2.7/site-packages/celery/__main__.py", line 30, in main
    main()
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 81, in main
    cmd.execute_from_commandline(argv)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 769, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/usr/lib/python2.7/site-packages/celery/bin/base.py", line 306, in execute_from_commandline
    return self.handle_argv(self.prog_name, argv[1:])
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 761, in handle_argv
    return self.execute(command, argv)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 693, in execute
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  File "/usr/lib/python2.7/site-packages/celery/bin/base.py", line 310, in run_from_argv
    sys.argv if argv is None else argv, command)
  File "/usr/lib/python2.7/site-packages/celery/bin/base.py", line 372, in handle_argv
    return self(*args, **options)
  File "/usr/lib/python2.7/site-packages/celery/bin/base.py", line 269, in __call__
    ret = self.run(*args, **kwargs)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 472, in run
    replies = I.run('ping', **kwargs)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 324, in run
    return self.do_call_method(args, **kwargs)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 346, in do_call_method
    callback=self.say_remote_command_reply)
  File "/usr/lib/python2.7/site-packages/celery/bin/celery.py", line 385, in call
    return getattr(i, method)(*args)
  File "/usr/lib/python2.7/site-packages/celery/app/control.py", line 99, in ping
    return self._request('ping')
  File "/usr/lib/python2.7/site-packages/celery/app/control.py", line 70, in _request
    timeout=self.timeout, reply=True,
  File "/usr/lib/python2.7/site-packages/celery/app/control.py", line 306, in broadcast
    limit, callback, channel=channel,
  File "/usr/lib/python2.7/site-packages/kombu/pidbox.py", line 283, in _broadcast
    chan = channel or self.connection.default_channel
  File "/usr/lib/python2.7/site-packages/kombu/connection.py", line 755, in default_channel
    self.connection
  File "/usr/lib/python2.7/site-packages/kombu/connection.py", line 740, in connection
    self._connection = self._establish_connection()
  File "/usr/lib/python2.7/site-packages/kombu/connection.py", line 695, in _establish_connection
    conn = self.transport.establish_connection()
  File "/usr/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 112, in establish_connection
    conn = self.Connection(**opts)
  File "/usr/lib/python2.7/site-packages/amqp/connection.py", line 165, in __init__
    self.transport = create_transport(host, connect_timeout, ssl)
  File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 294, in create_transport
    return TCPTransport(host, connect_timeout)
  File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 95, in __init__
    raise socket.error(last_err)
socket.error: [Errno 111] Connection refused

我正在使用 SQS BROKER_URL。 任务运行良好,但是当我想清除队列中的任务 (celery purge -f) 时,出现上述错误。

software -> celery:3.1.11 (Cipater) kombu:3.0.18 py:2.7.5
            billiard:3.3.0.17 py-amqp:1.4.5
platform -> system:Linux arch:64bit, ELF imp:CPython
loader   -> celery.loaders.default.Loader
settings -> transport:amqp results:disabled

我的服务器开放了 22、80、443、8000 端口,并且 SQS celery 队列中有大量消息,所以 celery 和 SQS 之间的连接应该没问题。

【问题讨论】:

【参考方案1】:

基于documentation of the commands status and purge,您需要向 celery 提供您所指的 celery 应用程序,以便它知道使用哪个代理。只需输入 $celery purge 或输入 $celery status celery 不知道您的目标是什么 celery 应用程序,因此会失败。

因此,转到您的 celery 应用程序

$cd /path/to/your/celery/app/directory

然后在您的应用程序上调用 celery purge。在这个例子中,我的目录有celeryapp.py,内容是:

from config import config
from celery import Celery
celery_app = Celery('tasks', 
                    backend=config.celery_backend_uri, 
                    broker=config.celery_broker_uri)
celery_app.conf.update(
    CELERY_IMPORTS=(
        'app.module_a.tasks',   # we're not including our tasks here as
        'app.module_b.tasks',   # our tasks are in other files listed here
    )
)

内容不如我们的电话重要,但提供它们是为了表明我们在celeryapp.py 中有我们的芹菜应用程序,所以我可以打电话

$celery -A celeryapp status
worker-name-a@node-name: OK
worker-name-b@node-name: OK

$celery -A celeryapp purge
WARNING: This will remove all tasks from queue: celery.
         There is no undo for this operation! 
(to skip this prompt use the -f option)
Are you sure you want to delete all tasks (yes/NO)? yes
No messages purged from 1 queue

我有一个类似的question here 并且Sol 似乎通过声明确认如果没有提供应用程序,celery 将输出此错误

如果你不给它应用程序的位置,它怎么知道使用什么代理传输?

【讨论】:

以上是关于将 Celery 与 SQS 一起使用时 Errno 111 连接被拒绝的主要内容,如果未能解决你的问题,请参考以下文章

你如何让亚马逊 SQS 与 Django celery 一起工作

Celery sqs 尝试连接到 amqp:无法连接到 amqp://guest:**@127.0.0.1:5672//

AWS SWF和SQS关系是否与Celery和RabbitMQ类似?

如何在 SQS 中解码 celery 消息

弹性 beantalk 中的 celery worker 出错(使用 django 和 SQS)[ImportError:curl 客户端需要 pycurl 库。]

Django 中带有 Celery 的 AWS SQS