springcloud+kafka集群

Posted sunxuesong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springcloud+kafka集群相关的知识,希望对你有一定的参考价值。

上节说了kafka在linux环境下怎么搭建集群。这节写一下怎么在springcloud中以stream流方式去做kafka集群对接。

1.yml配置

#spring  Cloud  kafka    -- streams --
  cloud:
     stream:
        kafka:
          binder:
            minPartitionCount: 3  # 分区数量,主要就是为了减轻单台服务器的压力,扩大并发量
            brokers: 192.168.100.100:9092,192.168.100.101:9092,192.168.100.102:9092  # kafka服务地址和端口
            autoCreateTopics: true
            autoAddPartitions: true

2.消息发送

@RestController
@RequestMapping("/kafka")
@EnableBinding(value = WarningStreams.class)
public class kafkaTest 

    @Autowired
    private MessageService messageService;

    /**
     * 测试消息发送,入参就是你的topic,进行发送的时候就算kafka中没有该topic,他也会自动创建一个你传入的topic
* 这里面的Msg是我封装的一个消息对象,可以是随意的一个消息对象,字符串也可以 * @param topic
*/ @RequestMapping("/sendMsg") public void sendMsg(String topic) // 循环发送6次消息,分别发送在不同的分区 for (int i=0; i<=5; i++ ) Msg msg = new Msg(); msg.setData(null); msg.setTaskId("1"); msg.setMsg("测试消息发送"); msg.setMsgId(System.currentTimeMillis() + MathUtil.getFiveRandom()); msg.setSuccess("true"); msg.setCode("200"); msg.setMsgType(100); String result = messageService.sendControl(msg, topic); System.out.println(result);


messageService类:
@Service
public class MessageService 
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private BinderAwareChannelResolver resolver;

    /**
     * 发送预警消息到指定topic,这里的topic是由平台编码+平台名称组成
     * 若发现kafka中没有该topic,它会自动创建一个由平台编码+平台名称组成的topic
     * @param warnings
     * @param topic
     * @return
     */
    public String sendWarning(final Msg warnings, String topic) 
        logger.info("Sending warnings ", warnings);

        // 获取预警的topic,然后发送预警消息到kafka的topic
        MessageChannel messageChannel = resolver.resolveDestination(topic);
        messageChannel.send(MessageBuilder
                .withPayload(warnings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());

        return "send msg ok";
    

    /**
     * 发送布控消息到指定topic,这里的topic是由平台编码+平台名称组成
     * 若发现kafka中没有该topic,它会自动创建一个由平台编码+平台名称组成的topic
     * @param msg
     * @param topic
     * @return
     */
    public String sendControl(final Msg msg, String topic) 
        logger.info("Sending controlMsg ", JSON.toJSONString(msg));
        // 获取布控的topic,然后发送布控消息到kafka的topic
        MessageChannel messageChannel = resolver.resolveDestination(topic);
        messageChannel.send(MessageBuilder
                .withPayload(msg)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());

        return "send msg ok";
    

发送完毕后会在服务器中的topic下看到你传入的那个topic,并且会有三个分区,每个分区分别对应三台服务器并且每台服务器中会有两条消息,如下图:

技术图片

 

3.消息接收

@RestController
@RequestMapping("/kafka")
@EnableBinding(value = WarningStreams.class)
public class kafkaTest 

    /**
     * 测试消息接收,接收对象用Object,否则收不到
     * @param playLoad
     */
    @StreamListener(WarningStreams.INPUT)
    public void receive(Object playLoad) 
        System.out.println("消息消费..result=="+ JSON.toJSONString(playLoad));
    

当消息被消费后,分区中的数据释放被清空,但是会保存在硬盘的log日志中。也就是在server.properties中你配置的log目录

 

以上是关于springcloud+kafka集群的主要内容,如果未能解决你的问题,请参考以下文章

基于Kafka实现的Spring Cloud消息总线

springcloud2021版微服务集群

搭建基于 docker 的 Kafka 集群及 Spring Boot 应用访问

Spring Cloud 数据流中的 Kafka 源码

Spring Cloud第八篇 | Hystrix集群监控Turbine

java——spring boot集成kafka——kafka集群中controller的作用