Symfony Messenger 使用 Apache Kafka 作为队列传输
Posted
技术标签:
【中文标题】Symfony Messenger 使用 Apache Kafka 作为队列传输【英文标题】:Symfony Messenger with Apache Kafka as queue transport 【发布时间】:2020-02-07 14:10:37 【问题描述】:我不知道如何为Symfony messenger
配置Kafka
。一切都适用于rabbitmq
(我创建了 messenger 和 messenger 处理程序):
.环境:
MESSENGER_TRANSPORT_DSN=amqp://user:password@myhost:5672/%2f/messages
config/packages/messenger.yaml
framework:
messenger:
transports:
async: "%env(MESSENGER_TRANSPORT_DSN)%"
.env
MESSENGER_TRANSPORT_DSN=enqueue://node-1.kafka.myhost.com:9092/%2f/messages
config/packages/messenger.yaml
framework:
messenger:
transports:
async: "%env(MESSENGER_TRANSPORT_DSN)%"
请给我最好的例子。谢谢!
【问题讨论】:
我能够解决问题 ;) 我面临几个问题。你能分享你的配置吗?这对我很有帮助。 @Virushabadoss 特别为您服务! :) 非常感谢@Arkadiusz G. 【参考方案1】:我的开发者:Docker + Centos 7 + php73,nginx。
此配置的解决方案:
1.安装php-rdkafka(重要:版本3.1.x!,更改php的路径;))
yum -y install make librdkafka-devel && git clone --branch 3.1.x https://github.com/arnaud-lb/php-rdkafka.git && cd php-rdkafka && /path/to/php73/root/bin/phpize && ./configure --with-php-config=/path/to/php73/root/bin/php-config && make all -j 5 && make install
2。在 php.ini 中添加 php 扩展
[rdkafka]
extension=rdkafka.so
3.为 symfony 安装包:
composer req symfony/messenger enqueue/rdkafka enqueue/enqueue-bundle sroze/messenger-enqueue-transport enqueue/async-event-dispatcher
4.注册捆绑包 - 添加到 config/bundles.php
Enqueue\Bundle\EnqueueBundle::class => ['all' => true],
Enqueue\MessengerAdapter\Bundle\EnqueueAdapterBundle::class => ['all' => true],
5.添加文件 config/packages/enqueue.yaml:
enqueue:
default:
transport:
dsn: "rdkafka://"
global:
group.id: 'myapp'
metadata.broker.list: "%env(KAFKA_BROKER_LIST)%"
topic:
auto.offset.reset: beginning
commit_async: true
client: ~
6.添加文件 config/packages/messenger.yaml:
framework:
messenger:
failure_transport: failed
transports:
async:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
failed:
dsn: "doctrine://default?queue_name=failed"
routing:
'App\Message\EmailNotification': async
7.添加到 .env:
###> messenger ###
MESSENGER_TRANSPORT_DSN=enqueue://default?topic[name]=messages&queue[name]=messages
KAFKA_BROKER_LIST=node-1.kafka.host:9092,node-2.kafka.host:9092,node-3.kafka.host:9092
###< messenger ###
8.文档中的 Message 和 MessageHandler: https://symfony.com/doc/current/messenger.html
9.运行消费者:
php bin/console messenger:consume async
祝你好运!
【讨论】:
用 symfony messenger 设置 kafka 太难了。我一直在遵循您的指导方针并阅读不同的文章。我想说你的指导方针是我最接近使用 kafka + symfony 的方法。我已经完成了所有事情,包括第 8 步。但我仍然无法成功生产和消费。按照您的步骤没有错误,也没有错误消费或生产,但我在 confluent.io kafka 服务器上看不到任何消息。你能和我们分享你的设置吗,我觉得我们一定缺少什么。 从 confluent 在 kafka 界面中创建主题后。我已经设法生成消息,但我仍然无法使用php bin/console messenger:consume async
使用消息 - 任何帮助将不胜感激
MESSENGER_TRANSPORT_DSN=enqueue://default?topic[name]=messages&queue[name]=messages
并确保创建主题消息 - 之后一切正常,我花了很多调试才发现 :)以上是关于Symfony Messenger 使用 Apache Kafka 作为队列传输的主要内容,如果未能解决你的问题,请参考以下文章
Symfony Messenger 使用 Apache Kafka 作为队列传输
Symfony messenger 和 mailer:如何添加 binding_key?