Kafka

Posted Mr.zhou_Zxy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka相关的知识,希望对你有一定的参考价值。

Kafka

1 消息队列——消息中间件

1.1 消息队列的作用

1.2 消息队列的概念——MQ

  • Message
在互联网中,多台设备产生通信的数据的总称:可以是视频、文本、音频等等。
  • Quene
一种特殊的线性表,满足先进先出的原则。

1.3 消息队列的种类

MQ分为两种:
P2P : peer to peer
Pub/Sub : 发布与订阅

1.3.1 peer to peer

1.3.2 pub/sub

1.3.3 二者之间的区别

共同点:
消息生产者将消息生产到队列,消息消费者从队列中消费消息。
不同点:
p2p:一个生产者生产的消息只能被一个消费者消费。打电话
pub/sub:每个消息都可以有多个消费者消费,说明消息存在队列中。所以他使用偏移量方式管理消息

1.3.4 常见的消息队列

RabbitMQ : erlang编写。支持负载均衡,数据持久化。pub/sub\\p2p

Redis : kv的nosql的缓存数据库,但是也支持pub/sub。对于短消息(小于10kb)的性能RabbitMQ 还好。

zeroMQ:轻量级的MQ。P2P

ActiveMQ:JMS实现,P2P,持久化,分布式事务

Kafka/Jafka:高性能跨语言分布式基于发布/订阅的全分布式支持数据持久化,也可以离线或者实时处理数据的。

RocketMQ:纯Java实现。发布/订阅,本地事务和分布式事务

2 Kafka快速入门

2.1 介绍

kafka是分布式的基于消息的发布-订阅的消息队列。LinkedIn(领英),Scala编写的

2.2 三大特点

  • 高吞吐量

可满足每秒百万级别的消息的生产和消费

  • 持久性

具备一套完整的消息的存储机制,可以确保消息数据的高效的安全的持久化

  • 分布式

既有扩展以及容错性。

2.3 kafka服务

  • topic : 主题,kafka处理的消息分为不同的分类,分类就是按照主题来划分。
  • broker:消息服务器的代理。kafka集群中的一个节点一般我都门都叫做一个broker;主要是用来存储消息。存在硬盘中。
  • partition:分区。Topic的在物理上的分组。一个topic在broker上被分为1个或者多个partition。分区在创建主题的时候指定的。
  • message:消息,通信的基本单位,每个消息属于某一个partition
  • Producer: 生产者,消息和数据都是由这个组件产生的,由它发送到kafka集群中的。
  • Consumer:消费者,消息和数据都是由这个组件来消费的。
  • Zookeeper: 他需要zk来做分布式协调

3 Kafka安装

3.1 安装

##1. 解压安装
[root@hadoop software]# tar -zxvf kafka_2.11-1.1.1.tgz -C /opt/apps/
[root@hadoop apps]# mv kafka_2.11-1.1.1/ kafka-1.1.1/
[root@hadoop kafka-1.1.1]# vi /etc/profile
## 自定义环境变量
export JAVA_HOME=/opt/apps/jdk1.8.0_261
export HADOOP_HOME=/opt/apps/hadoop-2.8.1
export HIVE_HOME=/opt/apps/hive-1.2.1
export HBASE_HOME=/opt/apps/hbase-1.2.1
export COLLECT_HOME=/opt/apps/collect-app
export FRP_HOME=/opt/apps/frp
export SCRIPT_HOME=/opt/apps/scripts
export SQOOP_HOME=/opt/apps/sqoop-1.4.7
export AZKABAN_HOME=/opt/apps/azkaban-solo-server
export SCALA_HOME=/opt/apps/scala-2.11.8
export SPARK_HOME=/opt/apps/spark-2.2.0
export KAFKA_HOME=/opt/apps/kafka-1.1.1

export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$HIVE_HOME/bin:$HBASE_HOME/bin
export PATH=$PATH:$COLLECT_HOME:$FRP_HOME:$SCRIPT_HOME:$SQOOP_HOME/bin:$KAFKA_HOME/bin
export CLASS_PATH=.:$JAVA_HOME/lib
export FLUME_HOME=/opt/apps/flume-1.9.0
export PATH=$PATH:/opt/apps/flume-1.9.0/bin:$AZKABAN_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin

##2. server.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1 ## kafka的broker id的值,是一个整数,在集群中不能出现重复
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/apps/kafka-1.1.1/logs ## kafka的日志文件的路径

############################# Zookeeper #############################
zookeeper.connect=hadoop:2181/kafka ## kafka的一些数据在zk中的目录

##3. 接下来是全分布式的操作
##3.1 拷贝,将本机的kafka-1.1.1目录拷贝到其他的节点上
scp -r $KAFKA_HOME/ hadoop2:/opt/apps
scp -r $KAFKA_HOME/ hadoop3:/opt/apps

##3.2 修改server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2 ## kafka的broker id的值,是一个整数,在集群中不能出现重复

3.2 启动测试

##1. 如果是全分布式,请先启动zk的集群,如果不是全分布式请先启动kafka自带的zk的脚本。
[root@hadoop bin]# sh zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
##2. 启动kafka的服务
[root@hadoop bin]# sh kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

##3. 测试zk客户端连接zk服务
[root@hadoop bin]# sh zookeeper-shell.sh localhost:2181

4 Kafka的基本操作

4.0 kafka在zookeeper中的目录说明

cluster

​ - id : {“version”:“1”,“id”:"_8TJvXJoQPqD1b2Vr-BVBA"} -> 包含了集群版本和集群的id

controller : {“version”:1,“brokerid”:1,“timestamp”:“1623914309134”} -> 控制partition的leader选举,topic的crud。当前集群中的brokerid=1的broker充当了controller的角色

controller_epoch :1。表示controller的纪元。表示controller的迭代。每当controller换一个broker,值自增1

brokers

​ -ids [1, 2, 3] -> 当前kafka中的broker的实例列表

​ -topics:[hadoop] --> 当前的kafka的主题列表

​ -seqid : 系统的序列id

consumers :

​ 老版本的kafka中适用于存放kafka的消费者信息,主要保存从就偏移量。

​ 新版本基本不用,消费的偏移量记录在集群中的broker节点的磁盘中,$KAFKA_HOME/logs/__consumer_offsets

config : 存放配置信息

4.1 Topic操作

4.1.1 创建主题

[root@hadoop bin]# kafka-topics.sh \\
> --create \\ ## 创建事件
> --topic hadoop \\ ## 主题名称
> --zookeeper hadoop/kafka \\ ## kafka关联zk的地址
> --partitions 3 \\ ## 分区数
> --replication-factor 1 ## 副本因子的个数必须要小于等于broker的实例的个数
Created topic "hadoop".

4.1.2 列举主题列表

[root@hadoop kafka-1.1.1]# kafka-topics.sh \\
> --list \\
> --zookeeper hadoop:2181/kafka
hadoop

4.1.3 查看主题详情


[root@hadoop kafka-1.1.1]# kafka-topics.sh \\
> --describe \\
> --topic hadoop \\
> --zookeeper hadoop:2181/kafka
Topic:hadoop    PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: hadoop   Partition: 0    Leader: 1       Replicas: 1,3,2     Isr: 1,3,2
        Topic: hadoop   Partition: 1    Leader: 2       Replicas: 2,1,3     Isr: 2,1,3
        Topic: hadoop   Partition: 2    Leader: 3       Replicas: 3,2,1     Isr: 3,2,1
        
Partition: 当前的主题的分区号
Replicas:副本因子,当前kafka对应的分区所在的broker实例的brokerid
Leader:当前的kafka对应分区的broker中的leader,只有leader才负责处理读写请求
Isr:该分区存活的副本对用的broker的borkerid

4.1.4 修改主题

[root@hadoop kafka-1.1.1]# kafka-topics.sh \\
> --alter \\
> --topic hadoop \\
> --partitions 4 \\
> --zookeeper hadoop:2181/kafka

tip: 修改分区,只能+,不能-

4.1.5 删除主题

[root@hadoop kafka-1.1.1]# kafka-topics.sh \\
> --delete \\
> --topic hadoop \\
> --zookeeper hadoop:2181/kafka
Topic hadoop is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

4.2 测试kafka的生产与消费能力

4.2.1 开启生产端

[root@hadoop bin]# sh kafka-console-producer.sh \\
> --topic hadoop \\
> --broker-list hadoop:9092
> 

4.2.2 开启消费端

[root@hadoop bin]# sh kafka-console-consumer.sh \\
> --topic hadoop \\
> --bootstrap-server hadoop:9092

4.2.3 消费者消费总结

​ kafka消费者在消费数据的时候,一般都是分组的。这个分组叫做消费者组。不同组的消费相互之间不影响。相同组内的消费,同组内的偏移量相互影响;但是需要注意的是,如果你的分区是3个,那么你的消费者组内的消费者最优是3个,因为再多也没有用了,因为组内的消费者,一个消费者消费一个主题的分区。

[root@hadoop bin]# sh kafka-console-consumer.sh \\
> --topic hadoop \\
> --group default \\ ##指定到default的消费者组
> --bootstrap-server hadoop:9092 \\
> --partition 2 ## 消费分区2
> --offset ealiest ## 从什么位置开始消费

5 Kafka的API操作

5.1 导入依赖

<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka.version}</version>
</dependency>

5.2 生产者和消费者

5.2.1 Demo1_Producer

package com.zxy.bigdata.kafka.day2;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.Properties;

public class Demo1_Producer {
    public static void main(String[] args) throws IOException, InterruptedException {
        // 1.创建配置属性对象
        Properties properties = new Properties();
        properties.load(Demo1_Producer.class.getClassLoader().getResourceAsStream("producer.properties"));

        /* 2.创建生产者对象
         * K : 向topic发送的消息的key的类型
         * V : 向topic发送的消息的value的类型
         */
        Producer<Integer, String> producer = new KafkaProducer(properties);
        //3. 发送消息
        for(int start = 0; start < 20; start++) {
            ProducerRecord<Integer, String> record = new ProducerRecord<>("hadoop", "ni dead bu dead");
            producer.send(record);
        }

        Thread.sleep(10000);
        //4 释放资源
        producer.close();
    }
}

5.2.2 producer.properties

bootstrap.servers=hadoop:9092
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=[0|-1|1|all] ## 消息确认机制
				## 0 : 不做确认,只管发送
				## -1|all : 首先保证leader将数据写入到磁盘,并确认;还要保证等待数据同步到其他的非leader节点。
				## 1 : 只确保leader写入数据完毕。后期leader和其他节点自动完成同步。
batch.size=1024  ## 每个分区内的用户缓存未发送记录的容量
linger.ms=10     ## 无论你的缓冲区是否填满,都会延迟10ms发送请求
buffer.memory=10240 ## 设置你的生产者的所有的缓存空间
retries=0       ## 发送消息失败之后重试的次数

5.2.3 Demo2_Consumer

package com.zxy.bigdata.kafka.day2;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

public class Demo2_Consumer {
    public static void main(String[] args) throws IOException {
        // 1.创建配置属性对象
        Properties properties = new Properties();
        properties.load(Demo1_Producer.class.getClassLoader().getResourceAsStream("consumer.properties"));

        //2. 创建消费者对象
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(properties);

        //3. 订阅主题
        consumer.subscribe(Arrays.asList("hadoop"));

        //4. 拉取
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(1000);
            for (ConsumerRecord<Integer, String> record : records) {
                Integer key = record.key();
                String value = record.value();
                int partition = record.partition();
                long offset = record.offset();
                String topic = record.topic();
                System.out.println(key + "-->" + value + "-->" + partition + "-->" + offset + "-->" + topic);
            }
        }
    }
}

5.2.4 consumer.properties

bootstrap.servers=hadoop:9092
group.id=hzbigdata2101
key.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest    ## ealiest latest none

5.3 操作Topic

5.3.1 创建主题

package com.zxy.bigdata.kafka.day3;


import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class Demo1_Admin {
    public static void main(String[] args) {
        //1. 创建一个管理员对象
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop:9092"); // 146.56.208.76
        AdminClient admin = AdminClient.create(properties);
        //2. 创建主题
        admin.createTopics(Arrays.asList(new NewTopic("spark", 3, (short)1)));
        //3. 释放资源
        admin.close();
    }
}

5.3.2 查询主题

package com.zxy.bigdata.kafka.day3;


import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;

import java.util.Arrays;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class Demo2_Admin {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. 创建一个管理员对象
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop:9092"); // 146.56.208.76
        AdminClient admin = AdminClient.create(properties);
        //2. 查询主题
        ListTopicsResult listTopicsResult = admin.listTopics();
        //2.1 获取到你所有的主题名称
        KafkaFuture<Set<String>> names = listTopicsResult.names();
        Set<String> topics = names.get();
        //2.2 遍历
        for (String topic : topics) {
            System.out.println(topic);
        }
        
        //3. 释放资源
        admin.close();
    }
}

5.3.3 查询详情

package com.zxy.bigdata.kafka.day3;


import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.*;
import java.util.concurrent.ExecutionException;

public class Demo3_Admin {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //1. 创建一个管理员对象
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop:9092"); // 146.56.208.76
        AdminClient admin = AdminClient.create(properties);
        //2. 查询主题
        DescribeTopicsResult describeTopicsResult = admin.describeTopics(Arrays.asList("hadoop"));
        KafkaFuture<Map<String, TopicDescription>> mapResult = describeTopicsResult.all();
        Map<String, TopicDescription> map = mapResult.get();

        //3. 遍历
        for (Map.Entry<String, TopicDescription> entry:map.entrySet()) {
            System.out.println(entry.getKey() + "  ----->");
            TopicDescription value = entry.getValue();
            System.out.println(value.name());
            List<TopicPartitionInfo> partitions = value.partitions();
            for (TopicPartitionInfo info : partitions) {
                System.out.println(info.partition() + ":" +info.leader()+":"+info.replicas());
            }
        }

        //3. 释放资源
        admin.close();
    }
}

5.4 record进行主题的分区策略

每条ProducerRecord(topic名称,可选的partition编号,以及一组key和value)

  1. 如果指定了partition,按照指定的分区编号发送
  2. 如果没有指定partition,但是指定了key,使用key进行hash,根据hash结果选择partition
  3. 如果没有指定partition也没有指定key,那么是轮循的方式选择partition

5.5 分区器

5.5.1 核心类

public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * This is called when partitioner is closed.
     */
    public void close();

}

5.5.2 自定义分区器

5.5.2.1 随机分区器
  • 代码
public class RandomPartitioner implements Partitioner {

    private Random random = new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //1. 根据主题获取到当前的主题的分区数量
        Integer partitionNum = cluster.partitionCountForTopic(topic);
        //2. 随机分区
        int partitionId = random.nextInt(partitionNum);
        return partitionId;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 注册
partitioner.class=com.zxy.bigdata.kafka.day3.RandomPartitioner
5.5.2.2 Hash分区器
  • 编码
package com.zxy.bigdata.kafka.day3;

import org.apache.kafka.clientsKafka-文件管理

Flink实战系列Flink 1.14.0 消费 kafka 数据自定义反序列化器

kafkaThe group member needs to have a valid member id before actually entering a consumer group(代码片段

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

Camel-Kafka java.io.EOFException - NetworkReceive.readFromReadableChannel

在ansible模板中使用动态组名称