(十三)ATP应用测试平台——springboot集成kafka案例实战

Posted 北溟溟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(十三)ATP应用测试平台——springboot集成kafka案例实战相关的知识,希望对你有一定的参考价值。

前言

kafka消息中间键也是我们经常要用到的功能,尤其是在大数据、高并发的项目中,如日志收集、业务数据分发等等。其最核心的俩大功能作用是:①削峰填谷②异步解耦。本节我们主要介绍一下如何在springboot项目中集成kafka消息中间键,实现简单的数据分发以及消费的案例。

正文

  • kafka集群搭建

快速搭建一个kafka集群,我们这里以docker环境为例,搭建一个kafka集群,具体搭建过程请参考以下博客,这里不再赘述。

(六)史上最强ELK集群搭建系列教程——kafka集群搭建_北溟的博客-CSDN博客

  • springboot项目引入kafka依赖

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

  • 引入kafka配置

spring:
  #kafka配置
  kafka:
    #kafka集群地址
    bootstrap-servers: 192.168.56.10:9091,192.168.56.10:9092,192.168.56.10:9093
    producer:
      #批量发送的数据量大小
      batch-size: 16384
      #可用发送数量的最大缓存
      buffer-memory: 33554432
      #key序列化器
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value序列化器
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 1
    consumer:
      enable-auto-commit: false
      auto-commit-interval: 100ms
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
      group-id: atp-group-id

  • 创建一个kafka生产者生产消息

①创建一个kafka生产者类KafkaProducer生产消息

package com.yundi.atp.platform.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * @Author: yanp
 * @Description: kafka生产者
 * @Date: 2021/11/3 18:31
 * @Version: 1.0.0
 */
@Component
@Slf4j
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    public void sendMsg(String topic, String msg) {
        log.info("开始发送kafka消息:[topic:{},msg:{}]", topic, msg);
        ListenableFuture<SendResult<Integer, String>> sendMsg = kafkaTemplate.send(topic, msg);
        //消息确认
        sendMsg.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("send error:[ex:{},topic:{},msg:{}]", throwable, topic, msg);
            }

            @Override
            public void onSuccess(SendResult<Integer, String> stringStringSendResult) {
                log.info("send success:[topic:{},msg:{}]", topic, msg);
            }
        });
    }
}

 ②通过web服务发送生产消息测试

package com.yundi.atp.platform.module.test.controller;

import com.yundi.atp.platform.common.Result;
import com.yundi.atp.platform.kafka.KafkaProducer;
import io.swagger.annotations.Api;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: yanp
 * @Description:
 * @Date: 2021/11/4 17:32
 * @Version: 1.0.0
 */
@Api(tags = {"Springboot集成Kafka测试"})
@RestController
@RequestMapping(value = "/kafka")
public class KafkaContoller {
    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping(value = "/sendKafkaTestMsg/{topic}/{msg}")
    public Result sendKafkaTestMsg(@PathVariable(value = "topic")String topic,@PathVariable(value = "msg")String msg){
        kafkaProducer.sendMsg(topic,msg);
        return Result.success();
    }
}

③通过测试页面发送生产者消息

<template>
  <div class="container">
    <div class="title">
      <span>Springboot集成kafka案例</span>
      <el-divider direction="vertical"></el-divider>
      <router-link to="home">
        <span style="font-size: 18px;">退出</span>
      </router-link>
    </div>
    <el-divider>Test Staring</el-divider>
    <div style="width: 400px;background: #ddd;padding: 40px 20px;">
      <el-form ref="form" :model="form" label-width="70px" class="login">
        <el-form-item label="发送主题" prop="topic">
          <el-input v-model="form.topic"></el-input>
        </el-form-item>
        <el-form-item label="发送消息" prop="name">
          <el-input v-model="form.msg"></el-input>
        </el-form-item>
        <el-button type="primary" @click="sengMsg" style="width: 100%;margin: 0;">立即发送</el-button>
      </el-form>
    </div>
  </div>
</template>

<script>
export default {
  name: "Kafka",
  data() {
    return {
      form: {
        topic: 'atp',
        msg: 'hello world!',
      }
    }
  },
  methods: {
    sengMsg() {
      this.$http.get('/kafka/sendKafkaTestMsg/'+this.form.topic+"/"+this.form.msg,).then(res => {
        if (res.data.code === 1) {
          this.$refs.form.resetFields();
          this.$message.success(res.data.msg);
        } else {
          this.$refs.form.resetFields();
          this.$message.warning(res.data.msg);
        }
      }).catch(error => {
        this.$message.error(error);
      });
    }
  }
}
</script>

<style scoped lang="scss">
.container {
  padding: 10px;
}
</style>

 

 

  • 创建kafka消费者消费消息

package com.yundi.atp.platform.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Author: yanp
 * @Description: kafka消费者
 * @Date: 2021/11/3 18:32
 * @Version: 1.0.0
 */
@Component
@Slf4j
public class KafkaConsumer {

    /**
     * 消费者:监听test主题,分组:groupId1
     */
    @KafkaListener(topics = {"test"}, groupId = "groupId1")
    public void consumer1(ConsumerRecord<Integer, String> record) {
        log.info("consumer1 start consume message:[topic:{},msg:{}]", record.topic(), record.value());
    }

    /**
     * 消费者:监听atp主题,分组:groupId1
     */
    @KafkaListener(topics = {"atp"}, groupId = "groupId1")
    public void consumer2(ConsumerRecord<Integer, String> record) {
        log.info("consumer2 start consume message:[topic:{},msg:{}]", record.topic(), record.value());
    }

    /**
     * 消费者:监听atp主题,分组:groupId3
     */
    @KafkaListener(topics = {"atp"}, groupId = "groupId2")
    public void consumer3(ConsumerRecord<Integer, String> record) {
        log.info("consumer3 start consume message:[topic:{},msg:{}]", record.topic(), record.value());
    }


}

  • 结果测试

①发送生产者消息

②消费者消费消息

结语

ok,到这里关于springboot集成kafka案例实战就结束了,我们下期见。。。

以上是关于(十三)ATP应用测试平台——springboot集成kafka案例实战的主要内容,如果未能解决你的问题,请参考以下文章

(十八)ATP应用测试平台——关于springboot应用监控的那些事

(十八)ATP应用测试平台——关于springboot应用监控的那些事

(十九)ATP应用测试平台——springboot集成RocketMQ案例实战

(二十四)ATP应用测试平台——springboot集成fastdfs上传与下载功能

ATP应用测试平台——使用bat批处理实现springboot项目的启动与关闭

(十七)ATP应用测试平台——自定义实现一个springboot2的线程池启动器starter