SpringBoot集成Kafka,实现简单的收发消息

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot集成Kafka,实现简单的收发消息相关的知识,希望对你有一定的参考价值。

参考技术A

在kafka的 config 目录下找到 server.properties 配置文件

把 listeners 和 advertised.listeners 两处配置的注释去掉,可以根据需要配置连接的服务器 外网IP 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092

KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。
send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码

通过 KafkaTemplate 模板类发送数据。
kafkaTemplate.send(String topic, K key, V data) ,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户监听 topics

bootstrap-servers :kafka服务器地址(可以多个)
consumer.group-id :指定一个默认的组名
不指定的话会报

1. earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
2. latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
3. none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常

这个属性也是必须配置的,不然也是会报错的

在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输
consumer.key-deserializer 和 consumer.value-deserializer 是消费者 key/value 反序列化
producer.key-deserializer 和 producer.value-deserializer 是生产者 key/value 序列化

StringDeserializer 是内置的字符串反序列化方式

StringSerializer 是内置的字符串序列化方式

在 org.apache.kafka.common.serialization 源码包中还提供了多种类型的序列化和反序列化方式
要自定义序列化方式,需要实现接口 Serializer
要自定义反序列化方式,需要实现接口 Deserializer

详细可以参考
https://blog.csdn.net/shirukai/article/details/82152172

这是 Kafka 的消费者 Consumer 的配置信息,每个消费者都会输出该配置信息

访问 http://localhost:8080/kafka ,就可以看到控制台打印消息了

SpringBoot系列之集成kafka实现事件发布

事件发布订阅实现,我们经常使用到spring框架提供的ApplicationEventPublisher,基于kafka的特性,我们也可以简单实现类似的效果

1、kafka环境部署搭建

官网下载链接:https://kafka.apache.org/downloads,最开始用最新版的,发现在我的win10系统没部署成功,所以还是选择2.8.1版本的

在D:\\kafka_2.12-2.8.1\\bin\\windows,使用cmd命令启动zookeeper,window系统修改conf文件夹下面的zookeeper.properties里面的dataDir

zookeeper-server-start.bat ..\\..\\config\\zookeeper.properties


window系统修改conf文件夹下面的log.dirs路径

kafka-server-start.bat ..\\..\\config\\server.properties

2、kafka常用命令使用

启动另外一个cmd参考,创建一个命令为test-topic的topic

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

查看kafkatopic列表

kafka-topics.bat --list --zookeeper localhost:2181

启动kafka的生产者,发送topic数据

kafka-console-producer.bat --broker-list localhost:9092 --topic test-topic


启动一个kafka消费者端,可以接收到消息数据

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --from-beginning

3、创建一个kafka starter工程

创建一个工程,实现对kafka的api简单封装

jdk选择jdk8的

选择需要的依赖

基于kafka的EventPublisher

package com.example.ebus.publisher;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.kafka.core.KafkaTemplate;

@Slf4j
public class MyEventPublisher 

    private KafkaTemplate<String, Object> kafkaTemplate;

    @Value("$app.ebus.topic:ebus")
    private String topic;

    public MyEventPublisher(KafkaTemplate<String, Object> kafkaTemplate) 
        this.kafkaTemplate = kafkaTemplate;
    

    public void publishEvent(Object event) 
        if (log.isInfoEnabled()) 
            log.info("topic发送:", event.getClass().getName());
        
        kafkaTemplate.send(topic, event);
    



自动配置类

package com.example.ebus.configuration;

import com.example.ebus.publisher.MyEventPublisher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class EbusAutoConfiguration 

    @Bean
    public MyEventPublisher myEventPublisher(@Qualifier("kafkaTemplate") KafkaTemplate<String, Object> kafkaTemplate) 
        return new MyEventPublisher(kafkaTemplate);
    



META-INF/spring.factories,加上配置

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.example.ebus.configuration.EbusAutoConfiguration

4、kafka生产端实现

同样创建一个生产者端工程,引入starter,加上yml配置

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

写个测试接口测试数据:

package com.example.producer.controller;

import com.example.ebus.event.ShopOrderEvent;
import com.example.ebus.publisher.MyEventPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/api")
public class ShopOrderController 

    @Autowired
    private MyEventPublisher eventPublisher;

    @PostMapping("/order")
    public String placeOrder(@RequestBody ShopOrderEvent orderEvent) 
        eventPublisher.publishEvent(orderEvent);
        return orderEvent.getOrderCode();
    



    "orderCode":"123456",
    "productName":"三星手机",
    "price":1122,
    "productDesc":"三星手机"

5、kafka消费者端实现

同样创建一个消费者工程

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      enable-auto-commit: true
      group-id: consumer1
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      max-poll-records: 1
      properties:
        spring:
          json:
            trusted:
              packages: '*'


进行监听,使用kafka的KafkaListener

package com.example.consumer.handler;


import com.example.ebus.event.ShopOrderEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderListenerHandler 

    @KafkaListener(topics = "$app.ebus.topic:ebus")
    public void obtainTopicData(ShopOrderEvent event) 
        log.info("下单成功,orderCode:" , event.getOrderCode());
    



本博客代码例子可以在GitHub找到下载链接

参考资料

以上是关于SpringBoot集成Kafka,实现简单的收发消息的主要内容,如果未能解决你的问题,请参考以下文章

springboot kafka集成(实现producer和consumer)

SpringBoot系列之集成kafka实现事件发布

springboot 整合kafka

(十三)ATP应用测试平台——springboot集成kafka案例实战

SpringBoot集成Kafka

kafka学习:kafka简单命令操作&springboot+kafka