使用 Redis、Celery 设置 Django 以通过 Gmail 发送电子邮件
Posted
技术标签:
【中文标题】使用 Redis、Celery 设置 Django 以通过 Gmail 发送电子邮件【英文标题】:Setting up Django with Redis, Celery to send emails via Gmail 【发布时间】:2017-05-16 21:55:44 【问题描述】:我想设置我的 Django 应用程序以使用异步任务队列通过 Gmail 设置电子邮件。我正在使用 Celery,Redis 作为我的经纪人。但是,当我将 Celery 定义为我的电子邮件后端时,我无法发送电子邮件——我收到一条错误消息,指出连接失败:
...:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in __call__(self)
35 try:
---> 36 return self.__value__
37 except AttributeError:
AttributeError: 'ChannelPromise' object has no attribute '__value__'
During handling of the above exception, another exception occurred:
ConnectionRefusedError Traceback (most recent call last)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs)
493 try:
--> 494 return fun(*args, **kwargs)
495 except conn_errors as exc:
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare)
186 immediate, exchange, declare):
--> 187 channel = self.channel
188 message = channel.prepare_message(
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in _get_channel(self)
208 if isinstance(channel, ChannelPromise):
--> 209 channel = self._channel = channel()
210 self.exchange.revive(channel)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in __call__(self)
37 except AttributeError:
---> 38 value = self.__value__ = self.__contract__()
39 return value
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in <lambda>()
223 self.__connection__ = connection
--> 224 channel = ChannelPromise(lambda: connection.default_channel)
225 if isinstance(channel, ChannelPromise):
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in default_channel(self)
818 # make sure we're still connected, and if not refresh.
--> 819 self.connection
820 if self._default_channel is None:
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connection(self)
801 self._default_channel = None
--> 802 self._connection = self._establish_connection()
803 self._closed = False
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _establish_connection(self)
756 self._debug('establishing connection...')
--> 757 conn = self.transport.establish_connection()
758 self._debug('connection established: %r', self)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/transport/pyamqp.py in establish_connection(self)
129 conn.client = self.client
--> 130 conn.connect()
131 return conn
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/connection.py in connect(self, callback)
293 )
--> 294 self.transport.connect()
295 self.on_inbound_frame = self.frame_handler_cls(
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in connect(self)
119 def connect(self):
--> 120 self._connect(self.host, self.port, self.connect_timeout)
121 self._init_socket(
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in _connect(self, host, port, timeout)
160 self.sock.settimeout(timeout)
--> 161 self.sock.connect(sa)
162 except socket.error:
ConnectionRefusedError: [Errno 61] Connection refused
During handling of the above exception, another exception occurred:
ConnectionRefusedError Traceback (most recent call last)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _reraise_as_library_errors(self, ConnectionError, ChannelError)
413 try:
--> 414 yield
415 except (ConnectionError, ChannelError):
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs)
514 interval_start, interval_step, interval_max,
--> 515 reraise_as_library_errors=False,
516 )
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in ensure_connection(self, errback, max_retries, interval_start, interval_step, interval_max, callback, reraise_as_library_errors)
404 interval_start, interval_step, interval_max,
--> 405 callback)
406 return self
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in retry_over_time(fun, catch, args, kwargs, errback, max_retries, interval_start, interval_step, interval_max, callback)
332 try:
--> 333 return fun(*args, **kwargs)
334 except catch as exc:
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connect(self)
260 self._closed = False
--> 261 return self.connection
262
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connection(self)
801 self._default_channel = None
--> 802 self._connection = self._establish_connection()
803 self._closed = False
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _establish_connection(self)
756 self._debug('establishing connection...')
--> 757 conn = self.transport.establish_connection()
758 self._debug('connection established: %r', self)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/transport/pyamqp.py in establish_connection(self)
129 conn.client = self.client
--> 130 conn.connect()
131 return conn
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/connection.py in connect(self, callback)
293 )
--> 294 self.transport.connect()
295 self.on_inbound_frame = self.frame_handler_cls(
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in connect(self)
119 def connect(self):
--> 120 self._connect(self.host, self.port, self.connect_timeout)
121 self._init_socket(
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in _connect(self, host, port, timeout)
160 self.sock.settimeout(timeout)
--> 161 self.sock.connect(sa)
162 except socket.error:
ConnectionRefusedError: [Errno 61] Connection refused
During handling of the above exception, another exception occurred:
OperationalError Traceback (most recent call last)
<ipython-input-1-75e7f965af5d> in <module>()
1 from django.core.mail import send_mail
----> 2 send_mail('test email', 'hello world', 'myemail@gmail.com', recipient_list=['test@test.com'], fail_silently=False)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/django/core/mail/__init__.py in send_mail(subject, message, from_email, recipient_list, fail_silently, auth_user, auth_password, connection, html_message)
59 mail.attach_alternative(html_message, 'text/html')
60
---> 61 return mail.send()
62
63
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/django/core/mail/message.py in send(self, fail_silently)
290 # send to.
291 return 0
--> 292 return self.get_connection(fail_silently).send_messages([self])
293
294 def attach(self, filename=None, content=None, mimetype=None):
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/djcelery_email/backends.py in send_messages(self, email_messages)
15 messages = [email_to_dict(msg) for msg in email_messages]
16 for chunk in chunked(messages, settings.CELERY_EMAIL_CHUNK_SIZE):
---> 17 result_tasks.append(send_emails.delay(chunk, self.init_kwargs))
18 return result_tasks
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/task.py in delay(self, *args, **kwargs)
410 celery.result.AsyncResult: Future promise.
411 """
--> 412 return self.apply_async(args, kwargs)
413
414 def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/task.py in apply_async(self, args, kwargs, task_id, producer, link, link_error, shadow, **options)
533 link=link, link_error=link_error, result_cls=self.AsyncResult,
534 shadow=shadow, task_type=self,
--> 535 **options
536 )
537
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/base.py in send_task(self, name, args, kwargs, countdown, eta, task_id, producer, connection, router, result_cls, expires, publisher, link, link_error, add_to_parent, group_id, retries, chord, reply_to, time_limit, soft_time_limit, root_id, parent_id, route_name, shadow, chain, task_type, **options)
735 with P.connection._reraise_as_library_errors():
736 self.backend.on_task_call(P, task_id)
--> 737 amqp.send_task_message(P, name, message, **options)
738 result = (result_cls or self.AsyncResult)(task_id)
739 if add_to_parent:
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/amqp.py in send_task_message(producer, name, message, exchange, routing_key, queue, event_dispatcher, retry, retry_policy, serializer, delivery_mode, compression, declare, headers, exchange_type, **kwargs)
556 delivery_mode=delivery_mode, declare=declare,
557 headers=headers2,
--> 558 **properties
559 )
560 if after_receivers:
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in publish(self, body, routing_key, delivery_mode, mandatory, immediate, priority, content_type, content_encoding, serializer, headers, compression, exchange, retry, retry_policy, declare, expiration, **properties)
179 body, priority, content_type, content_encoding,
180 headers, properties, routing_key, mandatory, immediate,
--> 181 exchange_name, declare,
182 )
183
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs)
525 self._debug('ensure channel error: %r',
526 exc, exc_info=1)
--> 527 errback and errback(exc, 0)
528 _ensured.__name__ = bytes_if_py2('0(ensured)'.format(fun.__name__))
529 _ensured.__doc__ = fun.__doc__
/usr/local/Cellar/python3/3.4.3_2/Frameworks/Python.framework/Versions/3.4/lib/python3.4/contextlib.py in __exit__(self, type, value, traceback)
75 value = type()
76 try:
---> 77 self.gen.throw(type, value, traceback)
78 raise RuntimeError("generator didn't stop after throw()")
79 except StopIteration as exc:
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _reraise_as_library_errors(self, ConnectionError, ChannelError)
417 except self.recoverable_connection_errors as exc:
418 reraise(ConnectionError, ConnectionError(text_t(exc)),
--> 419 sys.exc_info()[2])
420 except self.recoverable_channel_errors as exc:
421 reraise(ChannelError, ChannelError(text_t(exc)),
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/vine/five.py in reraise(tp, value, tb)
173 """Reraise exception."""
174 if value.__traceback__ is not tb:
--> 175 raise value.with_traceback(tb)
176 raise value
177
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _reraise_as_library_errors(self, ConnectionError, ChannelError)
412 ChannelError=exceptions.OperationalError):
413 try:
--> 414 yield
415 except (ConnectionError, ChannelError):
416 raise
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs)
513 remaining_retries,
514 interval_start, interval_step, interval_max,
--> 515 reraise_as_library_errors=False,
516 )
517 channel = self.default_channel
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in ensure_connection(self, errback, max_retries, interval_start, interval_step, interval_max, callback, reraise_as_library_errors)
403 (), , on_error, max_retries,
404 interval_start, interval_step, interval_max,
--> 405 callback)
406 return self
407
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in retry_over_time(fun, catch, args, kwargs, errback, max_retries, interval_start, interval_step, interval_max, callback)
331 for retries in count():
332 try:
--> 333 return fun(*args, **kwargs)
334 except catch as exc:
335 if max_retries and retries >= max_retries:
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connect(self)
259 """Establish connection to server immediately."""
260 self._closed = False
--> 261 return self.connection
262
263 def channel(self):
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connection(self)
800 self.declared_entities.clear()
801 self._default_channel = None
--> 802 self._connection = self._establish_connection()
803 self._closed = False
804 return self._connection
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _establish_connection(self)
755 def _establish_connection(self):
756 self._debug('establishing connection...')
--> 757 conn = self.transport.establish_connection()
758 self._debug('connection established: %r', self)
759 return conn
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/transport/pyamqp.py in establish_connection(self)
128 conn = self.Connection(**opts)
129 conn.client = self.client
--> 130 conn.connect()
131 return conn
132
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/connection.py in connect(self, callback)
292 socket_settings=self.socket_settings,
293 )
--> 294 self.transport.connect()
295 self.on_inbound_frame = self.frame_handler_cls(
296 self, self.on_inbound_method)
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in connect(self)
118
119 def connect(self):
--> 120 self._connect(self.host, self.port, self.connect_timeout)
121 self._init_socket(
122 self.socket_settings, self.read_timeout, self.write_timeout,
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in _connect(self, host, port, timeout)
159 pass
160 self.sock.settimeout(timeout)
--> 161 self.sock.connect(sa)
162 except socket.error:
163 self.sock.close()
OperationalError: [Errno 61] Connection refused
尝试调试会产生以下结果:
我能够成功启动 redis 服务器并 ping 它。但是,当我尝试使用 celery worker -A myapp -l info -c 5
启动 celery worker 时,出现错误。我猜这是由于身份验证?看起来它只是试图以匿名用户身份登录,但我不知道如何传递用户名和密码。
celery worker -A myapp -l info -c 5
-------------- celery@Users-iMac v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-02 19:58:38
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: myapp:0x10dc15400
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 5 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. djcelery_email_send
. myapp.celery.debug_task
[2017-01-02 19:58:39,187: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 2.00 seconds...
[2017-01-02 19:58:41,199: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 4.00 seconds...
[2017-01-02 19:58:45,219: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 6.00 seconds...
[2017-01-02 19:58:51,245: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused.
Trying again in 8.00 seconds...
我正在使用djcelery_email
,在myapp/myapp/__init__.py
有一个文件如下:
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
以及与settings.py
处于同一级别的celery.py
文件
import os
from celery import Celery
from celery.bin import Option
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: 0!r'.format(self.request))
并具有以下 `settings.py:
# Email
CACHES =
"default":
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379/1",
"OPTIONS":
"CLIENT_CLASS": "django_redis.client.DefaultClient",
EMAIL_HOST = 'smtp.gmail.com' # since you are using a gmail account
EMAIL_PORT = 587 # Gmail SMTP port for TLS
EMAIL_USE_TLS = True
EMAIL_HOST_USER = 'myemail@gmail.com'
EMAIL_HOST_PASSWORD = 'secret_password'
EMAIL_BACKEND = 'djcelery_email.backends.CeleryEmailBackend' # Use Celery for sending emails
CELERY_EMAIL_TASK_CONFIG =
'name': 'djcelery_email_send',
'ignore_result': True,
BROKER_URL = 'redis://127.0.0.1:6379/1'
我需要进行哪些更改才能使用 Redis、Celery 和 Django 发送电子邮件?
【问题讨论】:
【参考方案1】:此错误与电子邮件无关。 Celery 正在尝试通过 amqp 协议连接到消息代理,这意味着 RabbitMQ;但是您说您已将 Redis 设置为代理。所以 Celery 没有采用这种设置。
原因是你需要在你的 settings.py 中为 Celery 特定的设置加上 CELERY_
前缀;所以 BROKER_URL 应该是CELERY_BROKER_URL
。
【讨论】:
正确。这是由于:app.config_from_object('django.conf:settings', namespace='CELERY')
以上是关于使用 Redis、Celery 设置 Django 以通过 Gmail 发送电子邮件的主要内容,如果未能解决你的问题,请参考以下文章
django redis celery 和 celery beats 的正确设置