kafka基础入门:kafka生产者(producer)
Posted THE WHY
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka基础入门:kafka生产者(producer)相关的知识,希望对你有一定的参考价值。
目录
kafka生产者
数据发送原理
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator;main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker
重要参数:
- batch.size:只有数据积累到batch.size之后,sender才会从RecordAccumulator拉取数据。默认16k
- linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟
- acks:应答策略;
-
- acks=0:生产者发送过来的数据,不需要等数据落盘应答
- acks=1:生产者发送过来的数据,Leader收到数据后应答
- acks=-1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答
列表:
相关API
相关依赖:
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> </dependencies>
参数配置:
//kafka配置文件 Properties properties = new Properties(); //添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092"); //key/value的序列化类型 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //提高集群的吞吐量 //批次大小,默认是16k properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); // linger.ms:等待时间,默认 0,修改为1 properties.put(ProducerConfig.LINGER_MS_CONFIG,10); //compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //RecordAccumulator:缓冲区大小,默认 32M:buffer.memory(注意是以字节为单位) properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); //设置ack的类型 properties.put(ProducerConfig.RETRIES_CONFIG,3); //重试次数 properties.put(ProducerConfig.ACKS_CONFIG,"all");//数字也可
创建生产者:
//创建生产者对象 Producer<String, String> firstProducer = new KafkaProducer<String, String>(properties);
异步发送
send方法
参数(new ProducerRecord
)
for (int i = 0; i < 50; i++) firstProducer.send(new ProducerRecord<String, String>("first","why"+i));
带回调函数的异步发送
send方法:
参数:(new ProducerRecord
,new Callback()
)
重写onCompletion
方法,Exception
是异常信息,RecordMetadata
是主题的元数据信息
//发送信息 for (int i = 0; i < 500; i++) firstProducer.send(new ProducerRecord<String, String>("first", "why" + i), new Callback() @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) if (e != null) e.printStackTrace(); else System.out.println("partition:"+recordMetadata.partition()+"-----"+recordMetadata.topic()); ); Thread.sleep(2);
同步发送
只需在异步发送的基础上,再调用一下 get()方法即可
//发送信息 for (int i = 0; i < 5; i++) firstProducer.send(new ProducerRecord<String, String>("first","why"+i)).get();
分区
分区的好处
- 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
- 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据
分区策略
1.指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0
//Integer partition,直接赋值即可 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) if (topic == null) throw new IllegalArgumentException("Topic cannot be null."); if (timestamp != null && timestamp < 0) throw new IllegalArgumentException( String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp)); if (partition != null && partition < 0) throw new IllegalArgumentException( String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition)); this.topic = topic; this.partition = partition; this.key = key; this.value = value; this.timestamp = timestamp; this.headers = new RecordHeaders(headers);
2.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;
public ProducerRecord(String topic, K key, V value) this(topic, null, null, key, value, null);
3.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)
/** * Create a record with no key * * @param topic The topic this record should be sent to * @param value The record contents */ public ProducerRecord(String topic, V value) this(topic, null, null, null, value, null);
具体如何进行分区:
/** * Compute the partition for the given record. * * @param topic The topic name * @param numPartitions The number of partitions of the given @code topic * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) if (keyBytes == null) //没有指定key return stickyPartitionCache.partition(topic, cluster); //使用粘性分区器 // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
自定义分区器
步骤:
- 定义类实现 Partitioner 接口
- 重写 partition()方法
/** * @author why * 自定义的分区器 */ public class MyPartitioner implements Partitioner @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) int partition = 0; String msg = value.toString(); if (msg.contains("why")) partition = 1; //设置该条数据应处的分区 return partition; @Override public void close() @Override public void configure(Map<String, ?> configs)
在配置文件中配置即可:
//设置自定义的分区器 (使用全类名) properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.why.kafka.tools.MyPartitioner");
ack应答原理—数据可靠性
不同ack策略的可靠性分析
- ack=0:生产者发送过来的数据,不需要等数据落盘应答
-
- 容易出现丢数据的情况
- 可靠性差,效率高
- ack=1:生产者发送过来的数据,Leader收到数据后应答
-
- 应答完成后,还没开始同步副本,Leader挂了,但是新的Leader不会继续同步副本,因为生产者已经收到Leader的应答,认为数据已经发送完毕
- 可靠性中等,效率中等
- ack=-1:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答
-
- 原理分析(如何解决副本同步迟迟不能完成的问题,因为同步不完成将一直等待而无法应答)
-
-
- Leader维护了一个动态的in-sync replica set(ISR),意为和Leader保持同步的Follower+Leader集合(leader:0,isr:0,1,2)
- 如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由
replica.lag.time.max.ms
参数设定,默认30s。例如2超时,(leader:0, isr:0,1);这样就不用等长期联系不上或者已经故障的节点
-
-
- 数据完全可靠条件 = ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
-
-
- 如果分区副本设置为1个,或 者ISR里应答的最小副本数量(min.insync.replicas 默认为1)设置为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)
-
-
- 可靠性高,效率低
总结:在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景
数据去重
数据传递语义
- 至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2
- 最多一次(At Most Once)= ACK级别设置为0
- 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失
At Least Once可以保证数据不丢失,但是不能保证数据不重复
At Most Once可以保证数据不重复,但是不能保证数据不丢失
Exactly Once:可以保证数据不重复也不丢失,通过幂等性和事务来实现(Kafka 0.11版本以后的新特性)
幂等性
幂等性就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复
重复数据的判断标准:具有
<PID, Partition, SeqNumber>
相同主键的消息提交时,Broker只会持久化一条
其中PID是Kafka每次重启都会分配一个新的;
Partition 表示分区号;
Sequence Number是单调自增的。
所以幂等性只能保证的是在单分区单会话内不重复
如何开启幂等性:
开启参数 enable.idempotence
默认为 true(开启),false 关闭
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); //从以下源码中可以看到ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG代表的就是enable.idempotence /** <code>enable.idempotence</code> */ public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
生产者事务
开启事务,必须开启幂等性
//设置事务id(必须) properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0"); //创建生产者对象 Producer<String, String> firstProducer = new KafkaProducer<String, String>(properties); //初始化事务 firstProducer.initTransactions(); //开启事务 firstProducer.beginTransaction(); for (int i = 0; i < 5; i++) firstProducer.send(new ProducerRecord<String, String>("first", "why" + i)); //提交事务 firstProducer.commitTransaction(); //关闭资源 firstProducer.close();
数据有序
单分区内,有序
多分区,分区与分区间无序
单分区有序的条件:
- 1.x版本之前:
max.in.flight.requests.per.connection=1
- 在1.x及以后版本
-
- 未开启幂等性:
max.in.flight.requests.per.connection
需要设置为1 - 开启了幂等性:
max.in.flight.requests.per.connection
需要设置小于等于5
- 未开启幂等性:
原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的
以上是关于kafka基础入门:kafka生产者(producer)的主要内容,如果未能解决你的问题,请参考以下文章
四.Kafka入门到精通-SpringBoot整合Kafka(Producer拦截器&Producer监听器)
四.Kafka入门到精通-SpringBoot整合Kafka(Producer拦截器&Producer监听器)