Kafka,RabbitMQ,RockedMQ真实应用开发大汇总1

Posted 在南邮学到秃头

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka,RabbitMQ,RockedMQ真实应用开发大汇总1相关的知识,希望对你有一定的参考价值。

Kafka,RabbitMQ,RockedMQ实际应用大汇总1.Kafka

  • 本文结合官网用例,记载了三大主流mq的实例以及实际运用。
  • 本文不涉及相关环境的安装与配置,涉及较为全面的代码(包括配置文件及maven)
  • 本文直接上代码及用例,适合对mq已经学过一遍或了解过的同学进行学习&复习,可加入生产环境。
  • 关于各种mq的介绍及对比可以参照我之前的文章
  • 其实mq的设计思想都差不多,可以细细感受一下mq的设计理念以及基本的服务对象。
  • 如果有问题,欢迎留言区或私信进行交流。
  • 虽然已经挑重点代码拿出来了,但是一篇文章里写三个mq还是很长,因此还是分为三篇记录吧,这是第一篇,主讲kafka

一:Kafka

  • 基本/核心概念

    • Broker
      Kafka的服务端程序,可以认为一个mq节点就是一个broker
      broker存储topic的数据

    • Producer生产者
      创建消息Message,然后发布到MQ中
      该角色将消息发布到Kafka的topic中

    • Consumer消费者:
      消费队列里面的消息

    • ConsumerGroup消费者组
      同个topic, 广播发送给不同的group,一个group中只有一个consumer可以消费此消息

    • Topic
      每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思

    • Partition分区
      kafka数据存储的基本单元,topic中的数据分割为一个或多个partition,每个topic至少有一个partition,是有序的
      一个Topic的多个partitions, 被分布在kafka集群中的多个server上
      消费者数量 <=小于或者等于Partition数量

    • Replication 副本(备胎)
      同个Partition会有多个副本replication ,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务
      默认每个topic的副本都是1(默认是没有副本,节省资源),也可以在创建topic的时候指定
      如果当前kafka集群只有3个broker节点,则replication-factor最大就是3了,如果创建副本为4,则会报错

    • ReplicationLeader、ReplicationFollower
      Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互
      ReplicationFollower只是做一个备份,从replicationLeader进行同步

    • ReplicationManager
      负责Broker所有分区副本信息,Replication 副本状态切换

    • offset
      每个consumer实例需要为他消费的partition维护一个记录自己消费到哪里的偏移offset
      kafka把offset保存在消费端的消费者组里

  • kafka特点

    • 多订阅者
      一个topic可以有一个或者多个订阅者
      每个订阅者都要有一个partition,所以订阅者数量要少于等于partition数量
      高吞吐量、低延迟: 每秒可以处理几十万条消息

    • 高并发:几千个客户端同时读写

    • 容错性:多副本、多分区,允许集群中节点失败,如果副本数据量为n,则可以n-1个节点失败

    • 扩展性强:支持热扩展

  • 实例代码(jdk11+kafka2.8)

    • topic管理中心
package net.xdclass.kafkatest;

import org.apache.kafka.clients.admin.*;
import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * @Author NJUPT wly
 * @Date 2021/8/14 12:15 上午
 * @Version 1.0
 */
public class KafkaAdminTest {
    private static final String TOPIC_NAME = "xdclass-sp-topic-1";
    /**
     * 设置admin客户端
     */
    public static AdminClient initAdminClient(){
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

        return AdminClient.create(properties);
    }

    @Test
    public void createTopicTest(){
        AdminClient adminClient = initAdminClient();

        //指定分区数据,副本数量
        NewTopic newTopic = new NewTopic(TOPIC_NAME,5,(short) 1);
        CreateTopicsResult result =  adminClient.createTopics(Collections.singletonList(newTopic));
        try {
            //future 等待创建
            result.all().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void listTopicTest() throws ExecutionException, InterruptedException {
        AdminClient adminClient = initAdminClient();

        ListTopicsOptions options =  new ListTopicsOptions();
        options.listInternal(true);

        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        Set<String> topics = listTopicsResult.names().get();
        for (String name : topics){
            System.out.println(name);
        }
    }

    @Test
    public void delTopicTest() throws ExecutionException, InterruptedException {
        AdminClient adminClient =initAdminClient();

        DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList("xdclass-sp-topic"));
        result.all().get();

    }

    @Test
    public void detailTopicTest() throws ExecutionException, InterruptedException {
        AdminClient adminClient = initAdminClient();

        DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(TOPIC_NAME));
        Map<String,TopicDescription> stringTopicDescriptionMap = result.all().get();
        Set<Map.Entry<String,TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
        entries.forEach((entry)-> System.out.println("name:"+entry.getKey()+",des"+entry.getValue()));
    }

    @Test
    public void incrPartitionTest() throws ExecutionException, InterruptedException {
        AdminClient adminClient = initAdminClient();
        NewPartitions newPartitions = NewPartitions.increaseTo(5);
        Map<String,NewPartitions> map = new HashMap<>();
        map.put(TOPIC_NAME,newPartitions);
        CreatePartitionsResult result = adminClient.createPartitions(map);
        result.all().get();
    }
}

  • 消费者实例
package net.xdclass.kafkatest;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.*;

/**
 * @Author NJUPT wly
 * @Date 2021/8/15 10:43 上午
 * @Version 1.0
 */
public class KafkaConsumerTest {
    private static final String TOPIC_NAME = "xdclass-sp-topic-1";

    public static Properties getProperties(){
        Properties properties = new Properties();

        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"xdclass-g-1");

        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

//        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
//        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");



        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    @Test
    public void simpleConsumerTest(){
        Properties properties = getProperties();
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Collections.singleton(TOPIC_NAME));

        while (true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord record : records){
                System.err.printf("topic=%s,offset=%d,key=%s %n",record.topic(),record.offset(),record.key(),record.value());
            }
//            kafkaConsumer.commitSync();
            if (!records.isEmpty()){
                kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception == null){
                            System.err.println("手动提交success"+offsets.toString());
                        } else {
                            System.err.println("手动提交fail"+offsets.toString());
                        }
                    }
                });
            }
        }
    }

}
  • 生产者实例
package net.xdclass.kafkatest;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * @Author NJUPT wly
 * @Date 2021/8/14 4:12 下午
 * @Version 1.0
 */
public class KafkaProductTest {
    private static final String TOPIC_NAME = "xdclass-sp-topic-1";

    public static Properties getProperties(){

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
//        properties.put("bootstrap.server","127.0.0.1:9092");
        properties.put("acks","all");
        properties.put("retries",0);
        properties.put("batch",16384);
        properties.put("linger.ms",1);
        properties.put("buffer.memory",33554432);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    @Test
    public void testSend(){
        Properties properties = getProperties();
        Producer<String,String> producer = new KafkaProducer<String, String>(properties);
        for (int i=0 ; i<3 ; i++){
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,"xdclass-key"+i,"xdclass-value"+i));
            try {
                RecordMetadata metadata = future.get();
                System.out.println("发送状态"+metadata.toString());
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();


    }
    @Test
    public void testSendCallBack(){
        Properties properties = getProperties();
        Producer<String,String> producer = new KafkaProducer<String, String>(properties);
        for (int i=0 ; i<3 ; i++){
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){
                        System.out.println("发送状态"+recordMetadata.toString());
                    } else {
                        e.printStackTrace();
                    }
                }
            });
            try {
                RecordMetadata metadata = future.get();
                System.out.println("发送状态"+metadata.toString());
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
    @Test
    public void testSendPartition(){
        Properties properties = getProperties();
        Producer<String,String> producer = new KafkaProducer<String, String>(properties);
        for (int i=0 ; i<3 ; i++){
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,4,"xdclass-key"+i,"xdclass-value"+i));
            try {
                RecordMetadata metadata = future.get();
                System.out.println("发送状态"+metadata.toString());
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();

    }

    @Test
    public void testSendP(){
        Properties properties = getProperties();
        properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,"net.xdclass.kafkatest.config.PartitionerT");
        Producer<String,String> producer = new KafkaProducer<String, String>(properties);
        for (int i=0 ; i<3 ; i++){
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass" + i, "xdclass-value" + irabbitmq和kafka的区别

请问kafka和rabbitmq有什么区别啊?

为啥说rabbitmq 比kafka可靠

kafka与rabbitMQ选型比较

rabbitmq与kafka到底用哪个好

RabbitMQ和Kafka对比