Spring Integration Kafka Consumer Listener 不接收消息

Posted

技术标签:

【中文标题】Spring Integration Kafka Consumer Listener 不接收消息【英文标题】:Spring Integration Kafka Consumer Listener not Receiving messages 【发布时间】:2017-02-15 15:56:36 【问题描述】:

根据here 提供的文档,我正在尝试使用 POC 将消息发送到same documentation 中提到的侦听器,以下是我编写配置的方式。

@Configuration
public class KafkaConsumerConfig 

    public static final String TEST_TOPIC_ID = "record-stream";

    @Value("$kafka.topic:" + TEST_TOPIC_ID + "")
    private String topic;

    @Value("$kafka.address:localhost:9092")
    private String brokerAddress;


    /*
      @Bean public KafkaMessageDrivenChannelAdapter<String, String> adapter(
      KafkaMessageListenerContainer<String, String> container) 
      KafkaMessageDrivenChannelAdapter<String, String>
      kafkaMessageDrivenChannelAdapter = new
      KafkaMessageDrivenChannelAdapter<>( container, ListenerMode.record);
      kafkaMessageDrivenChannelAdapter.setOutputChannel(received()); return
      kafkaMessageDrivenChannelAdapter; 

      @Bean public QueueChannel received()  return new QueueChannel(); 
     */

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() 

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(30000);
        return factory;

    

    /*
     * @Bean public KafkaMessageListenerContainer<String, String> container()
     * throws Exception  ContainerProperties properties = new
     * ContainerProperties(this.topic); // set more properties return new
     * KafkaMessageListenerContainer<>(consumerFactory(), properties); 
     */

    @Bean
    public ConsumerFactory<String, String> consumerFactory() 
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
        // props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // earliest
                                                                        // smallest
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    


监听器如下,

@Service
public class Listener 

    private Logger log = Logger.getLogger(Listener.class);


    @KafkaListener(topicPattern = KafkaConsumerConfig.TEST_TOPIC_ID, containerFactory = "kafkaListenerContainerFactory")
    public void process(String message/* , Acknowledgment ack */) 
        Gson gson = new Gson();
        Record record = gson.fromJson(message, Record.class);

        log.info(record.getId() + " " + record.getName());
        // ack.acknowledge();
    


即使我正在为同一主题生成消息并且此使用者正在处理同一主题,但侦听器并未执行。

我正在运行 Kafka 0.10.0.1,这是我当前的 pom。与许多命令行示例不同,此使用者作为 Spring Boot Web 应用程序工作。

   <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>

        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-java-dsl</artifactId>

        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

我花了很多时间来弄清楚为什么当主题有消息时这个监听器没有被命中,我做错了什么。

我知道我可以使用通道接收消息(我已经在代码中注释掉了配置部分),但是这里的并发处理是干净的。

这种实现是否可以通过异步消息消费实现。

【问题讨论】:

【参考方案1】:

您必须在@Configuration 旁边添加@EnableKafka

很快会add一些描述。

同时:

@Configuration
@EnableKafka
public class KafkaConsumerConfig 

【讨论】:

我认为 EnableIntegration 可以解决这个问题,在我的 Producer 中,我没有 EnableKafka。让我试试这个,然后回到你身边 @EnableIntegration 用于 Spring Integration,但我们在这里讨论的是 Spring Kafka。它们是完全不同的独立项目。看@KafkaListener 在 Spring Integration Kafka 之外。记得有@EnableJms@EnableRabbit 和许多其他@Enable...。唯一的@EnableIntegration 也顾不上所有了。这不是它的责任。 非常感谢您的洞察力,您的工作就像一个魅力。会记住这一点。

以上是关于Spring Integration Kafka Consumer Listener 不接收消息的主要内容,如果未能解决你的问题,请参考以下文章

Spring Integration Kafka Consumer Listener 不接收消息

Spring 集成Kafka(完整版)

如何向普罗米修斯报告 Kafka Producer 的指标(使用 Spring Boot)

JEESZ-kafka消息服务平台实现

JEESZ-kafka消息服务平台实现

JEESZ-kafka消息服务平台实现