Symfony Messenger 使用 Apache Kafka 作为队列传输

Posted

技术标签:

【中文标题】Symfony Messenger 使用 Apache Kafka 作为队列传输【英文标题】:Symfony Messenger with Apache Kafka as queue transport 【发布时间】:2020-02-07 14:10:37 【问题描述】:

我不知道如何为Symfony messenger 配置Kafka。一切都适用于rabbitmq(我创建了 messenger 和 messenger 处理程序):

.环境:

MESSENGER_TRANSPORT_DSN=amqp://user:password@myhost:5672/%2f/messages

config/packages/messenger.yaml

framework:
  messenger:
    transports:
      async: "%env(MESSENGER_TRANSPORT_DSN)%"

.env

MESSENGER_TRANSPORT_DSN=enqueue://node-1.kafka.myhost.com:9092/%2f/messages

config/packages/messenger.yaml

framework:
  messenger:
    transports:
      async: "%env(MESSENGER_TRANSPORT_DSN)%"

请给我最好的例子。谢谢!

【问题讨论】:

我能够解决问题 ;) 我面临几个问题。你能分享你的配置吗?这对我很有帮助。 @Virushabadoss 特别为您服务! :) 非常感谢@Arkadiusz G. 【参考方案1】:

我的开发者:Docker + Centos 7 + php73,nginx

此配置的解决方案:

1.安装php-rdkafka(重要:版本3.1.x!,更改php的路径;))

yum -y install make librdkafka-devel && git clone --branch 3.1.x https://github.com/arnaud-lb/php-rdkafka.git && cd php-rdkafka && /path/to/php73/root/bin/phpize && ./configure --with-php-config=/path/to/php73/root/bin/php-config && make all -j 5 && make install

2。在 php.ini 中添加 php 扩展

[rdkafka]
extension=rdkafka.so

3.为 symfony 安装包:

composer req symfony/messenger enqueue/rdkafka enqueue/enqueue-bundle sroze/messenger-enqueue-transport enqueue/async-event-dispatcher

4.注册捆绑包 - 添加到 config/bundles.php

Enqueue\Bundle\EnqueueBundle::class => ['all' => true],
Enqueue\MessengerAdapter\Bundle\EnqueueAdapterBundle::class => ['all' => true],

5.添加文件 config/packages/enqueue.yaml:

enqueue:
  default:
    transport:
      dsn: "rdkafka://"
      global:
        group.id: 'myapp'
        metadata.broker.list: "%env(KAFKA_BROKER_LIST)%"
      topic:
        auto.offset.reset: beginning
      commit_async: true
    client: ~

6.添加文件 config/packages/messenger.yaml:

framework:
  messenger:
    failure_transport: failed

    transports:
      async:
        dsn:  "%env(MESSENGER_TRANSPORT_DSN)%"
      failed:
        dsn: "doctrine://default?queue_name=failed"

    routing:
      'App\Message\EmailNotification': async

7.添加到 .env:

###> messenger ###
MESSENGER_TRANSPORT_DSN=enqueue://default?topic[name]=messages&queue[name]=messages
KAFKA_BROKER_LIST=node-1.kafka.host:9092,node-2.kafka.host:9092,node-3.kafka.host:9092
###< messenger ###

8.文档中的 Message 和 MessageHandler: https://symfony.com/doc/current/messenger.html

9.运行消费者:

php bin/console messenger:consume async

祝你好运!

【讨论】:

用 symfony messenger 设置 kafka 太难了。我一直在遵循您的指导方针并阅读不同的文章。我想说你的指导方针是我最接近使用 kafka + symfony 的方法。我已经完成了所有事情,包括第 8 步。但我仍然无法成功生产和消费。按照您的步骤没有错误,也没有错误消费或生产,但我在 confluent.io kafka 服务器上看不到任何消息。你能和我们分享你的设置吗,我觉得我们一定缺少什么。 从 confluent 在 kafka 界面中创建主题后。我已经设法生成消息,但我仍然无法使用 php bin/console messenger:consume async 使用消息 - 任何帮助将不胜感激 MESSENGER_TRANSPORT_DSN=enqueue://default?topic[name]=messages&amp;queue[name]=messages 并确保创建主题消息 - 之后一切正常,我花了很多调试才发现 :)

以上是关于Symfony Messenger 使用 Apache Kafka 作为队列传输的主要内容,如果未能解决你的问题,请参考以下文章

Symfony Messenger 使用 Apache Kafka 作为队列传输

Symfony Messenger 不会总是重启

Symfony messenger 和 mailer:如何添加 binding_key?

在 Symfony Messenger 组件中使用独立的发件人

通过 Symfony Messenger 执行邮件发送

Symfony Messenger / RabbitMQ 中的消费者错误处理