通过 celery 向 django-channels 发送消息
Posted
技术标签:
【中文标题】通过 celery 向 django-channels 发送消息【英文标题】:Sending messages to django-channels via celery 【发布时间】:2017-06-09 05:03:57 【问题描述】:所以我有一个预定的 celery beat 任务(celery.py):
@app.on_after_configure.connect
def setup_periodic_tasks(sender,
**kwargs):
sender.add_periodic_task(10.0, test_event, name='test')
还有任务(events/tasks.py):
@shared_task
def test_event():
from .models import Event
Event.objects.create()
当事件被创建时,接收器被触发,它应该向通道组发送消息(events/receivers.py):
@receiver(post_save, sender=Event)
def event_post_add(sender, instance, created, *args, **kwargs):
if created:
print("receiver fired")
Group("test").send(
"text": json.dumps(
'type': 'test',
)
)
主要问题是接收器在 celery beat 过程中被触发,并且没有通过 django 通道发送任何内容。没有错误消息,什么都没有,只是没有发送。
如何整合这两者,以便能够从 celery 后台进程向通道发送消息?
【问题讨论】:
【参考方案1】:您好,我不知道您是否找到了解决方案。但是由于我自己被困在这个问题上,所以我尝试了解决方法。 我为需要通过 websocket 发送的消息创建了一个视图,并从 celery beat 向它发出请求 观点:
def send_message(request,uuid,name):
print('lamo')
ty = f"uuid_uuid"
data=
'message':f'name Driver is now Avaliable',
'sender':'HQ',
'id':str(uuid),
'to':str(uuid),
'type':'DriverAvailable',
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
ty,
'type':'chat_message',
'message':data,
)
和任务:
def my_task():
list=[]
for d in Driver_api.objects.all():
if d.available_on !=None:
if d.available_on <= timezone.now():
d.available_on = None
d.save()
uuid = str(d.user.uuid)
requests.get(f'DOMAINmessage/sendMessage/uuid/d.name')
logger.info('Adding 0'.format(d.user.uuid))
return list
对于我解决问题的方法中的任何疏忽或疏忽深表歉意。
【讨论】:
【参考方案2】:信号在 Django 中实际上并不是异步的。 所以在:
@shared_task
def test_event():
from .models import Event
Event.objects.create() # This will fire a signal and the signal will
# still be interpreted by celery
以下链接详细描述了此问题: https://githubmemory.com/repo/CJWorkbench/channels_rabbitmq/issues/37
我已经检查了 redis_channels 重新连接和次优性能的声明(如链接中所述),但我找不到它发生。
这是我正在运行的代码,Celery 正在向 django 频道发送消息。
class DjangoView(APIView):
def get(request):
send_message_to_channels.delay()
@shared_task
def send_message_to_channels():
send_test_message("Hello from celery", 200)
def send_test_message(message: str, code: int):
channel_layer = get_channel_layer()
channel_name = "celery-channels-test"
async_to_sync(channel_layer.group_send)(channel_name,
'type': 'consumer.test.message',
'message': message,
'code': code
)
使我的代码工作的先决条件
安装channels_redis(其他软件包只是失败) 在 django settings.py 中定义 channel_layer 指向 redis 数据库如果这不起作用,我想简单但非常讨厌的方法是向 django 中的视图发出请求,就像在@cosmo_boi 回答中一样
【讨论】:
以上是关于通过 celery 向 django-channels 发送消息的主要内容,如果未能解决你的问题,请参考以下文章
如何向 Celery (celerybeat) 动态添加/删除周期性任务
如何向 Django Celery Flower Monitoring 添加身份验证和端点?
Celery-Supervisor:如何重新启动主管工作以使新更新的 celery-tasks 工作?