Kafka Learning Notes — Broker (Commissioning/Decommissioning Nodes, Kafka Replicas, File Storage)

Posted by 柒柚jun


Kafka Learning Notes — Broker

Broker Workflow

Kafka information stored in ZooKeeper

  1. Start ZooKeeper

    [root@hadoop103 bin] ./zkCli.sh 
    
    WATCHER::
    
    WatchedEvent state:SyncConnected type:None path:null
    [zk: localhost:2181(CONNECTED) 0] 
    
    #view the stored info
    [zk: localhost:2181(CONNECTED) 2] ls /
    [kafka, zookeeper]
    

    Connect with PrettyZoo and browse the tree; the key znodes are listed below (see the zkCli sketch after this list):

    1. /kafka/brokers/ids [0,1,2]: records which brokers are registered
    2. /kafka/brokers/topics/first/partitions/0/state: records which replica is the leader and which replicas are available
    3. /kafka/cluster/id
    4. /kafka/consumers: consumer offsets are stored in a Kafka topic (__consumer_offsets), no longer here
    5. /kafka/controller: assists in electing partition leaders
    6. /kafka/config
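
    These znodes can be read straight from zkCli.sh (paths follow the list above; the JSON values printed will differ per cluster):

      #which brokers are registered
      [zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids
      #leader/ISR state of partition 0 of topic first
      [zk: localhost:2181(CONNECTED) 4] get /kafka/brokers/topics/first/partitions/0/state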
  2. Overall broker workflow

    At the top is the ZooKeeper cluster; below it, the Kafka cluster. Each Kafka broker registers itself in ZooKeeper on startup, and a controller node is chosen. The controller elects partition leaders (a replica must be alive in the ISR; among those, the one listed first in the AR, the set of all replicas of a partition, takes priority), then uploads the node information to ZooKeeper. Producers send data to the brokers, and followers actively sync with the leader.
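
    To see which broker currently holds the controller role, read the /kafka/controller znode (a quick check; the brokerid value depends on which broker registered first):

      [zk: localhost:2181(CONNECTED) 5] get /kafka/controller
      #prints JSON like {"version":1,"brokerid":0,"timestamp":"..."}; brokerid is the current controller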

Production Experience — Commissioning and Decommissioning Nodes

  1. Commission a new node

    【Shortcut】: simply clone hadoop103. After configuring the hostname and IP, connect with Xshell. 【Note!!】: delete the datas and logs directories under the kafka directory (they hold hadoop103's data; the two brokers cannot be online at the same time unless it is removed). A minimal prep sketch follows.
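
    The sketch below assumes the clone becomes hadoop104 with broker.id=3; adjust names and paths to your install:

      #on the clone, inside the kafka directory:
      vim config/server.properties    #set broker.id=3; it must be unique in the cluster
      rm -rf datas/ logs/             #wipe the data and logs copied from hadoop103
      bin/kafka-server-start.sh -daemon config/server.properties    #start and register in zookeeper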

  2. Perform the load-balancing operation

    • Create a JSON file listing the topics to balance

      [root@hadoop103 bin] vim topics-to-move.json
      
      {
         "topics": [
            {"topic": "first"}
         ],
         "version": 1
      }
      
      
    • Generate a rebalance plan

      [root@hadoop103 kafka] bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate
      

      Error reported:
      
      Error: The input string is not a valid JSON
      kafka.admin.AdminOperationException: The input string is not a valid JSON
          at kafka.admin.ReassignPartitionsCommand$.parseTopicsData(ReassignPartitionsCommand.scala:1272)
          at kafka.admin.ReassignPartitionsCommand$.parseGenerateAssignmentArgs(ReassignPartitionsCommand.scala:722)
          at kafka.admin.ReassignPartitionsCommand$.generateAssignment(ReassignPartitionsCommand.scala:583)
          at kafka.admin.ReassignPartitionsCommand$.handleAction(ReassignPartitionsCommand.scala:236)
          at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:206)
          at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)

      Fix:

      After commissioning the new node, generating the rebalance plan failed with "The input string is not a valid JSON". Per the references, the file must contain only the list of topics to plan, in valid JSON (the form shown above); after correcting the file, the plan was generated successfully.

    Create the replica storage plan

    [root@hadoop103 bin] vim increase-replication-factor.json
    #paste the plan printed by --generate in here
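
    For orientation, the generated plan pasted here looks roughly like this (the broker lists are illustrative; use exactly what your --generate run printed):

      {"version":1,"partitions":[
        {"topic":"first","partition":0,"replicas":[3,0,1],"log_dirs":["any","any","any"]},
        {"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},
        {"topic":"first","partition":2,"replicas":[1,2,3],"log_dirs":["any","any","any"]}]}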
    

    Execute the replica storage plan

    [root@hadoop103 kafka] bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --execute
    

    Verify the plan

    [root@hadoop103 kafka] bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --verify
    
  3. Decommission an old node

    First perform the load-balancing operation to migrate all data off the retiring broker, then stop hadoop104 (see the sketch below).
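
    The commands mirror the commissioning steps; a sketch assuming broker 3 (hadoop104) is the one being retired:

      #generate a plan that excludes broker 3, so its data migrates to brokers 0,1,2
      [root@hadoop103 kafka] bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate
      #save the proposed plan into increase-replication-factor.json, then execute and verify as above
      [root@hadoop103 kafka] bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --execute
      #only after the data has moved off, stop the broker
      [root@hadoop104 kafka] bin/kafka-server-stop.sh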

Kafka Replicas

  1. Replica basics

    • Purpose: improve data reliability.
    • The default is one replica; production environments generally configure two (more replicas lower efficiency and add storage and network pressure).
    • Replicas are divided into leader and follower. Unlike Hadoop, where the three replicas are equivalent, reads and writes go only through the leader.
    • Collectively, AR = ISR + OSR.
    • ISR: the set of followers kept in sync with the leader; a replica that has not sent a fetch request for too long (30 s by default; see the config note after this list) is kicked out. OSR: replicas that lag too far behind.
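
    The 30 s ISR timeout is a broker config (default shown):

      #server.properties
      replica.lag.time.max.ms=30000    #followers silent longer than this are dropped from the ISR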
  2. Leader election flow

    The controller watches for broker changes (when a Kafka node goes down, its id also disappears from ZooKeeper) and elects a new leader.

    #create a topic with 4 partitions and 4 replicas
    [root@hadoop101 kafka] bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic qiyou01 --partitions 4 --replication-factor 4
    
    #describe it
    [root@hadoop101 kafka] bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic qiyou01
    Topic: qiyou01	TopicId: n6ufjP7GRSKngztIX-aLog	PartitionCount: 4	ReplicationFactor: 4	Configs: segment.bytes=1073741824
    	Topic: qiyou01	Partition: 0	Leader: 3	Replicas: 3,1,0,2	Isr: 3,1,0,2
    	Topic: qiyou01	Partition: 1	Leader: 1	Replicas: 1,0,2,3	Isr: 1,0,2,3
    	Topic: qiyou01	Partition: 2	Leader: 0	Replicas: 0,2,3,1	Isr: 0,2,3,1
    	Topic: qiyou01	Partition: 3	Leader: 2	Replicas: 2,3,1,0	Isr: 2,3,1,0
    

    Stop hadoop104

    [root@hadoop104 kafka] bin/kafka-server-stop.sh 
    [root@hadoop104 kafka] jps
    11105 Jps
    3506 Kafka
    [root@hadoop104 kafka] jps
    11136 Jps
    

    Describe again from hadoop101

    [root@hadoop101 kafka] bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic qiyou01
    Topic: qiyou01	TopicId: n6ufjP7GRSKngztIX-aLog	PartitionCount: 4	ReplicationFactor: 4	Configs: segment.bytes=1073741824
    	Topic: qiyou01	Partition: 0	Leader: 1	Replicas: 3,1,0,2	Isr: 1,0,2
    	Topic: qiyou01	Partition: 1	Leader: 1	Replicas: 1,0,2,3	Isr: 1,0,2
    	Topic: qiyou01	Partition: 2	Leader: 0	Replicas: 0,2,3,1	Isr: 0,2,1
    	Topic: qiyou01	Partition: 3	Leader: 2	Replicas: 2,3,1,0	Isr: 2,1,0
    

    Observation: whichever broker is stopped, leadership shifts to the next replica in AR order, as shown above. When hadoop104 is restored, it rejoins at the end of the ISR.

  3. Follower failure handling

    • LEO (Log End Offset): per replica, the latest offset + 1, i.e. one past its last record.

    • HW (High Watermark): the smallest LEO among all the replicas; consumers can only read up to the HW.

    • When a follower fails, it is kicked out of the ISR; the leader and the remaining followers keep receiving data. After recovering, the follower reads the HW it recorded locally, truncates the part of its log above that HW, and syncs from the leader starting at the HW. Once its LEO reaches the partition's current HW (i.e. it has caught up with the leader), it rejoins the ISR. For example, if the follower saved HW = 5 while the partition's HW has meanwhile advanced to 9, it truncates to offset 5, fetches from the leader, and rejoins the ISR once its LEO reaches 9.

  4. Leader failure

    A new leader is elected from the ISR. To keep the replicas consistent, the remaining followers truncate the parts of their logs above the HW and re-sync from the new leader. 【Note!!】: this guarantees consistency between replicas, not that no data is lost; records above the HW that only the old leader held are gone.

  5. Partition replica assignment

    Kafka assigns replicas as evenly as possible: the first replica of each partition is staggered round-robin across the brokers (see the demo below).
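
    A quick way to observe the staggered placement (the topic name second is an assumption for this demo):

      [root@hadoop101 kafka] bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic second --partitions 8 --replication-factor 3
      [root@hadoop101 kafka] bin/kafka-topics.sh --bootstrap-server hadoop101:9092 --describe --topic second
      #each broker shows up first in Replicas for roughly the same number of partitions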

  6. Leader partition auto-balancing

    auto.leader.rebalance.enable: default true. Enables automatic leader rebalancing.

    leader.imbalance.per.broker.percentage: default 10%. If a broker's imbalance ratio exceeds this value, the controller triggers a leader rebalance.

    leader.imbalance.check.interval.seconds: default 300 s. The interval at which the controller checks whether leader load is balanced.
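
    Together, as they would appear in server.properties (defaults shown):

      auto.leader.rebalance.enable=true
      leader.imbalance.per.broker.percentage=10
      leader.imbalance.check.interval.seconds=300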

  7. Increasing the replication factor

    When a topic's importance level rises, consider adding replicas. The replication factor cannot be changed with kafka-topics.sh --alter; a replica storage plan must be written by hand:

    • Create the topic

    • Manually write a replica storage plan (see the sketch after this list)

    • Execute the replica plan
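
    A sketch of the manual plan, assuming topic first has 3 partitions being raised to 3 replicas on brokers 0, 1 and 2 (adjust to your layout):

      [root@hadoop103 kafka] vim increase-replication-factor.json
      #contents:
      #{"version":1,"partitions":[
      #  {"topic":"first","partition":0,"replicas":[0,1,2]},
      #  {"topic":"first","partition":1,"replicas":[0,1,2]},
      #  {"topic":"first","partition":2,"replicas":[0,1,2]}]}
      [root@hadoop103 kafka] bin/kafka-reassign-partitions.sh --bootstrap-server hadoop101:9092 --reassignment-json-file increase-replication-factor.json --execute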

File Storage

  1. File storage mechanism

    A topic is divided into multiple partitions, and a partition into multiple segments. Each segment consists of a .log file (the records), a .index file (offset index), a .timeindex file (timestamp index), and a few other files.

    Partition directory naming rule: topic name + partition number, e.g. first-0. Segment rolling is governed by the config noted below.
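
    The segment size is a broker config whose default matches the segment.bytes=1073741824 seen in the describe output earlier:

      #server.properties
      log.segment.bytes=1073741824    #roll a new segment once the active .log reaches 1 GiB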

  2. [root@hadoop101 kafka] cd datas/
    [root@hadoop101 datas] ll
    total 16
    -rw-r--r--. 1 root root   0 Apr 22 15:29 cleaner-offset-checkpoint
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-1
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-10
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-13
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-16
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-19
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-22
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-25
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-28
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-31
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-34
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-37
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-4
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-40
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-43
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-46
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-49
    drwxr-xr-x. 2 root root 167 Apr 25 16:23 __consumer_offsets-7
    drwxr-xr-x. 2 root root 204 Apr 25 16:23 first-0
    -rw-r--r--. 1 root root   4 Apr 25 20:01 log-start-offset-checkpoint
    -rw-r--r--. 1 root root  88 Apr 25 16:23 meta.properties
    drwxr-xr-x. 2 root root 167 Apr 25 16:52 qiyou01-0
    drwxr-xr-x. 2 root root 167 Apr 25 16:52 qiyou01-1
    drwxr-xr-x. 2 root root 167 Apr 25 16:52 qiyou01-2
    drwxr-xr-x. 2 root root 167 Apr 25 16:52 qiyou01-3
    -rw-r--r--. 1 root root 468 Apr 25 20:01 recovery-point-offset-checkpoint
    -rw-r--r--. 1 root root 468 Apr 25 20:02 replication-offset-checkpoint
    [root@hadoop101 datas] cd first-0/
    [root@hadoop101 first-0] ll
    total 20
    -rw-r--r--. 1 root root 10485760 Apr 25 16:23 00000000000000000000.index
    -rw-r--r--. 1 root root      438 Apr 22 16:15 00000000000000000000.log
    -rw-r--r--. 1 root root 10485756 Apr 25 16:23 00000000000000000000.timeindex
    -rw-r--r--. 1 root root       10 Apr 23 00:06 00000000000000000006.snapshot
    -rw-r--r--. 1 root root        8 Apr 25 16:23 leader-epoch-checkpoint
    -rw-r--r--. 1 root root       43 Apr 22 15:57 partition.metadata
    

    Viewing the .log file directly shows binary gibberish; use Kafka's dump tool instead:

    [root@hadoop101 first-0] kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
    Dumping ./00000000000000000000.index
    offset: 0 position: 0
    
    #dump the .log file
    [root@hadoop101 first-0] kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
    Dumping ./00000000000000000000.log
    Starting offset: 0
    baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1650614951198 size: 73 magic: 2 compresscodec: none crc: 1028480518 isvalid: true
    baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 73 CreateTime: 1650615093214 size: 75 magic: 2 compresscodec: none crc: 1935213717 isvalid: true
    baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 148 CreateTime: 1650615105104 size: 73 magic: 2 compresscodec: none crc: 2561039741 isvalid: true
    baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 221 CreateTime: 1650615108653 size: 73 magic: 2 compresscodec: none crc: 2173917041 isvalid: true
    baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 294 CreateTime: 1650615341320 size: 72 magic: 2 compresscodec: none crc: 1877835761 isvalid: true
    baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 366 CreateTime: 1650615344197 size: 72 magic: 2 compresscodec: none crc: 3585889592 isvalid: true
    

    【Note!!】: how it works: the .index file is a sparse index: for every 4 KB of data written to the .log file, one index entry is appended. Entries store offsets relative to the segment's base offset, which keeps the index small. To locate a record, Kafka finds the largest index entry not exceeding the target offset, seeks to its position in the .log, and scans forward from there.

    File cleanup policy
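
    The core retention knobs (real broker configs, defaults shown):

      #server.properties
      log.retention.hours=168                    #keep data 7 days by default
      log.retention.check.interval.ms=300000     #how often expiry is checked (5 min)
      log.cleanup.policy=delete                  #"delete" expires old segments; "compact" keeps the latest record per key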
