Kafka 分区备份实战
Posted 哥不是小萝莉
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 分区备份实战相关的知识,希望对你有一定的参考价值。
1.概述
在 Kafka 集群中,我们可以对每个 Topic 进行一个或是多个分区,并为该 Topic 指定备份数。这部分元数据信息都是存放在 Zookeeper 上,我们可以使用 zkCli 客户端,通过 ls 和 get 命令来查看元数据信息。通过 log.dirs 属性控制消息存放路径,每个分区对应一个文件夹,文件夹命名方式为:TopicName-PartitionIndex,该文件夹下存放这该分区的所有消息和索引文件,如下图所示:
2.内容
Kafka 集群在生产消息入库的时候,通过 Key 来进行分区存储,按照相应的算法,生产分区规则,让所生产的消息按照该规则分布到不同的分区中,以达到水平扩展和负载均衡。而我们在消费这些消息的时候,可以使用多线程来消费该 Topic 下的所有分区中的消息。
分区规则的制定,通过实现 kafka.producer.Partitioner 接口,该接口我们可以进行重写,按照自己的方式去实现分区规则。如下,我们按照 Key 的 Hash 值,然后取模得到分区索引,代码如下所示:
1package cn.hadoop.hdfs.kafka.partition;
2
3import kafka.producer.Partitioner;
4import kafka.utils.VerifiableProperties;
5
6/**
7 * @Date Nov 3, 2016
8 *
9 * @Author dengjie
10 *
11 * @Note 先 Hash 再取模,得到分区索引
12 */
13public class CustomerPartitioner implements Partitioner {
14
15 public CustomerPartitioner(VerifiableProperties props) {
16 }
17
18 public int partition(Object key, int numPartitions) {
19 int partition = 0;
20 String k = (String) key;
21 partition = Math.abs(k.hashCode()) % numPartitions;
22 return partition;
23 }
24
25}
在创建 Topic 的时候,若按照上述规则创建分区,分区数最后为 Brokers 的整数倍,这样才能发挥其负载均衡的作用,比如:当前我们集群节点由 3 个 Broker 组成,如下图所示:
2.1 创建分区
我们在创建分区的时候,可以通过 Kafka 提供的客户端命令进行创建,如下,我们创建一个6分区,3备份的一个 Topic,命令如下所示:
1./kafka-topics.sh --create --zookeeper k1:2181,k2:2181,k3:2181 --replication-factor 3 --partitions 6 --topic ke_test
这里需要注意的是,指定备份数的时候,备份数要小于等于 Brokers 数。否则创建失败。在创建分区的时候,假设,我们只创建 2 个分区,而我们上述图中, Brokers 有 3 个,会造成有一个 Broker 上没有该 Topic 的分区,以致分布不均。
2.2 分区入库
一般,我们在入库消息的时候,都有使用 Kafka 的 API,如下,我们使用生产 API ,按照上述的 Hash 取模规则,进行分区入库,代码如下所示:
1package cn.hadoop.hdfs.kafka.partition;
2
3import java.util.List;
4import java.util.Properties;
5
6import cn.hadoop.hdfs.kafka.partition.data.FileRead;
7import kafka.javaapi.producer.Producer;
8import kafka.producer.KeyedMessage;
9import kafka.producer.ProducerConfig;
10
11/**
12 * @Date Nov 3, 2016
13 *
14 * @Author dengjie
15 *
16 * @Note 按照先 Hash 再取模的规则,进行分区入库
17 */
18public class PartitionerProducer {
19 public static void main(String[] args) {
20 producerData();
21 }
22
23 private static void producerData() {
24 Properties props = new Properties();
25 props.put("serializer.class", "kafka.serializer.StringEncoder");
26 props.put("metadata.broker.list", "k1:9092,k2:9092,k3:9092");
27 props.put("partitioner.class", "cn.hadoop.hdfs.kafka.partition.CustomerPartitioner");
28 Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
29 String topic = "ke_test";
30 List<String> list = FileRead.readData();
31 for (int i = 0; i < list.size(); i++) {
32 String k = "key" + i;
33 String v = new String(list.get(i));
34 producer.send(new KeyedMessage<String, String>(topic, k, v));
35 if (i == (list.size() - 1)) {
36 return;
37 }
38 }
39 producer.close();
40 }
41}
这里,我们分析发现,生产者在生产消息入库时,会按照 CustomerPartitioner 的规则,进行分区入库,在入库时,将 Key 先做 Hash,然后分区数取模(这里分区数是 6).我们计算可以得到一下信息:
1hashCode("key0") % 6 = 1
2hashCode("key1") % 6 = 2
3hashCode("key2") % 6 = 3
4hashCode("key3") % 6 = 4
5hashCode("key4") % 6 = 5
6hashCode("key5") % 6 = 0
7// ... 以此循环
按照该表述规则进行分区入库。
2.3 分区入库验证
接下里,我们通过 Kafka 的消费者 API 来验证,在消费时,消费 Topic 各分区的详情,代码如下所示:
1package cn.hadoop.hdfs.kafka.partition;
2
3import java.util.HashMap;
4import java.util.List;
5import java.util.Map;
6import java.util.Properties;
7
8import kafka.consumer.Consumer;
9import kafka.consumer.ConsumerConfig;
10import kafka.consumer.ConsumerIterator;
11import kafka.consumer.KafkaStream;
12import kafka.javaapi.consumer.ConsumerConnector;
13import kafka.message.MessageAndMetadata;
14
15/**
16 * @Date Nov 3, 2016
17 *
18 * @Author dengjie
19 *
20 * @Note 通过 Kafka 的消费者 API 验证分区入库的消息
21 */
22public class PartitionerConsumer {
23 public static void main(String[] args) {
24 String topic = "ke_test";
25 ConsumerConnector consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
26 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
27 topicCountMap.put(topic, new Integer(1));
28 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
29 KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
30 ConsumerIterator<byte[], byte[]> it = stream.iterator();
31 while (it.hasNext()) {
32 MessageAndMetadata<byte[], byte[]> mam = it.next();
33 System.out.println("consume: Partition [" + mam.partition() + "] Message: [" + new String(mam.message())
34 + "] ..");
35 }
36
37 }
38
39 private static ConsumerConfig createConsumerConfig() {
40 Properties props = new Properties();
41 props.put("group.id", "group1");
42 props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
43 props.put("zookeeper.session.timeout.ms", "40000");
44 props.put("zookeeper.sync.time.ms", "200");
45 props.put("auto.commit.interval.ms", "1000");
46 props.put("auto.offset.reset", "smallest");
47 return new ConsumerConfig(props);
48 }
49}
这里笔者只是验证消费数据,若在实际生产线上,需将上述单线程消费改造成多线程消费,来提升处理消息的能力。
2.4 验证结果
这里,我们线运行生产者,让其生产消息,并分区入库;然后,在启动消费者,消费消息验证其结果,如下图所示:
3.总结
需要注意的是,分区数建议为 Brokers 的整数倍,让其达到均匀分布;备份数必须小于等于 Brokers。以及,多线程消费的控制,其线程数建议和分区数相等。
长按关注,获取更多干货
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以发送邮件给我,我会尽我所能为您解答,与君共勉!
以上是关于Kafka 分区备份实战的主要内容,如果未能解决你的问题,请参考以下文章
Flink实战系列Flink SQL 写入 kafka 自定义分区策略
Kafka消息队列大数据实战教程-第三篇(Kafka分区和副本的创建)
Kafka消息队列大数据实战教程-第三篇(Kafka分区和副本的创建)