kafka第三次课!
Posted teayear
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka第三次课!相关的知识,希望对你有一定的参考价值。
1,课程回顾
2,本章重点
springboot整合kafka
springcloud整合kafka
3,具体内容
3.1 springboot整合kafka
3.1.1 pom.xml添加jar
org.springframework.kafka
spring-kafka
2.8.1
#生产者配置
#spring整合kafka配置
#连接集群配置
spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092
重试次数
spring.kafka.producer.retries=3
应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=-1
批量大小
spring.kafka.producer.batch-size=16384
提交延时
spring.kafka.producer.properties.linger.ms=10
当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
#自定义topic名称
topicName=topic-deptinfo
#消费者配置
#springboot整合 kafka
#消费者配置
#连接集群配置
spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092
默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
当kafka中没有初始offset或offset超出范围时将自动重置offset
earliest:重置为分区中最小的offset;
latest:重置为分区中最新的offset(消费分区中新产生的数据);
none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
设置批量消费
spring.kafka.listener.type=batch
批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
#自定义topic名称
topicName=topic-deptinfo
3.1.3 生成者代码
package com.aaa.sbm.task;
import com.aaa.sbm.entity.Dept;
import com.aaa.sbm.service.DeptService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
- @ fileName:TimedSendDeptInfoTask
- @ description:
- @ author:zhz
- @ createTime:2022/1/13 9:37
- @ version:1.0.0
/
@Component //不在3层之内,交给IOC处理
@EnableScheduling //开启定时任务
@EnableAsync //开启异步处理 可以在任务方法上使用@Async 该方法多线程处理时,可以异步处理,提高执行效率
@Slf4j
public class TimedSendDeptInfoTask
//spring用到了哪些涉及模式 模板模式(封装出来一个通用的工具模板,供你完成什么功能,简化整个操作流程)
@Resource
private KafkaTemplate kafkaTemplate;
//juc包下线程安全的类,可以实现多线程同步自增
private AtomicInteger atomicInteger =new AtomicInteger();
//注入topic名称
@Value(“$topicName”)
private String topicN;
/ @Resource
private DeptService deptService;/
/ @Resource
private RestTemplate restTemplate; / //HttpClient
/@Resource //模板模式
private RedisTemplate redisTemplate;*/
/**- 定制执行任务
- 每隔3秒 使用多线程发送5条部门信息到kafka中
/
// cron=“秒 分 时 日 月 周” / 每 - 范围(3-10 每分钟的3秒到10秒每秒执行一次) , 选项 (3,10,15 每分钟第3秒第10秒第15秒执行)
@Scheduled(cron = "/3 * * * * ?")
public void timedExecute()
//实例化固定线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);
//lambda表达式 简化代码写法 ()->sendDeptInfo() 左边是参数 右边是执行业务
/* executorService.execute(new Runnable()
@Override
public void run()
sendDeptInfo();
System.out.println(“1”);
System.out.println(“2”);
);/
/ executorService.execute(()->
sendDeptInfo();
System.out.println(“1”);
System.out.println(“2”);
);*/
//启动5个线程执行
executorService.execute(()->sendDeptInfo());
executorService.execute(()->sendDeptInfo());
executorService.execute(()->sendDeptInfo());
executorService.execute(()->sendDeptInfo());
executorService.execute(()->sendDeptInfo());
//关闭线程池
executorService.shutdown();
/** - 发送部门信息
*/
// @Async //异步处理,提高效率
public void sendDeptInfo()
log.info(“线程信息为:”+Thread.currentThread().getName()+“,正在执行。。。。。。。。。。。。。。”);
int getAndIncrement = atomicInteger.getAndIncrement();
Dept dept =new Dept(getAndIncrement,“dev”+getAndIncrement,“zz”+getAndIncrement);
log.info(“要发送的部门信息为:”+dept);
//发送部门信息到kafka中 一定要是字符串格式
kafkaTemplate.send(topicN, JSON.toJSONString(dept));
3.1.4 消费者代码
package com.aaa.sbm.util;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
-
@ fileName:KafkaConsumer
-
@ description:工具类用来监控topic-deptinof,不停的获取message
-
@ author:zhz
-
@ createTime:2022/1/13 10:26
-
@ version:1.0.0
*/
@Component
@Slf4j
public class KafkaConsumer/**
- 消费消息方法 借助 @KafkaListener指定消费的topic 如果该topic有信息都回被拉取pull 到参数中
- @param record
*/
@KafkaListener(topics = “$topicName”) //监听注解 监听指定的topic
public void pullKafkaMsg(ConsumerRecord<?,?> record)
//jdk8之后封装的专门处理空值一个类,有效防止空指针异常
Optional<?> optional = Optional.ofNullable(record.value());
// isPresent等同于if(record!=null)
if(optional.isPresent())
log.info(“接受到的信息为:”+ record);
log.info(“接受到的部门信息为:”+ optional.get());
3.1.5 测试
消费消息
启动消费者项目,观察控制台
生产信息
直接启动生成者,观察控制台
3.2 springcloud整合kafka(以121讲课项目为例子)
3.2.1 添加jar
父项目:
org.springframework.kafka
spring-kafka
2.2.14.RELEASE
<!-- fastjson的jar包-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.18</version>
</dependency>
注意:这里的springboot2.1.11只可以和spring-kafka2.2.*的版本匹配,否则会报异常
微服务:
org.projectlombok
lombok
org.springframework.kafka
spring-kafka
com.alibaba
fastjson
3.2.2 生成者配置application.properties
图片: https://shimo.im/fake.png
#springboot 整合kafka
#Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\\Windows\\System32\\drivers\\etc\\hosts
spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092
#生产者配置
重试次数
spring.kafka.producer.retries=0
应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
批量大小
spring.kafka.producer.batch-size=16384
提交延时
spring.kafka.producer.properties.linger.ms=0
当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
3.2.3 消费者配置application.properties
#springboot 整合kafka
#Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\\Windows\\System32\\drivers\\etc\\hosts
spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092
#消费者配置
默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
当kafka中没有初始offset或offset超出范围时将自动重置offset
earliest:重置为分区中最小的offset;
latest:重置为分区中最新的offset(消费分区中新产生的数据);
none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
设置批量消费
spring.kafka.listener.type=batch
批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
3.2.4 生产者代码
图片: https://shimo.im/fake.png
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//发送消息方法
@GetMapping(“productOrder”)
public String send()
Order order =new Order();
order.setId(100);
order.setMemberUsername(“测试生产者”);
order.setShopId(1001);
//log.info(“+++++++++++++++++++++ message = ”, JSON.toJSONString(dept));
//topic-dept为主题
kafkaTemplate.send(“topic-order”, JSON.toJSONString(order));
return “suc”;
3.2.5 消费者代码
图片: https://shimo.im/fake.png
package com.aaa.ss.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
-
@ fileName:KafkaConsumer
-
@ description:
-
@ author:zhz
-
@ createTime:2021/2/20 17:20
*/
@Component
@Slf4j
public class KafkaConsumer
@KafkaListener(topics = “topic-order”)
public void consumer(ConsumerRecord<?, ?> record)
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent())
Object message = kafkaMessage.get();
log.info(“----------------- record =” + record);
log.info(“------------------ message =” + message);
3.2.6 测试 1,生产者,地址栏请求(具体要看业务需求,讲课只位了测试效果) http://localhost:2221/order/productOrder 2,观察消费者项目
图片: https://shimo.im/fake.png
3,还可以使用命令查看后台topic
图片: https://shimo.im/fake.png
4,知识点总结
5,本章面试题
以上是关于kafka第三次课!的主要内容,如果未能解决你的问题,请参考以下文章