使用 Redis、Celery 设置 Django 以通过 Gmail 发送电子邮件

Posted

技术标签:

【中文标题】使用 Redis、Celery 设置 Django 以通过 Gmail 发送电子邮件【英文标题】:Setting up Django with Redis, Celery to send emails via Gmail 【发布时间】:2017-05-16 21:55:44 【问题描述】:

我想设置我的 Django 应用程序以使用异步任务队列通过 Gmail 设置电子邮件。我正在使用 Celery,Redis 作为我的经纪人。但是,当我将 Celery 定义为我的电子邮件后端时,我无法发送电子邮件——我收到一条错误消息,指出连接失败:

   ...: 
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in __call__(self)
     35         try:
---> 36             return self.__value__
     37         except AttributeError:

AttributeError: 'ChannelPromise' object has no attribute '__value__'

During handling of the above exception, another exception occurred:

ConnectionRefusedError                    Traceback (most recent call last)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs)
    493                     try:
--> 494                         return fun(*args, **kwargs)
    495                     except conn_errors as exc:

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare)
    186                  immediate, exchange, declare):
--> 187         channel = self.channel
    188         message = channel.prepare_message(

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in _get_channel(self)
    208         if isinstance(channel, ChannelPromise):
--> 209             channel = self._channel = channel()
    210             self.exchange.revive(channel)

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in __call__(self)
     37         except AttributeError:
---> 38             value = self.__value__ = self.__contract__()
     39             return value

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in <lambda>()
    223             self.__connection__ = connection
--> 224             channel = ChannelPromise(lambda: connection.default_channel)
    225         if isinstance(channel, ChannelPromise):

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in default_channel(self)
    818         # make sure we're still connected, and if not refresh.
--> 819         self.connection
    820         if self._default_channel is None:

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connection(self)
    801                 self._default_channel = None
--> 802                 self._connection = self._establish_connection()
    803                 self._closed = False

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _establish_connection(self)
    756         self._debug('establishing connection...')
--> 757         conn = self.transport.establish_connection()
    758         self._debug('connection established: %r', self)

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/transport/pyamqp.py in establish_connection(self)
    129         conn.client = self.client
--> 130         conn.connect()
    131         return conn

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/connection.py in connect(self, callback)
    293         )
--> 294         self.transport.connect()
    295         self.on_inbound_frame = self.frame_handler_cls(

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in connect(self)
    119     def connect(self):
--> 120         self._connect(self.host, self.port, self.connect_timeout)
    121         self._init_socket(

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in _connect(self, host, port, timeout)
    160                 self.sock.settimeout(timeout)
--> 161                 self.sock.connect(sa)
    162             except socket.error:

ConnectionRefusedError: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

ConnectionRefusedError                    Traceback (most recent call last)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _reraise_as_library_errors(self, ConnectionError, ChannelError)
    413         try:
--> 414             yield
    415         except (ConnectionError, ChannelError):

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs)
    514                             interval_start, interval_step, interval_max,
--> 515                             reraise_as_library_errors=False,
    516                         )

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in ensure_connection(self, errback, max_retries, interval_start, interval_step, interval_max, callback, reraise_as_library_errors)
    404                             interval_start, interval_step, interval_max,
--> 405                             callback)
    406         return self

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in retry_over_time(fun, catch, args, kwargs, errback, max_retries, interval_start, interval_step, interval_max, callback)
    332         try:
--> 333             return fun(*args, **kwargs)
    334         except catch as exc:

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connect(self)
    260         self._closed = False
--> 261         return self.connection
    262 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connection(self)
    801                 self._default_channel = None
--> 802                 self._connection = self._establish_connection()
    803                 self._closed = False

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _establish_connection(self)
    756         self._debug('establishing connection...')
--> 757         conn = self.transport.establish_connection()
    758         self._debug('connection established: %r', self)

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/transport/pyamqp.py in establish_connection(self)
    129         conn.client = self.client
--> 130         conn.connect()
    131         return conn

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/connection.py in connect(self, callback)
    293         )
--> 294         self.transport.connect()
    295         self.on_inbound_frame = self.frame_handler_cls(

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in connect(self)
    119     def connect(self):
--> 120         self._connect(self.host, self.port, self.connect_timeout)
    121         self._init_socket(

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in _connect(self, host, port, timeout)
    160                 self.sock.settimeout(timeout)
--> 161                 self.sock.connect(sa)
    162             except socket.error:

ConnectionRefusedError: [Errno 61] Connection refused

During handling of the above exception, another exception occurred:

OperationalError                          Traceback (most recent call last)
<ipython-input-1-75e7f965af5d> in <module>()
      1 from django.core.mail import send_mail
----> 2 send_mail('test email', 'hello world', 'myemail@gmail.com', recipient_list=['test@test.com'], fail_silently=False)

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/django/core/mail/__init__.py in send_mail(subject, message, from_email, recipient_list, fail_silently, auth_user, auth_password, connection, html_message)
     59         mail.attach_alternative(html_message, 'text/html')
     60 
---> 61     return mail.send()
     62 
     63 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/django/core/mail/message.py in send(self, fail_silently)
    290             # send to.
    291             return 0
--> 292         return self.get_connection(fail_silently).send_messages([self])
    293 
    294     def attach(self, filename=None, content=None, mimetype=None):

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/djcelery_email/backends.py in send_messages(self, email_messages)
     15         messages = [email_to_dict(msg) for msg in email_messages]
     16         for chunk in chunked(messages, settings.CELERY_EMAIL_CHUNK_SIZE):
---> 17             result_tasks.append(send_emails.delay(chunk, self.init_kwargs))
     18         return result_tasks

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/task.py in delay(self, *args, **kwargs)
    410             celery.result.AsyncResult: Future promise.
    411         """
--> 412         return self.apply_async(args, kwargs)
    413 
    414     def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/task.py in apply_async(self, args, kwargs, task_id, producer, link, link_error, shadow, **options)
    533             link=link, link_error=link_error, result_cls=self.AsyncResult,
    534             shadow=shadow, task_type=self,
--> 535             **options
    536         )
    537 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/base.py in send_task(self, name, args, kwargs, countdown, eta, task_id, producer, connection, router, result_cls, expires, publisher, link, link_error, add_to_parent, group_id, retries, chord, reply_to, time_limit, soft_time_limit, root_id, parent_id, route_name, shadow, chain, task_type, **options)
    735             with P.connection._reraise_as_library_errors():
    736                 self.backend.on_task_call(P, task_id)
--> 737                 amqp.send_task_message(P, name, message, **options)
    738         result = (result_cls or self.AsyncResult)(task_id)
    739         if add_to_parent:

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/amqp.py in send_task_message(producer, name, message, exchange, routing_key, queue, event_dispatcher, retry, retry_policy, serializer, delivery_mode, compression, declare, headers, exchange_type, **kwargs)
    556                 delivery_mode=delivery_mode, declare=declare,
    557                 headers=headers2,
--> 558                 **properties
    559             )
    560             if after_receivers:

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in publish(self, body, routing_key, delivery_mode, mandatory, immediate, priority, content_type, content_encoding, serializer, headers, compression, exchange, retry, retry_policy, declare, expiration, **properties)
    179             body, priority, content_type, content_encoding,
    180             headers, properties, routing_key, mandatory, immediate,
--> 181             exchange_name, declare,
    182         )
    183 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs)
    525                         self._debug('ensure channel error: %r',
    526                                     exc, exc_info=1)
--> 527                         errback and errback(exc, 0)
    528         _ensured.__name__ = bytes_if_py2('0(ensured)'.format(fun.__name__))
    529         _ensured.__doc__ = fun.__doc__

/usr/local/Cellar/python3/3.4.3_2/Frameworks/Python.framework/Versions/3.4/lib/python3.4/contextlib.py in __exit__(self, type, value, traceback)
     75                 value = type()
     76             try:
---> 77                 self.gen.throw(type, value, traceback)
     78                 raise RuntimeError("generator didn't stop after throw()")
     79             except StopIteration as exc:

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _reraise_as_library_errors(self, ConnectionError, ChannelError)
    417         except self.recoverable_connection_errors as exc:
    418             reraise(ConnectionError, ConnectionError(text_t(exc)),
--> 419                     sys.exc_info()[2])
    420         except self.recoverable_channel_errors as exc:
    421             reraise(ChannelError, ChannelError(text_t(exc)),

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/vine/five.py in reraise(tp, value, tb)
    173         """Reraise exception."""
    174         if value.__traceback__ is not tb:
--> 175             raise value.with_traceback(tb)
    176         raise value
    177 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _reraise_as_library_errors(self, ConnectionError, ChannelError)
    412             ChannelError=exceptions.OperationalError):
    413         try:
--> 414             yield
    415         except (ConnectionError, ChannelError):
    416             raise

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs)
    513                             remaining_retries,
    514                             interval_start, interval_step, interval_max,
--> 515                             reraise_as_library_errors=False,
    516                         )
    517                         channel = self.default_channel

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in ensure_connection(self, errback, max_retries, interval_start, interval_step, interval_max, callback, reraise_as_library_errors)
    403                             (), , on_error, max_retries,
    404                             interval_start, interval_step, interval_max,
--> 405                             callback)
    406         return self
    407 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in retry_over_time(fun, catch, args, kwargs, errback, max_retries, interval_start, interval_step, interval_max, callback)
    331     for retries in count():
    332         try:
--> 333             return fun(*args, **kwargs)
    334         except catch as exc:
    335             if max_retries and retries >= max_retries:

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connect(self)
    259         """Establish connection to server immediately."""
    260         self._closed = False
--> 261         return self.connection
    262 
    263     def channel(self):

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connection(self)
    800                 self.declared_entities.clear()
    801                 self._default_channel = None
--> 802                 self._connection = self._establish_connection()
    803                 self._closed = False
    804             return self._connection

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _establish_connection(self)
    755     def _establish_connection(self):
    756         self._debug('establishing connection...')
--> 757         conn = self.transport.establish_connection()
    758         self._debug('connection established: %r', self)
    759         return conn

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/transport/pyamqp.py in establish_connection(self)
    128         conn = self.Connection(**opts)
    129         conn.client = self.client
--> 130         conn.connect()
    131         return conn
    132 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/connection.py in connect(self, callback)
    292             socket_settings=self.socket_settings,
    293         )
--> 294         self.transport.connect()
    295         self.on_inbound_frame = self.frame_handler_cls(
    296             self, self.on_inbound_method)

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in connect(self)
    118 
    119     def connect(self):
--> 120         self._connect(self.host, self.port, self.connect_timeout)
    121         self._init_socket(
    122             self.socket_settings, self.read_timeout, self.write_timeout,

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in _connect(self, host, port, timeout)
    159                     pass
    160                 self.sock.settimeout(timeout)
--> 161                 self.sock.connect(sa)
    162             except socket.error:
    163                 self.sock.close()

OperationalError: [Errno 61] Connection refused

尝试调试会产生以下结果:

我能够成功启动 redis 服务器并 ping 它。但是,当我尝试使用 celery worker -A myapp -l info -c 5 启动 celery worker 时,出现错误。我猜这是由于身份验证?看起来它只是试图以匿名用户身份登录,但我不知道如何传递用户名和密码。

celery worker -A myapp -l info -c 5

 -------------- celery@Users-iMac v4.0.2 (latentcall)
---- **** ----- 
--- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-02 19:58:38
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x10dc15400
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 5 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . djcelery_email_send
  . myapp.celery.debug_task

[2017-01-02 19:58:39,187: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 2.00 seconds...

[2017-01-02 19:58:41,199: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 4.00 seconds...

[2017-01-02 19:58:45,219: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 6.00 seconds...

[2017-01-02 19:58:51,245: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 8.00 seconds...

我正在使用djcelery_email,在myapp/myapp/__init__.py 有一个文件如下:

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

以及与settings.py处于同一级别的celery.py文件

import os
from celery import Celery
from celery.bin import Option

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')

app = Celery('myapp')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')


# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: 0!r'.format(self.request))

并具有以下 `settings.py:

# Email

CACHES = 
    "default": 
        "BACKEND": "django_redis.cache.RedisCache",
        "LOCATION": "redis://127.0.0.1:6379/1",
        "OPTIONS": 
            "CLIENT_CLASS": "django_redis.client.DefaultClient",
        
    


EMAIL_HOST = 'smtp.gmail.com'  # since you are using a gmail account
EMAIL_PORT = 587  # Gmail SMTP port for TLS
EMAIL_USE_TLS = True
EMAIL_HOST_USER = 'myemail@gmail.com'
EMAIL_HOST_PASSWORD = 'secret_password'
EMAIL_BACKEND = 'djcelery_email.backends.CeleryEmailBackend' # Use Celery for sending emails
CELERY_EMAIL_TASK_CONFIG = 
    'name': 'djcelery_email_send',
    'ignore_result': True,


BROKER_URL = 'redis://127.0.0.1:6379/1'

我需要进行哪些更改才能使用 Redis、Celery 和 Django 发送电子邮件?

【问题讨论】:

【参考方案1】:

此错误与电子邮件无关。 Celery 正在尝试通过 amqp 协议连接到消息代理,这意味着 RabbitMQ;但是您说您已将 Redis 设置为代理。所以 Celery 没有采用这种设置。

原因是你需要在你的 settings.py 中为 Celery 特定的设置加上 CELERY_ 前缀;所以 BROKER_URL 应该是CELERY_BROKER_URL

【讨论】:

正确。这是由于:app.config_from_object('django.conf:settings', namespace='CELERY')

以上是关于使用 Redis、Celery 设置 Django 以通过 Gmail 发送电子邮件的主要内容,如果未能解决你的问题,请参考以下文章

django redis celery 和 celery beats 的正确设置

如何在 Django-Celery 失败的情况下设置重试任务

Django设置异步任务

Django+Celery+Redis 使用

Django使用Celery加redis执行异步任务

Celery 队列和 Redis 队列