Springboot集成Kafka
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Springboot集成Kafka相关的知识,希望对你有一定的参考价值。
参考技术A 最近陆续收到不少朋友反馈,他们在计划要在springboot项目引入kafka中间件。在网上找过很多资料,但都比较零散,按照其说明进行操作后让项目正常跑起来仍是比较坎坷,听说我对kafka比较了解,希望给予一起分享。刚好最近因为疫情,实际相对于平常稍宽松,也想借此写点东西,一来作为自己的总结,二来可以给予有需要的朋友一些引导,针对此文期望对各位遇到问题的朋友有一定的帮助。1. 安装JDK ,具体版本可以根据项目实际情况选择,目前使用最多的为jdk8
2. 安装Zookeeper ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.8
3. 安装Kafka ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.1
4. 安装Kafka Manage (非必要:安装主要是了对kafka项目操作提供图形化界面操作),具体版本可以根据项目实际情况选择,本项目使用的是1.3.3.7
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.12.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<!--springboot web依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--kafka依赖 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer: #生产者
# 发生错误后,消息重发的次数。重试1次,此值需要结合业务场景,重试与否各有千秋(重试,好处:尽可能的确保生产者写入block成功;坏处:有可能时带有顺序写入的数据打乱顺序
#比如:依次写入数据 1/2/3,但写入1时因网络异常,进行了重写,结果落到block的数据成了2/3/1)
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。模式时16k
batch-size: 16384 #16k
# 设置生产者内存缓冲区的大小
buffer-memory: 33554432
acks: 1
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
#group-id: default-group
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D.此属性只有在enable-auto-commit:true时生效
auto-commit-interval: 1S
enable-auto-commit: false
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
# 在侦听器容器中运行的线程数
concurrency: 5
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
-- 127.0.0.1:2181 zookeeper 服务器地址
-- replication-factor partitions 副本数量
--partitions partition数量
点击【Cluster】>【Add Cluster】打开如下添加集群的配置界面:
输入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服务器地址(如localhost:2181),选择最接近的Kafka版本(如0.10.1.0)
package com.charlie.cloudconsumer.service.impl.kafka;
import com.charlie.cloudconsumer.common.utils.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.UUID;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息生产端,为实践业务提供向kafka block发现消息的API
*/
@Component
@Slf4j
public class QueueProducer
@Autowired
private KafkaTemplatekafkaTemplate;
public void sendQueue(String topic,Object msgContent)
String obj2String =JSON.toJSONString(msgContent);
log.info("kafka service 准备发送消息为:",obj2String);
//发送消息
ListenableFuture>future =kafkaTemplate.send(topic,UUID.randomUUID().toString(),obj2String);
future.addCallback(new ListenableFutureCallback>()
//消息发送成功
@Override
public void onSuccess(SendResult stringObjectSendResult)
log.info("[kafka service-生产成功]topic:,结果",topic, stringObjectSendResult.toString());
//消息发送失败
@Override
public void onFailure(Throwable throwable)
//发送失败的处理,本处只是记录了错误日志,可结合实际业务做处理
log.info("[kafka service-生产失败]topic:,失败原因",topic, throwable.getMessage());
);
package com.charlie.cloudconsumer.service.impl.kafka;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.stereotype.Component;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : 消费端实际的业务处理对象
*/
@Component //添加此注解的原因是因为消费端在项目启动时就会初始化,消费端需要用到此类,故也让此类在项目启动时进行注册
public class QueueDataProcess
public boolean doExec(Object obj)
// todu 具体的业务逻辑
if (ObjectUtils.isNotEmpty(obj))
return true;
else
return false;
package com.charlie.cloudconsumer.service.impl.kafka;
import com.charlie.cloudconsumer.common.utils.JSON;
import com.charlie.cloudconsumer.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息消费端,负责消费特定topic消息
*/
@Component
@Slf4j
@SuppressWarnings("all")
public class QueueConsumer
@Autowired
private QueueDataProcess queueDataProcess;
/**
*
*/
@KafkaListener(topics ="test", groupId ="consumer")
public void doConsumer(ConsumerRecord record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic)
Optional message =Optional.ofNullable(record.value());
if (message.isPresent())
try
Object msg =message.get();
log.info("[kafka-消费] doConsumer 消费了: Topic:" + topic +",Message:" +msg);
boolean res =queueDataProcess.doExec(JSON.parseObject(msg.toString(),Order.class));
if (res)
ack.acknowledge();
catch (Exception ex)
log.error("[kafka-消费异常] doConsumer Error ",ExceptionUtils.getFullStackTrace(ex));
package com.charlie.cloudconsumer.controller;
import com.alibaba.fastjson.JSON;
import com.charlie.cloudconsumer.common.utils.AjaxResult;
import com.charlie.cloudconsumer.common.utils.BuildResponseUtils;
import com.charlie.cloudconsumer.model.Order;
import com.charlie.cloudconsumer.service.impl.kafka.QueueProducer;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息发送控制器,负责接受用户的发送的队列消息
*/
@RestController
@RequestMapping(value ="/kafka",produces =MediaType.APPLICATION_JSON_VALUE)
public class KafkaController
@Autowired
private QueueProducer queueProducer;
@RequestMapping(value = "/send",method = RequestMethod.POST)
public AjaxResult<?> sendMsg(@RequestBody Order order)
AjaxResult<?> ajaxResult= null;
if (ObjectUtils.isNotEmpty(order))
queueProducer.sendQueue("test",order);
ajaxResult = BuildResponseUtils.success(0,"发送消息:"+ JSON.toJSONString(order) + "成功!");
else
ajaxResult = BuildResponseUtils.success(1,"发送消息:"+ JSON.toJSONString(order) + "失败!");
return ajaxResult;
以上是关于Springboot集成Kafka的主要内容,如果未能解决你的问题,请参考以下文章