Kafaka 总结

Posted tesla-turing

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafaka 总结相关的知识,希望对你有一定的参考价值。

Kafka是一个分布式的Streaming处理平台,Kafka可以用于数据库中数据的导入导出,也可以用于实时流的处理,但是Kafka最核心的功能就是作为分布式的消息中间件。
Kafka集群是由多个Broker Server组成的,消息的发送者称为Producer,消息的消费者称为Consumer,topic则是Kafka消息的发送、存储和消费中最核心的抽象,每一个Producer都需要指定将消息发往哪个topic,而Consumer则需要指定消费哪一个topic的数据,所以topic是连接Producer和Consumer的桥梁。
topic可以分成多个分区,这些分区都是分布式的均匀的分布在多个Broker Server上,每一个topic的每一个Partition都可以配置备份冗余存储在多个Broker Server上,这样可以提高数据的高可用性。每一个topic的数据都是按照每一个分区存储在Kafka Broker Server指定的存储文件中的,这个存储的时间默认是7天,过了7天这些数据将会被删除掉,这个7天当然可以配置。
Producer发送消息的时候只需要指定topic即可,那么一个topic可能有多个partition,那么Producer发送的一条数据到底发送到这个topic的哪一个partition中呢,这个就是Producer在发送消息时需要使用Partitioner来为发送的数据进行分区了,按照一定的规则来计算出将要发送的数据需要发往哪个分区,这个Partitioner默认是按照轮询的规则进行分区,当然可以自定义这个规则
Consumer消息消息的时候除了需要指定topic外,还需要指定这个Consumer属于哪一个Consumer Group。每一个Consumer Group消费topic所有的partition的数据,而属于一个Consumer Group的所有的Consumer平均消费同一个topic的所有partition的数据,每一个Consumer消费topic中的partitions数据的时候都是按照offset来消费的,这个offset就是消息在Kafka中topic的位置
 
技术图片

 

 技术图片

 

 技术图片

 

 Kafka基本术语 - Consumer

技术图片

 

 

一个topic的数据可以被多个Consumer消费:
1、Consumer是根据offset来消费topic中的Record的
2、offset是Consumer控制的,所以Consumer可以按照不同需求消费任何位置的数据,在数据存在的7天内
 
 
Consumer Group
每一个Consumer都被归为一个Consumer Group
一个Consumer Group可以包含一个或者多个Consumer
一个topic中的一条Record会被所有订阅了这个topic的Consumer Group消费
技术图片

 

 技术图片

 

 

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created by tangweiqun on 2017/12/23.
 */
public class SimpleComsumerGroup1 
    public static void main(String[] args) 
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092"); 
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test-group"));
        while (true) 
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) 
                System.out.printf("offset = %d, key = %s, value = %s, topic = %s, partition = %d",
                        record.offset(), record.key(), record.value(), record.topic(), record.partition());
                System.out.println();
            
        
    

  

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * Created by tangweiqun on 2017/12/23.
 */
public class SimpleComsumerGroup2 
    public static void main(String[] args) 
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("group.id", "group2");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test-group"));
        while (true) 
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) 
                System.out.printf("offset = %d, key = %s, value = %s, topic = %s, partition = %d",
                        record.offset(), record.key(), record.value(), record.topic(), record.partition());
                System.out.println();
            
        
    

  

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleProducer 
    public static void main(String[] args) 
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", "10");    

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) 
            producer.send(new ProducerRecord<String, String>("test-group",
                    Integer.toString(i), Integer.toString(i)));
        

        producer.close();
    

  

以上是关于Kafaka 总结的主要内容,如果未能解决你的问题,请参考以下文章

springboot整合rabbitMQ

kafaka quickstart

kafaka

MQ之主流MQ:kafaka+RocketMQ+RabbitMQ对比

消息队列(MQ)与kafaka概述(Filebeat+Kafka+ELK部署)

Kafaka详细介绍机制原理