Kafka Consumer Startup Test Class
Posted by 十光年
https://github.com/MarcoGhise/SpringKafka.git
package it.demo.kafka.springkafka.listener;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.kafka.support.KafkaConsumerContext;

import com.yammer.metrics.Metrics;

// Starts the Kafka inbound channel adapter on init and stops it on cleanup.
public class KafkaConsumerStarter implements ApplicationContextAware
{
    private ApplicationContext appContext;

    private SourcePollingChannelAdapter kafkaInboundChannelAdapter;

    private KafkaConsumerContext kafkaConsumerContext;

    // Looks up the adapter bean by name and starts polling.
    public void initIt() throws Exception
    {
        kafkaInboundChannelAdapter = appContext.getBean("kafka-inbound-channel-adapter", SourcePollingChannelAdapter.class);
        kafkaInboundChannelAdapter.start();

        // The consumer context is fetched here but not used elsewhere in this class.
        kafkaConsumerContext = appContext.getBean("consumerContext", KafkaConsumerContext.class);
    }

    // Stops the adapter (if it was started) and shuts down the metrics registry.
    public void cleanUp() throws Exception
    {
        if (kafkaInboundChannelAdapter != null)
        {
            kafkaInboundChannelAdapter.stop();
        }

        // Brief pause before the metrics shutdown, presumably so the consumer can wind down.
        Thread.sleep(1000);

        Metrics.defaultRegistry().shutdown();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
    {
        this.appContext = applicationContext;
    }

}
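The listing above defines initIt() and cleanUp() but does not show what calls them. In a Spring application they would typically be registered as the bean's init and destroy callbacks (for example through the init-method and destroy-method attributes), though that wiring is an assumption and is not part of this post. The sketch below illustrates only the call order, using hypothetical stand-ins (FakeAdapter, StarterSketch, LifecycleSketch) in place of the Spring Integration types so that it runs without any dependencies.

```java
// Hypothetical stand-in for SourcePollingChannelAdapter; not a Spring class.
class FakeAdapter {
    private boolean running;

    void start() { running = true; }

    void stop() { running = false; }

    boolean isRunning() { return running; }
}

// Mirrors the shape of KafkaConsumerStarter without any Spring dependency.
class StarterSketch {
    // Plays the role of the bean that the real class looks up from the context.
    private final FakeAdapter adapterBean;

    // Assigned in initIt(), like kafkaInboundChannelAdapter in the listing.
    private FakeAdapter adapter;

    StarterSketch(FakeAdapter adapterBean) {
        this.adapterBean = adapterBean;
    }

    // Init callback: obtain the adapter and start it.
    void initIt() {
        adapter = adapterBean;
        adapter.start();
    }

    // Destroy callback: stop the adapter only if initIt() actually ran.
    void cleanUp() {
        if (adapter != null) {
            adapter.stop();
        }
    }
}

class LifecycleSketch {
    public static void main(String[] args) {
        FakeAdapter adapter = new FakeAdapter();
        StarterSketch starter = new StarterSketch(adapter);

        starter.initIt();
        System.out.println("running after initIt: " + adapter.isRunning());   // true

        starter.cleanUp();
        System.out.println("running after cleanUp: " + adapter.isRunning());  // false
    }
}
```

The null check in cleanUp() is what makes the destroy callback safe to run even when initialization failed or never happened, which is the same guard the original class uses.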