SpringBoot Kafka Integration

Posted by 深色風信子


1 Install Kafka

Install Kafka with Docker

2 Create Topics

Create two topics, topic1 and topic2, each with 1 partition and 1 replica (they can also be created from Java code).

PS C:\Users\Administrator> docker exec -it kafka /bin/sh

$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
Created topic topic1.

$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.
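As a sanity check (not part of the original session; assumes the same container shell and broker as above), the topics can be listed and inspected:

```shell
# List all topics on the broker to confirm topic1 and topic2 exist
kafka-topics.sh --list --bootstrap-server localhost:9092

# Show partition and replica details for topic1
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic1
```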

3 Create Topics from Java

package com.xu.mq.demo.test.service;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.apache.kafka.clients.admin.NewTopic;

/**
 * Kafka initialization config class: creates the Topics
 *
 * @author Administrator
 * @date 2023-02-17 11:30
 */
@Configuration
public class KafkaInitialConfig {

    public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";

    public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";

    @Bean
    public NewTopic audioUploadTopic() {
        return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);
    }

    @Bean
    public NewTopic textUploadTopic() {
        return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);
    }

}


4 SpringBoot Project

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.8</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<groupId>com.xu</groupId>
	<artifactId>kafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<name>kafka</name>

	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>

		<dependency>
			<groupId>cn.hutool</groupId>
			<artifactId>hutool-all</artifactId>
			<version>5.8.12</version>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

4.2 application.yml

server:
  port: 8001

spring:
  application:
    name: hello-kafka
  kafka:
    # Comma-separated list of host:port pairs for the initial connection to the Kafka cluster (Kafka's default port is 9092)
    bootstrap-servers: 192.168.1.92:9092
    producer:
      # Number of retries after a send error
      retries: 0
      # When multiple records are sent to the same partition, the producer batches them together; this sets the batch size in bytes
      batch-size: 16384
      # Size of the producer's memory buffer, in bytes
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : the producer does not wait for any response from the server before considering the write successful
      # acks=1 : the producer gets a success response as soon as the partition leader receives the message
      # acks=all : the producer gets a success response only after all replicating nodes have received the message
      acks: all
    consumer:
      # Interval between automatic offset commits; in Spring Boot 2.x this is a Duration and must use that format, e.g. 1S, 1M, 2H, 5D
      auto-commit-interval: 1S
      # What the consumer should do when reading a partition with no committed offset, or an invalid offset:
      # latest (default): read from the newest records (those produced after the consumer started)
      # earliest: read the partition from the beginning
      auto-offset-reset: earliest
      # Whether to auto-commit offsets (default true); to avoid duplicated or lost records, set it to false and commit offsets manually
      enable-auto-commit: true
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # Number of threads running in the listener container
      concurrency: 4

4.3 KafkaApplication.java

package com.xu.mq.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * @author Administrator
 */
@EnableKafka
@SpringBootApplication
public class KafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaApplication.class, args);
	}

}


4.4 CustomizePartitioner.java

package com.xu.kafka.config;


import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * @author Administrator
 */
public class CustomizePartitioner implements Partitioner {

    /**
     * Custom partitioning rule
     *
     * @param topic      The topic name
     * @param key        The key to partition on (or null if no key)
     * @param keyBytes   The serialized key to partition on (or null if no key)
     * @param value      The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster    The current cluster metadata
     * @return the partition the record is routed to (always 0 here)
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }

}
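CustomizePartitioner always returns partition 0. As a hypothetical illustration (not from the original) of what a key-based rule might look like, the routing logic can be written as a plain function and checked without any Kafka classes:

```java
public class PartitionRouting {

    /**
     * Route a key to one of numPartitions partitions by hashing,
     * the kind of rule a custom Partitioner#partition could apply.
     */
    static int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // no key: this sketch falls back to partition 0
            return 0;
        }
        // Math.floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition
        System.out.println(partitionFor("audio-1", 4) == partitionFor("audio-1", 4)); // prints true
        System.out.println(partitionFor(null, 4)); // prints 0
    }
}
```

Inside the real partition method this would be called roughly as partitionFor((String) key, cluster.partitionCountForTopic(topic)), assuming string keys.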


4.5 KafkaInitialConfig.java

package com.xu.kafka.config;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.apache.kafka.clients.admin.NewTopic;

/**
 * Kafka initialization config class: creates the Topics
 *
 * @author Administrator
 * @date 2023-02-17 11:30
 */
@Configuration
public class KafkaInitialConfig {

    public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";

    public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";

    @Bean
    public NewTopic audioUploadTopic() {
        return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);
    }

    @Bean
    public NewTopic textUploadTopic() {
        return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);
    }

}


4.6 SendMessageController.java

package com.xu.kafka.message.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


import com.xu.kafka.config.KafkaInitialConfig;

import cn.hutool.json.JSONUtil;

/**
 * @author Administrator
 */
@RequestMapping(value = "/kafka")
@RestController
public class SendMessageController {

    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Send a message with KafkaTemplate
     *
     * @param message the message body
     */
    @GetMapping("/test1/{message}")
    public void test1(@PathVariable("message") String message) {
        template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message);
    }

    /**
     * Send a message with KafkaTemplate, with a callback
     *
     * @param message the message body
     */
    @GetMapping("/test2/{message}")
    public void test2(@PathVariable("message") String message) {
        template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message).addCallback(
                success -> System.out.println("send success\t" + success),
                fail -> System.out.println("send failed\t" + fail));
    }

}

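The application.yml above configures consumer deserializers and listener concurrency, but a consumer is never shown. A minimal listener sketch that would receive the controller's messages could look like the following (the package, class name, and groupId "demo-group" are assumptions, not from the original):

```java
package com.xu.kafka.message.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.xu.kafka.config.KafkaInitialConfig;

/**
 * Minimal consumer sketch: listens on the topic the controller sends to.
 */
@Component
public class AudioUploadListener {

    // With enable-auto-commit: true, offsets are committed automatically
    // at the configured auto-commit-interval
    @KafkaListener(topics = KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, groupId = "demo-group")
    public void onMessage(String message) {
        System.out.println("received\t" + message);
    }
}
```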
5 Test

send success	SendResult [producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]
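A log line like the one above is produced by a request such as the following (hypothetical message body; assumes the application is running on port 8001 as configured):

```shell
# test2 sends with a callback, so the app logs success or failure
curl http://localhost:8001/kafka/test2/hello
```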
