Kafka, RabbitMQ, RocketMQ: A Real-World Application Development Roundup, Part 1
Posted by 在南邮学到秃头
- Based on the official usage examples, this post records hands-on examples and practical applications of the three mainstream MQs.
- It does not cover installing or configuring the environments; it focuses on fairly complete code, including configuration files and the Maven setup.
- It goes straight to code and use cases, so it suits readers who have already studied MQs once or at least skimmed them, for learning and review; the code can be adapted for production use.
- For an introduction to and comparison of the various MQs, see my earlier posts.
- The design ideas behind the different MQs are broadly similar; it is worth taking time to appreciate each one's design philosophy and the basic scenarios it serves.
- If you have questions, feel free to raise them in the comments or by private message.
- Even with only the key code included, covering three MQs in one article would be very long, so this is split into three posts. This first one covers Kafka.
Part One: Kafka

Basic / core concepts

- Broker
Kafka's server-side process; you can think of one MQ node as one broker.
Brokers store the data for topics.
- Producer
Creates messages and publishes them to the MQ; this role publishes messages to a Kafka topic.
- Consumer
Consumes the messages in the queue.
- ConsumerGroup
The same topic is broadcast to different groups, but within one group only one consumer receives a given message (the sketch after this list makes these semantics concrete).
- Topic
Every message published to a Kafka cluster has a category, called its topic.
- Partition
The basic unit of Kafka data storage. A topic's data is split into one or more partitions; every topic has at least one partition, and each partition is ordered.
A topic's partitions are distributed across multiple servers in the Kafka cluster.
Within a consumer group, the number of consumers should be less than or equal to the number of partitions.
- Replication (replicas, i.e. standbys)
A partition can have multiple replicas holding identical data; when a broker fails, the system can serve from a replica instead.
The replication factor defaults to 1 per topic (no extra replicas, to save resources); it can also be specified when the topic is created.
If the Kafka cluster has only 3 broker nodes, the maximum replication-factor is 3; requesting 4 raises an error.
- ReplicationLeader, ReplicationFollower
Of a partition's replicas, only the replication leader interacts with producers and consumers for that partition; followers only act as backups, syncing from the leader.
- ReplicationManager
Tracks all partition replica information on a broker and handles replica state transitions.
- Offset
Each consumer instance maintains, for every partition it consumes, an offset recording how far it has read.
Kafka persists committed offsets broker-side, in the internal __consumer_offsets topic, keyed by consumer group.
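To make the ConsumerGroup semantics concrete, here is a minimal sketch (class, group, and topic names are illustrative, not from the original post): two consumers sharing a group.id split the topic's partitions between them, while a consumer with a different group.id independently receives every message.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class GroupSemanticsSketch {
    static KafkaConsumer<String, String> consumer(String groupId) {
        Properties p = new Properties();
        p.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        p.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        p.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(p);
    }
    public static void main(String[] args) {
        // Two consumers in "group-a": the topic's partitions are divided between them,
        // so each message is processed by exactly one of the two (queue semantics).
        KafkaConsumer<String, String> a1 = consumer("group-a");
        KafkaConsumer<String, String> a2 = consumer("group-a");
        // One consumer in "group-b": it is assigned all partitions of the topic,
        // so it sees every message regardless of group-a (broadcast across groups).
        KafkaConsumer<String, String> b = consumer("group-b");
        a1.subscribe(Collections.singleton("xdclass-sp-topic-1"));
        a2.subscribe(Collections.singleton("xdclass-sp-topic-1"));
        b.subscribe(Collections.singleton("xdclass-sp-topic-1"));
        // Note: partition assignment only actually happens on the first poll().
    }
}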
Kafka characteristics

- Multiple subscribers
A topic can have one or more subscribers.
Within a group, each subscribing consumer needs at least one partition, so the number of consumers should be less than or equal to the number of partitions.
- High throughput, low latency: handles hundreds of thousands of messages per second.
- High concurrency: thousands of clients can read and write simultaneously.
- Fault tolerance: multiple replicas and partitions allow cluster nodes to fail; with a replication factor of n, up to n-1 nodes can fail.
- Strong scalability: supports hot expansion (adding brokers without downtime).
Example code (JDK 11 + Kafka 2.8)
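The intro mentions the Maven setup, so here is a minimal dependency sketch for the examples below, assuming kafka-clients 2.8.0 and JUnit 5 to match the versions stated above (adjust versions to your environment):

<dependencies>
    <!-- Kafka Java client, matching the Kafka 2.8 broker used in these examples -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!-- JUnit 5, for the @Test methods in the examples -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.7.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>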
- Topic administration
package net.xdclass.kafkatest;
import org.apache.kafka.clients.admin.*;
import org.junit.jupiter.api.Test;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
* @Author NJUPT wly
* @Date 2021/8/14 12:15 AM
* @Version 1.0
*/
public class KafkaAdminTest {
private static final String TOPIC_NAME = "xdclass-sp-topic-1";
/**
* Set up the admin client shared by the tests below
*/
public static AdminClient initAdminClient(){
Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
return AdminClient.create(properties);
}
@Test
public void createTopicTest(){
AdminClient adminClient = initAdminClient();
//specify the partition count and replication factor
NewTopic newTopic = new NewTopic(TOPIC_NAME,5,(short) 1);
CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
try {
//block on the future until the topic has been created
result.all().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
@Test
public void listTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
Set<String> topics = listTopicsResult.names().get();
for (String name : topics){
System.out.println(name);
}
}
@Test
public void delTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient =initAdminClient();
DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList("xdclass-sp-topic"));
result.all().get();
}
@Test
public void detailTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(TOPIC_NAME));
Map<String,TopicDescription> stringTopicDescriptionMap = result.all().get();
Set<Map.Entry<String,TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
entries.forEach((entry)-> System.out.println("name: "+entry.getKey()+", desc: "+entry.getValue()));
}
@Test
public void incrPartitionTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
//the new total must exceed the topic's current partition count (5 above), or the call fails
NewPartitions newPartitions = NewPartitions.increaseTo(8);
Map<String,NewPartitions> map = new HashMap<>();
map.put(TOPIC_NAME,newPartitions);
CreatePartitionsResult result = adminClient.createPartitions(map);
result.all().get();
}
}
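AdminClient can also read a topic's configuration. The test method below is not from the original post; it is a minimal sketch of an extra method for the class above, using the describeConfigs API and reusing initAdminClient() (it additionally needs import org.apache.kafka.common.config.ConfigResource):

@Test
public void describeTopicConfigTest() throws ExecutionException, InterruptedException {
    AdminClient adminClient = initAdminClient();
    //a topic is addressed as a ConfigResource of type TOPIC
    ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
    Map<ConfigResource, Config> configs =
            adminClient.describeConfigs(Collections.singletonList(resource)).all().get();
    //print every effective config entry, e.g. retention.ms, cleanup.policy
    configs.get(resource).entries().forEach(entry ->
            System.out.println(entry.name() + "=" + entry.value()));
}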
- Consumer example
package net.xdclass.kafkatest;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.util.*;
/**
* @Author NJUPT wly
* @Date 2021/8/15 10:43 AM
* @Version 1.0
*/
public class KafkaConsumerTest {
private static final String TOPIC_NAME = "xdclass-sp-topic-1";
public static Properties getProperties(){
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"xdclass-g-1");
//start from the earliest offset when the group has no committed offset yet
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
// properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
//auto-commit is disabled; offsets are committed manually in the poll loop below
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
return properties;
}
@Test
public void simpleConsumerTest(){
Properties properties = getProperties();
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Collections.singleton(TOPIC_NAME));
while (true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records){
System.err.printf("topic=%s, offset=%d, key=%s, value=%s%n", record.topic(), record.offset(), record.key(), record.value());
}
// kafkaConsumer.commitSync(); //synchronous alternative to the async commit below
if (!records.isEmpty()){
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null){
System.err.println("手动提交success"+offsets.toString());
} else {
System.err.println("手动提交fail"+offsets.toString());
}
}
});
}
}
}
}
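The offset concept from the first section can also be driven by hand. The test method below is not from the original post; it is a minimal sketch of an extra method for the class above, assuming the same getProperties() helper, that pins the consumer to one partition and rewinds it to the beginning:

@Test
public void assignAndSeekTest(){
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProperties());
    //bypass group-managed subscription and take partition 0 of the topic directly
    TopicPartition tp = new TopicPartition(TOPIC_NAME, 0);
    consumer.assign(Collections.singletonList(tp));
    //rewind to offset 0 so the next poll re-reads the partition from the start
    consumer.seek(tp, 0);
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    records.forEach(record ->
            System.out.printf("partition=%d, offset=%d, value=%s%n",
                    record.partition(), record.offset(), record.value()));
    consumer.close();
}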
- Producer example
package net.xdclass.kafkatest;
import org.apache.kafka.clients.producer.*;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
* @Author NJUPT wly
* @Date 2021/8/14 4:12 PM
* @Version 1.0
*/
public class KafkaProductTest {
private static final String TOPIC_NAME = "xdclass-sp-topic-1";
public static Properties getProperties(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
// properties.put("bootstrap.server","127.0.0.1:9092");
properties.put("acks","all");
properties.put("retries",0);
properties.put("batch",16384);
properties.put("linger.ms",1);
properties.put("buffer.memory",33554432);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
@Test
public void testSend(){
Properties properties = getProperties();
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
for (int i=0 ; i<3 ; i++){
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,"xdclass-key"+i,"xdclass-value"+i));
try {
RecordMetadata metadata = future.get();
System.out.println("发送状态"+metadata.toString());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
@Test
public void testSendCallBack(){
Properties properties = getProperties();
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
for (int i=0 ; i<3 ; i++){
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null){
System.out.println("发送状态"+recordMetadata.toString());
} else {
e.printStackTrace();
}
}
});
try {
RecordMetadata metadata = future.get();
System.out.println("发送状态"+metadata.toString());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
@Test
public void testSendPartition(){
Properties properties = getProperties();
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
for (int i=0 ; i<3 ; i++){
//send explicitly to partition 4 (valid here because the topic has 5 partitions, 0-4)
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,4,"xdclass-key"+i,"xdclass-value"+i));
try {
RecordMetadata metadata = future.get();
System.out.println("发送状态"+metadata.toString());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
producer.close();
}
@Test
public void testSendP(){
Properties properties = getProperties();
//plug in a custom partitioner (a sketch of such a class follows after this test class)
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,"net.xdclass.kafkatest.config.PartitionerT");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
for (int i=0 ; i<3 ; i++){
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass" + i, "xdclass-value" + irabbitmq和kafka的区别