kafka第三次课!

Posted teayear

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka第三次课!相关的知识,希望对你有一定的参考价值。

1,课程回顾
2,本章重点
springboot整合kafka
springcloud整合kafka
3,具体内容
3.1 springboot整合kafka
3.1.1 pom.xml添加jar


org.springframework.kafka
spring-kafka
2.8.1

com.alibaba fastjson 1.2.79 注意:此处使用的springboot版本为2.4.1 kafka是编写课件时最新版本2.6.6,不是任意版本都兼容 3.1.2 配置文件application.properties #Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\\Windows\\System32\\drivers\\etc\\hosts

#生产者配置

#spring整合kafka配置
#连接集群配置
spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092

重试次数

spring.kafka.producer.retries=3

应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)

spring.kafka.producer.acks=-1

批量大小

spring.kafka.producer.batch-size=16384

提交延时

spring.kafka.producer.properties.linger.ms=10

当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka

linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

生产端缓冲区大小

spring.kafka.producer.buffer-memory = 33554432

Kafka提供的序列化和反序列化类

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

自定义分区器

spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

#自定义topic名称
topicName=topic-deptinfo

#消费者配置
#springboot整合 kafka
#消费者配置
#连接集群配置
spring.kafka.bootstrap-servers=cluster1:9092,cluster2:9092,cluster3:9092

默认的消费组ID

spring.kafka.consumer.properties.group.id=defaultConsumerGroup

是否自动提交offset

spring.kafka.consumer.enable-auto-commit=true

提交offset延时(接收到消息后多久提交offset)

spring.kafka.consumer.auto.commit.interval.ms=1000

当kafka中没有初始offset或offset超出范围时将自动重置offset

earliest:重置为分区中最小的offset;

latest:重置为分区中最新的offset(消费分区中新产生的数据);

none:只要有一个分区不存在已提交的offset,就抛出异常;

spring.kafka.consumer.auto-offset-reset=latest

消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)

spring.kafka.consumer.properties.session.timeout.ms=120000

消费请求超时时间

spring.kafka.consumer.properties.request.timeout.ms=180000

Kafka提供的序列化和反序列化类

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消费端监听的topic不存在时,项目启动会报错(关掉)

spring.kafka.listener.missing-topics-fatal=false

设置批量消费

spring.kafka.listener.type=batch

批量消费每次最多消费多少条消息

spring.kafka.consumer.max-poll-records=50

#自定义topic名称
topicName=topic-deptinfo

3.1.3 生成者代码
package com.aaa.sbm.task;
import com.aaa.sbm.entity.Dept;
import com.aaa.sbm.service.DeptService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**

  • @ fileName:TimedSendDeptInfoTask
  • @ description:
  • @ author:zhz
  • @ createTime:2022/1/13 9:37
  • @ version:1.0.0
    /
    @Component //不在3层之内,交给IOC处理
    @EnableScheduling //开启定时任务
    @EnableAsync //开启异步处理 可以在任务方法上使用@Async 该方法多线程处理时,可以异步处理,提高执行效率
    @Slf4j
    public class TimedSendDeptInfoTask
    //spring用到了哪些涉及模式 模板模式(封装出来一个通用的工具模板,供你完成什么功能,简化整个操作流程)
    @Resource
    private KafkaTemplate kafkaTemplate;
    //juc包下线程安全的类,可以实现多线程同步自增
    private AtomicInteger atomicInteger =new AtomicInteger();
    //注入topic名称
    @Value(“$topicName”)
    private String topicN;
    /
    @Resource
    private DeptService deptService;/
    /
    @Resource
    private RestTemplate restTemplate; / //HttpClient
    /
    @Resource //模板模式
    private RedisTemplate redisTemplate;*/
    /**
    • 定制执行任务
    • 每隔3秒 使用多线程发送5条部门信息到kafka中
      /
      // cron=“秒 分 时 日 月 周” / 每 - 范围(3-10 每分钟的3秒到10秒每秒执行一次) , 选项 (3,10,15 每分钟第3秒第10秒第15秒执行)
      @Scheduled(cron = "
      /3 * * * * ?")
      public void timedExecute()
      //实例化固定线程池
      ExecutorService executorService = Executors.newFixedThreadPool(5);
      //lambda表达式 简化代码写法 ()->sendDeptInfo() 左边是参数 右边是执行业务
      /* executorService.execute(new Runnable()
      @Override
      public void run()
      sendDeptInfo();
      System.out.println(“1”);
      System.out.println(“2”);

      );/
      /
      executorService.execute(()->
      sendDeptInfo();
      System.out.println(“1”);
      System.out.println(“2”);
      );*/
      //启动5个线程执行
      executorService.execute(()->sendDeptInfo());
      executorService.execute(()->sendDeptInfo());
      executorService.execute(()->sendDeptInfo());
      executorService.execute(()->sendDeptInfo());
      executorService.execute(()->sendDeptInfo());
      //关闭线程池
      executorService.shutdown();

      /**
    • 发送部门信息
      */
      // @Async //异步处理,提高效率
      public void sendDeptInfo()
      log.info(“线程信息为:”+Thread.currentThread().getName()+“,正在执行。。。。。。。。。。。。。。”);
      int getAndIncrement = atomicInteger.getAndIncrement();
      Dept dept =new Dept(getAndIncrement,“dev”+getAndIncrement,“zz”+getAndIncrement);
      log.info(“要发送的部门信息为:”+dept);
      //发送部门信息到kafka中 一定要是字符串格式
      kafkaTemplate.send(topicN, JSON.toJSONString(dept));

3.1.4 消费者代码
package com.aaa.sbm.util;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**

  • @ fileName:KafkaConsumer

  • @ description:工具类用来监控topic-deptinof,不停的获取message

  • @ author:zhz

  • @ createTime:2022/1/13 10:26

  • @ version:1.0.0
    */
    @Component
    @Slf4j
    public class KafkaConsumer

    /**

    • 消费消息方法 借助 @KafkaListener指定消费的topic 如果该topic有信息都回被拉取pull 到参数中
    • @param record
      */
      @KafkaListener(topics = “$topicName”) //监听注解 监听指定的topic
      public void pullKafkaMsg(ConsumerRecord<?,?> record)
      //jdk8之后封装的专门处理空值一个类,有效防止空指针异常
      Optional<?> optional = Optional.ofNullable(record.value());
      // isPresent等同于if(record!=null)
      if(optional.isPresent())
      log.info(“接受到的信息为:”+ record);
      log.info(“接受到的部门信息为:”+ optional.get());


3.1.5 测试
消费消息
启动消费者项目,观察控制台
生产信息
直接启动生成者,观察控制台

3.2 springcloud整合kafka(以121讲课项目为例子)
3.2.1 添加jar
父项目:


org.springframework.kafka
spring-kafka
2.2.14.RELEASE

        <!-- fastjson的jar包-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.18</version>
        </dependency>

注意:这里的springboot2.1.11只可以和spring-kafka2.2.*的版本匹配,否则会报异常
微服务:

org.projectlombok
lombok



org.springframework.kafka
spring-kafka



com.alibaba
fastjson

3.2.2 生成者配置application.properties
图片: https://shimo.im/fake.png

#springboot 整合kafka
#Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\\Windows\\System32\\drivers\\etc\\hosts
spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092

#生产者配置

重试次数

spring.kafka.producer.retries=0

应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)

spring.kafka.producer.acks=1

批量大小

spring.kafka.producer.batch-size=16384

提交延时

spring.kafka.producer.properties.linger.ms=0

当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka

linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了

生产端缓冲区大小

spring.kafka.producer.buffer-memory = 33554432

Kafka提供的序列化和反序列化类

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

自定义分区器

spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

3.2.3 消费者配置application.properties

#springboot 整合kafka
#Kafka集群配置,注意如果集群搭建时用的是名字 需要配置对应的主机名称 C:\\Windows\\System32\\drivers\\etc\\hosts
spring.kafka.bootstrap-servers=kafka1:9092,kafka2:9092,kafka3:9092

#消费者配置

默认的消费组ID

spring.kafka.consumer.properties.group.id=defaultConsumerGroup

是否自动提交offset

spring.kafka.consumer.enable-auto-commit=true

提交offset延时(接收到消息后多久提交offset)

spring.kafka.consumer.auto.commit.interval.ms=1000

当kafka中没有初始offset或offset超出范围时将自动重置offset

earliest:重置为分区中最小的offset;

latest:重置为分区中最新的offset(消费分区中新产生的数据);

none:只要有一个分区不存在已提交的offset,就抛出异常;

spring.kafka.consumer.auto-offset-reset=latest

消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)

spring.kafka.consumer.properties.session.timeout.ms=120000

消费请求超时时间

spring.kafka.consumer.properties.request.timeout.ms=180000

Kafka提供的序列化和反序列化类

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消费端监听的topic不存在时,项目启动会报错(关掉)

spring.kafka.listener.missing-topics-fatal=false

设置批量消费

spring.kafka.listener.type=batch

批量消费每次最多消费多少条消息

spring.kafka.consumer.max-poll-records=50

3.2.4 生产者代码
图片: https://shimo.im/fake.png
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//发送消息方法
@GetMapping(“productOrder”)
public String send()
Order order =new Order();
order.setId(100);
order.setMemberUsername(“测试生产者”);
order.setShopId(1001);
//log.info(“+++++++++++++++++++++ message = ”, JSON.toJSONString(dept));
//topic-dept为主题
kafkaTemplate.send(“topic-order”, JSON.toJSONString(order));
return “suc”;

3.2.5 消费者代码
图片: https://shimo.im/fake.png
package com.aaa.ss.util;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**

  • @ fileName:KafkaConsumer

  • @ description:

  • @ author:zhz

  • @ createTime:2021/2/20 17:20
    */
    @Component
    @Slf4j
    public class KafkaConsumer
    @KafkaListener(topics = “topic-order”)
    public void consumer(ConsumerRecord<?, ?> record)
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent())
    Object message = kafkaMessage.get();
    log.info(“----------------- record =” + record);
    log.info(“------------------ message =” + message);


        3.2.6  测试
                1,生产者,地址栏请求(具体要看业务需求,讲课只位了测试效果)
                 http://localhost:2221/order/productOrder
                2,观察消费者项目
    

图片: https://shimo.im/fake.png
3,还可以使用命令查看后台topic
图片: https://shimo.im/fake.png

4,知识点总结
5,本章面试题

以上是关于kafka第三次课!的主要内容,如果未能解决你的问题,请参考以下文章

linux三周第三次课笔记

2018.3.28 二周第三次课

四周第三次课(2月28日)

六周第三次课(1月17日) 9.6/9.7 awk

四周第三次课(1月4日)

一周第三次课(12月13日)