SpringBoot整合Kafka下
Posted 楚天阔.
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合Kafka下相关的知识,希望对你有一定的参考价值。
当消费端是批量接收消息,配置中的自动提交需要关闭,同时要把手动提交打开
kafka:
###########【Kafka集群】###########
bootstrap-servers: 192.168.188.128:9092
producer:
retries: 0 # 重试次数
acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
batch-size: 16384 # 批量大小
buffer-memory: 33554432 # 生产端缓冲区大小
# Kafka提供的序列化和反序列化类
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
# properties:
# #自定义分区器
# partitioner.class: com.example.demo.kafka.CustomizePartitioner
consumer:
group-id: javagroup
enable-auto-commit: false
auto-commit-interval: 1000
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 50
# 批量消费每次最多消费多少条消息
listener:
type: batch
ack-mode: manual_immediate
注意:批量消费和批量发送是无关的,可以一条条发送批量消费也可以批量发送批量消费
当批量发送时:
Producer:
代码如下:
@Data
public class KafkaMessage
public Integer index;
public String id;
public String value;
@GetMapping("BatchSend")
public void sendProducerRecord()
for(int i=0; i<100; i++)
KafkaMessage kafkaMessage = new KafkaMessage();
kafkaMessage.setIndex(i);
kafkaMessage.setId(UUID.randomUUID().toString());
kafkaMessage.setValue("producerRecord " + i);
ProducerRecord<String, Object> producerRecord =
new ProducerRecord<>("topic1", "key1", JSONObject.toJSONString(kafkaMessage));
kafkaTemplate.send(producerRecord);
Consumer:
代码:
@KafkaListener(id = "consumer1",groupId = "javagroup", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records)
System.out.println(">>>批量消费一次,records.size()="+records.size());
for (ConsumerRecord<?, ?> record : records)
System.out.println("****"+record.value());
ApiPost发送请求:
当Producer发送一条消息,Consumer批量消费时:
Producer:
@GetMapping("Producer")
public void sendMessage(String normalMessage)
kafkaTemplate.send("topic1",normalMessage);
Consumer不变
ApiPost发送请求:
注意:如果把配置文件里批量消费关掉
但是Consumer还是批量接收时,会报错:
因此批量消费时务必修改配置文件中的批量监听以及手动提交
当配置文件中批量监控以及手动提交开启
而Consumer为简单消费时:
无论批量发送还是简单发送都会报错;
总结:
配置关闭,批量消费报错
配置打开,简单消费报错
因此配置和消费要保持一致
当Producer批量发送,配置文件批量监听手动提交都关闭,而Consumer为单体消费时:
@KafkaListener(topics = "topic1")
public void onMessage1(ConsumerRecord<?,?>record)
System.out.println("简单消费: "+record.topic()+"_"+record.partition()+"_"+"_"+record.value());
二.回调:
kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,
// 带回掉的Producer .addCallback(SuccessCallback<>,FailureCallback<>)
@GetMapping("CallBackOne")
public void sendMessage2(String callBackMessage)
kafkaTemplate.send("topic1",callBackMessage).addCallback(successCallBack->
String topic = successCallBack.getRecordMetadata().topic();
int partition = successCallBack.getRecordMetadata().partition();
long offset = successCallBack.getRecordMetadata().offset();
System.out.println("发送消息成功: "+topic+"-"+partition+"-"+offset);
,failureCallBack->
System.out.println("发送消息失败:"+failureCallBack.getMessage());
);
第二种写法:
//带回调的Producer addCallback(new ListenableFutureCallback<SendResult<String, Object>>() )
@GetMapping("CallBackTwo")
public void sendMessage3(String callBackMessage)
kafkaTemplate.send("topic2",callBackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>()
@Override
public void onFailure(Throwable ex)
System.out.println("发送消息失败: "+ex.getMessage());
@Override
public void onSuccess(SendResult<String, Object> result)
System.out.println("发送消息成功: "+"-"+result.getRecordMetadata().offset());
);
补充:全局回调
注意:这样设置以后,该单例的kafkaTemplate就有了一个回调,注意是全局回调,只要一个实例中设置,全局生效
@Component
@Slf4j
public class KafkaSendResultHandler implements ProducerListener
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata)
System.out.println("Message send success : " + producerRecord.toString());
@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception)
System.out.println("Message send error : " + producerRecord.toString());
@Autowired(required = false)
private KafkaSendResultHandler producerListener;
//全局回调
@GetMapping("GlobalCallBack")
public void testProducerListen() throws InterruptedException
kafkaTemplate.setProducerListener(producerListener);
kafkaTemplate.send("topic1", "test producer listen");
Thread.sleep(1000);
此时如果调用另一个接口:
此接口虽然未设置回调,但由于是全局回调,所以回调依然生效
三:自定义分区器
我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:
① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
③ patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
※ 我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区,
public class CustomizePartitioner implements Partitioner
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
// 得到 topic 的 partitions 信息
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
System.out.println("partitionInfoList"+partitionInfoList);
System.out.println("topic: "+topic+"-"+"key: "+key.toString()+"-"+"value: "+value.toString());
int size = partitionInfoList.size();
System.out.println("分区有: "+size+"个");
//模拟某客服
if (key.toString().equals("10000")||key.toString().equals("11111"))
//放到最后一个分区中
return size-1;
String phoneNum= key.toString();
return phoneNum.substring(0,3).hashCode()%(size-1);
@Override
public void close()
@Override
public void configure(Map<String, ?> map)
编写一个Kafka生产者来使用自定义分区器:
/**
* @program:demo
* @description:测试自定义分区器Producer
* @auth0r: SYP
* @create: Creted on 2022-05-16 11:38
**/
public class PartitionerProducer
private static final String[] PHONE_NUMS=new String[]"10000", "10000", "11111", "13700000003", "13700000004",
"10000", "15500000006", "11111", "15500000008",
"17600000009", "10000", "17600000011"
;
public static void main(String[] args) throws ExecutionException, InterruptedException
//定义配置信息
Properties props = new Properties();
//kafka地址,多个地址用逗号分隔
props.put("bootstrap.servers","192.168.188.128:9092");
props.put("acks", "all");// 记录完整提交,最慢的但是最大可能的持久化
props.put("retries", 3);// 请求失败重试的次数
props.put("batch.size", 16384);// batch的大小
props.put("linger.ms", 1);// 默认情况即使缓冲区有剩余的空间,也会立即发送请求,设置一段时间用来等待从而将缓冲区填的更多,单位为毫秒,producer发送数据会延迟1ms,可以减少发送到kafka服务器的请求数据
props.put("buffer.memory", 33554432);// 提供给生产者缓冲内存总量
//设置分区器
props.put("partitioner.class","com.example.demo.kafka.CustomizePartitioner");
//设置序列化类,可以写类的全路径
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
int count=0;
int length=PHONE_NUMS.length;
while (count<10)
Random rand = new Random();
String phoneNum=PHONE_NUMS[rand.nextInt(length)];
ProducerRecord<String, String> record = new ProducerRecord<>("topic4", phoneNum, phoneNum);
RecordMetadata metadata = producer.send(record).get();
String result="phoneNum ["+record.value()+"] has been sent to partition " +metadata.partition();
System.out.println(result);
Thread.sleep(500);
count++;
producer.close();
编写一个消费者来消费数据:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
/**
* @program:demo
* @description:自定义分区器Consumer
* @auth0r: SYP
* @create: Creted on 2022-05-18 09:42
**/
public class PartitionerConsumer
private static KafkaConsumer<String,String> consumer;
private final static String TOPIC="topic4";
public PartitionerConsumer()
Properties props = new Properties();
props.put("bootstrap.servers","192.168.188.128:9092");
//每个消费者分配独立的组号
props.put("group.id","javagroup");
//如果value合法,则自动提交偏移量
props.put("enable.auto.commit", "true");
//设置多久一次更新被消费消息的偏移量
props.put("auto.commit.interval.ms", "1000");
//设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
props.put("session.timeout.ms", "30000");
//自动重置offset
//earliest 在偏移量无效的情况下 消费者将从起始位置读取分区的记录
//latest 在偏移量无效的情况下 消费者将从最新位置读取分区的记录
props.put("auto.offset.reset","earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer=new KafkaConsumer<String,String>(props);
public void consume()
//消费消息
consumer.subscribe(Arrays.asList(TOPIC));
try
while (true)
//拉取records
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String,String> record: records)
System.out.printf("消费端"+"partition=%d, offset=%d, key=%s, value=%s",record.partition(),
record.offset(),record.key(),record.value());
System.out.println();
finally
consumer.close();
public static void main(String[] args)
new PartitionerConsumer().consume();
我们可以看一下
topic4下面有3个partition
分别运行生产者和消费者代码并查看测试结果:
四:kafka事务提交:
如果在发送时需要创建事务.可以使用kafkaTemplate的executeIntransaction方法来声明事务:
@GetMapping("transaction")
public void snedMessage7()
// 声明事务:后面报错消息不会发出去
// kafkaTemplate.executeInTransaction(operationsCallBack->
// operationsCallBack.send("topic1","test executeInTransaction");
// throw new RuntimeException("fail");
// );
//不声明事务:后面报错但前面消息已经发送成功了
kafkaTemplate.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
调用接口可以发现不声明事务时,后面报错但前面消息已经发送成功了
声明事务:
@GetMapping("transaction")
@Transactional
public void snedMessage7()
// 声明事务:后面报错消息不会发出去
kafkaTemplate以上是关于SpringBoot整合Kafka下的主要内容,如果未能解决你的问题,请参考以下文章
三.Kafka入门到精通-SpringBoot整合Kafka(上)
SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQSpringBoot 整合 RabbitMQSpringBoot 整合 Kafka)