SpringBoot整合Kafka消息队列并实现发布订阅和消费

Posted 低调小马(mcy)

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合Kafka消息队列并实现发布订阅和消费相关的知识,希望对你有一定的参考价值。

pom 依赖(无需写版本号,版本由 Spring Boot 的依赖管理决定)

  <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
  </dependency>

配置文件(以下为 properties 格式,对应 application.properties;若使用 yml 需改写为层级结构)

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Whether the consumer commits offsets automatically
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
# Auto-commit interval: how often offsets are committed when auto-commit is enabled
#spring.kafka.consumer.auto-commit-interval=10000
# Offset reset policy used when Kafka has no initial offset or the offset is out of range
# earliest: reset to the smallest offset in the partition;
# latest: reset to the newest offset in the partition (consume only newly produced data);
# none: throw an exception if any partition has no committed offset;
spring.kafka.consumer.auto-offset-reset=latest
# Consumer session timeout (a rebalance is triggered if no heartbeat arrives within this time)
spring.kafka.consumer.properties.session.timeout.ms=120000
# Consumer request timeout
spring.kafka.consumer.properties.request.timeout.ms=180000
# Do not fail application startup when a listened topic does not exist
spring.kafka.listener.missing-topics-fatal=false


# Producer settings.
# There is intentionally no spring.kafka.producer.group-id entry: a group id is a
# consumer concept and Spring Boot defines no such producer property.
# Producers use serializers (not deserializers):
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Number of retries
spring.kafka.producer.retries=0
# Ack level: how many partition replicas must have the record before the producer
# receives an acknowledgement (0, 1, or all/-1)
spring.kafka.producer.acks=1
# Batch size
spring.kafka.producer.batch-size=16384
# Send delay
spring.kafka.producer.properties.linger.ms=0
# The producer sends a batch once the accumulated messages reach batch-size or
# linger.ms has elapsed since a message was received.
# linger.ms=0 means each message is sent as soon as it is received, so batch-size
# has little effect in that case.

# Producer buffer size
spring.kafka.producer.buffer-memory = 33554432

配置发送者

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.serializer.JsonSerializer;
 
/**e
* @date  2022/03/10
* @author mcy
* @version 1.0.0
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig 
 
    @Value("$kafka.producer.servers")
    private String servers;
    @Value("$kafka.producer.retries")
    private int retries;
    @Value("$kafka.producer.batch.size")
    private int batchSize;
    @Value("$kafka.producer.linger")
    private int linger;
    @Value("$kafka.producer.buffer.memory")
    private int bufferMemory;
 
 
    public Map<String, Object> producerConfigs() 
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    
 
    public ProducerFactory<String, String> producerFactory() 
        return new DefaultKafkaProducerFactory<>(producerConfigs(),
                new StringSerializer(),
                new JsonSerializer<String>());
    
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() 
        return new KafkaTemplate<>(producerFactory());
    

配置消费者

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
 
import java.util.HashMap;
import java.util.Map;
/**e
 * @date  2022/03/10
 * @author mcy
 * @version 1.0.0
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig 
 
    @Value("$kafka.consumer.servers")
    private String servers;
    @Value("$kafka.consumer.enable.auto.commit")
    private boolean enableAutoCommit;
    @Value("$kafka.consumer.session.timeout")
    private String sessionTimeout;
    @Value("$kafka.consumer.auto.commit.interval")
    private String autoCommitInterval;
    @Value("$kafka.consumer.group.id")
    private String groupId;
    @Value("$kafka.consumer.auto.offset.reset")
    private String autoOffsetReset;
    @Value("$kafka.consumer.concurrency")
    private int concurrency;
 
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() 
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    
 
    private ConsumerFactory<String, String> consumerFactory() 
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(String.class)
        );
    
 
 
    private Map<String, Object> consumerConfigs() 
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    

配置生产者监听

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

/**e
* @date  2022/03/10
* @author mcy
* @version 1.0.0
*/

@Component
@Slf4j
public class KafkaProducerListener implements ProducerListener<String,String>
    @Override
    public void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) 
        log.info("发送者监听:消息推送成功,推送数据大小为:byte;推送内容为:",recordMetadata.serializedKeySize(),producerRecord.value());
    

    @Override
    public void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) 
        log.error("发送者监听:推送失败,失败原因",producerRecord.value(),exception.getMessage());
    
    

配置消费者监听

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**e
 * @date  2022/03/10
 * @author mcy
 * @version 1.0.0
 */
@Component
@Slf4j
public class KafkaConsumerListener 
    @KafkaListener(topics = "test",containerFactory = "kafkaListenerContainnerContainerFactory")
    public void listenConsumer(ConsumerRecord<?,?> record)
        log.info("消费者监听:value为:",record.value());
    

尝试发送数据到kafka队列上

     // Template used to publish records with String keys and String values.
     @Autowired
     KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 我这里就展示一下我全量查询
     * mysql数据库中的数据,然后一条一条的推送到kafka上吧
     * @return
     */
    @RequestMapping(value = "/query",method = RequestMethod.GET)
    public String sendEdith()
        ProducerRecord record = null;
        List<User> userList = userDataService.overviewQuery();
        int num = 0;
        try 
        for (User user: userList 
             ) 
                record = new ProducerRecord<String,String>("test", new ObjectMapper().writeValueAsString(user));
                kafkaTemplate.send(record);
           log.info("成功推送第条数据",++num);
         
            try 
                //这里可以添加线程睡眠控制推送速率
                Thread.sleep(3000);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
         catch (JsonProcessingException e) 
            e.printStackTrace();
        
        return "数据全部传输完毕!";
    

这里只是将自己控制层的代码发了出来,
没发出来的部分就是简单的一些查询操作,
自己可以根据自己的业务,做出同样的改变即可,
另外免费的点赞,关注,评论,收藏来点呗,阿里嘎多!!!

以上是关于SpringBoot整合Kafka消息队列并实现发布订阅和消费的主要内容,如果未能解决你的问题,请参考以下文章

springboot----数据层技术的选择+各种整合(缓存,消息队列)+定时任务+发邮件

[原创]SpringBoot整合RocketMQ消息队列

springboot整合kafka实现消息推送

springboot整合kafka,kafka消息过滤

springboot整合kafka,kafka消息过滤

Springboot 整合Kafka 实现手动提交 offset