kafka消费者接收分区测试
Posted PacosonSWJTU
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka消费者接收分区测试相关的知识,希望对你有一定的参考价值。
【README】
- 本文演示了当有新消费者加入组后,其他消费者接收分区情况;
- 本文还模拟了 broker 宕机的情况;
- 本文使用的是最新的 kafka3.0.0 ;
- 本文测试案例,来源于 消费者接收分区的5种模型,建议先看模型,refer2 https://blog.csdn.net/PacosonSWJTU/article/details/121853461https://blog.csdn.net/PacosonSWJTU/article/details/121853461
【1】kafka测试环境准备
1)kafka集群
- 3个broker,分别为 centos201, centos202, centos203 ,id分别为 1,2,3 ;
- topic, 3个分区,2个副本;
2)生产者代码;
public class MyProducer
public static void main(String[] args)
/* 1.创建kafka生产者的配置信息 */
Properties props = new Properties();
/*2.指定连接的kafka集群, broker-list */
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");
/*3.ack应答级别*/
props.put(ProducerConfig.ACKS_CONFIG, "all");
/*4.重试次数*/
props.put(ProducerConfig.RETRIES_CONFIG, 0);
/*5.批次大小,一次发送多少数据,当数据大于16k,生产者会发送数据到 kafka集群 */
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K);
/*6.等待时间, 等待时间超过1毫秒,即便数据没有大于16k, 也会写数据到kafka集群 */
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 超时时间
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);
/*7. RecordAccumulator 缓冲区大小*/
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);
/*8. key, value 的序列化类 */
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
/** 设置压缩算法 */
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
/** 设置拦截器 */
// props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(TimeInterceptor.class.getName()));
/** 设置阻塞超时时间 */
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3600 * 1000);
/* 9.创建生产者对象 */
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
/* 10.发送数据 */
int order = 1;
for (int i = 0; i < 100000; i++)
for (int j = 0; j < 3; j++)
Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("hello10",j, "", String.format("[%s] ", order++) + " > " + DataFactory.INSTANCE.genChar(5)));
try
System.out.println("[生产者] 分区【" + future.get().partition() + "】-offset【" + future.get().offset() + "】");
catch (Exception e)
try
TimeUnit.MILLISECONDS.sleep(500);
catch (InterruptedException e)
e.printStackTrace();
/* 11.关闭资源 */
producer.close();
System.out.println("kafka生产者写入数据完成");
生产者,会向每个分区发送1条消息,发送完成后,睡眠500ms; 共计循环 10w次; 共计5w秒;计划耗时 10+小时;(这里其他同学可以自行设置为其他值)
3)4个消费者;编号为1,2,3,4
public class MyConsumer1
public static void main(String[] args)
/* 1.创建消费者配置信息 */
Properties props = new Properties();
/*2.给配置信息赋值*/
/*2.1连接的集群*/
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092,centos202:9092,centos203:9092");
/*2.2开启自动提交 */
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
/*2.3 自动提交的间隔时间*/
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
/*2.4 key value的反序列化 */
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
/*2.5 消费者组 */
props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello10G1"); // group.id
/*2.6 重置消费者的offset */
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 默认值是 lastest
/*2.7 关闭自动提交 */
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
/* 创建消费者 */
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
/* 订阅主题 */
consumer.subscribe(Arrays.asList("hello10"));
/* 指定消费者的每个分区从偏移量1开始读取,下面的poll方法就会从位置1开始消费消息 */
// for (TopicPartition partition : consumer.assignment())
// consumer.seek(partition, 1);
//
// 消费消息
try
// 死循环
while(!Thread.interrupted())
try
System.out.println(DateUtils.getNowTimestamp() + " 消费者1-等待消费消息");
TimeUnit.MILLISECONDS.sleep(100);
catch (InterruptedException e)
e.printStackTrace();
// 消费消息-获取数据
ConsumerRecords<String, String> consumerRds = consumer.poll(100);
// 遍历 ConsumerRecords
for(ConsumerRecord<String, String> rd : consumerRds)
System.out.println("消费者1-分区【" + rd.partition() + "】offset【" + rd.offset() + "】 -> " + DateUtils.getNowTimestamp() + rd.key() + "--" + rd.value());
consumer.commitSync(); // 同步提交
finally
// 记得关闭消费者
consumer.close();
这样的消费者有4个,分别编号为 消费者 1,2,3,4 ;我的意思是4个不同的消费者类,以便打印日志标识;
我的消费者消费的是 lastest 最新产生的消费,这里可以自行设置为其他值,如 earlies;
4)添加日志配置,不打印 debug日志(因为kafka消费者debug日志很多)
新建 logback.xml ,设置仅打印info以上级别日志;
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<logger name="org.apache.kafka.clients" level="info" />
</configuration>
5)为了直观展示消费详情,我会用命令行启动4个不同消费者,而用idea启动生产者;但编译都通过maven;
【2】kafka测试
【2.1】测试1:当有新消费者加入后,整个消费者组成员接收分区情况;
写在前面: 文末会po出命令行启动消费者的命令及参数;
消费者接收分区消息模型,参见
step0)启动 生产者,发送消息到kafka;
step2)命令行启动消费者1,消息消费日志如下:
消费者1接收了3个分区消息;
step2)命令行启动消费者2,群组消费日志如下:
消费者1接收了个分区2消息;
消费者2接收了分区0和分区2的消息;
step3)命令行继续启动消费者3,群组消费日志如下:
消费者1接收了个分区2消息;
消费者2接收了分区0的消息;
消费者3接收了分区1的消息;
step4)命令行继续启动消费者4, 日志如下:
消费者1接收了个分区2消息;
消费者2接收了分区0的消息;
消费者3接收了分区1的消息;消费者4空闲;
【2】 模拟kafka broker 宕机
写在前面,模拟宕机前先查看 topic 详情
(图1)
step1) 停止掉 201 broker的服务
情况1:topic的分区没有受影响,但leader 副本选举为3,比较本图和图1,看差别;
情况2:所有消费者全部阻塞,直到超时全部抛出异常;
等待 kafka集群的控制器,首领副本选择完成后,又可以接收消费者请求;
- 补充1: 这里有一小段时间延时,即当有broker宕机后,需要重新选举控制器,首领副本等;而且会发生分区再均衡;
step2)重启 201;消费日志:如下:
消费者1接收了个分区1消息;
消费者2接收了分区2的消息;
消费者3空闲;
消费者4接收了分区0的消息;
之所以 消费者3空闲,消费者4忙碌,是因为 broker 动态上下线,导致了分区再均衡使得分区所有权从消费者A转到消费者B(201宕机前,是消费者3忙碌,消费者4空闲);
【小结】
1,要保证kafka消息可靠性,需要 生产者,broker,消费者3方的全力配合;
2,本文这里仅记录了一部分 kafka集群异常的情况;
【附录】
命令行启动消费者命令及参数;仅供参考;因为路径肯定不一样;
其实,这是拷贝idea的执行日志里的命令,如下:
java -classpath D:\\Java\\jdk1.8.0_172\\jre\\lib\\charsets.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\deploy.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\access-bridge-64.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\cldrdata.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\dnsns.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\jaccess.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\jfxrt.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\localedata.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\nashorn.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\sunec.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\sunjce_provider.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\sunmscapi.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\sunpkcs11.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\ext\\zipfs.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\javaws.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\jce.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\jfr.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\jfxswt.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\jsse.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\management-agent.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\plugin.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\resources.jar;D:\\Java\\jdk1.8.0_172\\jre\\lib\\rt.jar;D:\\workbench_idea\\study4vw\\vwstudy22\\target\\classes;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\boot\\spring-boot-starter-web\\2.5.4\\spring-boot-starter-web-2.5.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\boot\\spring-boot-starter\\2.5.4\\spring-boot-starter-2.5.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\boot\\spring-boot\\2.5.4\\spring-boot-2.5.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\boot\\spring-boot-autoconfigure\\2.5.4\\spring-boot-autoconfigure-2.5.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\boot\\spring-boot-starter-logging\\2.5.4\\spring-boot-starter-logging-2.5.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\ch\\qos\\logback\\logback-classic\\1.2.5\\logback-classic-1.2.5.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\ch\\qos\\logback\\logback-core\\1.2.5\\logback-core-1.2.5.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\apache\\logging\\log4j\\log4j-to-slf4j\\2.14.1\\log4j-to-slf4j-2.14.1.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\apache\\logging\\log4j\\log4j-api\\2.14.1\\log4j-api-2.14.1.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\slf4j\\jul-to-slf4j\\1.7.32\\jul-to-slf4j-1.7.32.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\jakarta\\annotation\\jakarta.annotation-api\\1.3.5\\jakarta.annotation-api-1.3.5.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\spring-core\\5.3.9\\spring-core-5.3.9.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\spring-jcl\\5.3.9\\spring-jcl-5.3.9.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\yaml\\snakeyaml\\1.28\\snakeyaml-1.28.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\boot\\spring-boot-starter-json\\2.5.4\\spring-boot-starter-json-2.5.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\com\\fasterxml\\jackson\\core\\jackson-databind\\2.12.4\\jackson-databind-2.12.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\com\\fasterxml\\jackson\\core\\jackson-annotations\\2.12.4\\jackson-annotations-2.12.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\com\\fasterxml\\jackson\\core\\jackson-core\\2.12.4\\jackson-core-2.12.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\com\\fasterxml\\jackson\\datatype\\jackson-datatype-jdk8\\2.12.4\\jackson-datatype-jdk8-2.12.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\com\\fasterxml\\jackson\\datatype\\jackson-datatype-jsr310\\2.12.4\\jackson-datatype-jsr310-2.12.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\com\\fasterxml\\jackson\\module\\jackson-module-parameter-names\\2.12.4\\jackson-module-parameter-names-2.12.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\boot\\spring-boot-starter-tomcat\\2.5.4\\spring-boot-starter-tomcat-2.5.4.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\apache\\tomcat\\embed\\tomcat-embed-core\\9.0.52\\tomcat-embed-core-9.0.52.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\apache\\tomcat\\embed\\tomcat-embed-el\\9.0.52\\tomcat-embed-el-9.0.52.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\apache\\tomcat\\embed\\tomcat-embed-websocket\\9.0.52\\tomcat-embed-websocket-9.0.52.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\spring-web\\5.3.9\\spring-web-5.3.9.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\spring-beans\\5.3.9\\spring-beans-5.3.9.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\spring-webmvc\\5.3.9\\spring-webmvc-5.3.9.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\spring-aop\\5.3.9\\spring-aop-5.3.9.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\spring-context\\5.3.9\\spring-context-5.3.9.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\springframework\\spring-expression\\5.3.9\\spring-expression-5.3.9.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\apache\\kafka\\kafka-clients\\3.0.0\\kafka-clients-3.0.0.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\com\\github\\luben\\zstd-jni\\1.5.0-2\\zstd-jni-1.5.0-2.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\lz4\\lz4-java\\1.7.1\\lz4-java-1.7.1.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\xerial\\snappy\\snappy-java\\1.1.8.1\\snappy-java-1.1.8.1.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\slf4j\\slf4j-api\\1.7.32\\slf4j-api-1.7.32.jar;D:\\software_cluster\\mvn_repo\\.m2\\repository\\org\\slf4j\\slf4j-simple\\1.7.25\\slf4j-simple-1.7.25.jar kafka.consumer.MyConsumer2
以上是关于kafka消费者接收分区测试的主要内容,如果未能解决你的问题,请参考以下文章