kafka08-生产者
Posted apeLew
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka08-生产者相关的知识,希望对你有一定的参考价值。
1.1 消息发送
1.1.1 数据生产流程解析
1. Producer创建时,会创建一个Sender线程并设置为守护线程。
2. 生产消息时,内部时异步的。消息经过拦截器=》序列化器=》分区器=》缓冲区
3. 缓冲区批次发送消息条件为batch.size或者linger.ms。满足其一就会批次发送数据
4. 批次消息发送后,发往指定分区,落入broker磁盘。如果生产者配置了retries大于0,则当消息发送失败,还会重试批次,重试批次的数据会在缓冲区后追加(会导致消息先后顺序错乱)
5. 落盘broker时,是否成功判断有三种形式。不管消息成功失败,返回;二,写入leader分区视为成功;3,leader和副本分区都写成功才视为成功
6. 落盘broker磁盘后,返回生产元数据给生产者,元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回
1.1.2 参数
属性 | 解释 |
---|---|
bootstrap.servers | 生产者客户端与broker集群建立初始连接需要的broker地址列表, 由该初始连接发现Kafka集群中其他的所有broker。该地址列表不需 要写全部的Kafka集群中broker的地址,但也不要写一个,以防该节 点宕机的时候不可用。 |
key.serializer | 实现了接口 org.apache.kafka.common.serialization.Serializer的key 序列化类。 |
value.serializer | 实现了接口 org.apache.kafka.common.serialization.Serializer的 value序列化类。 |
acks | 该选项控制着已发送消息的持久性。 acks=0:生产者不等待broker的任何消息确认。只要将消息放到了 socket的缓冲区,就认为消息已发送。不能保证服务器是否收到该 消息,retries设置也不起作用,因为客户端不关心消息是否发送 失败。客户端收到的消息偏移量永远是-1。 acks=1:leader将记录写到它本地日志,就响应客户端确认消息, 而不等待follower副本的确认。如果leader确认了消息就宕机,则可 能会丢失消息,因为follower副本可能还没来得及同步该消息。 acks=all:leader等待所有同步的副本确认该消息。保证了只要有 一个同步副本存在,消息就不会丢失。这是最强的可用性保证。等 价于acks=-1。默认值为1,字符串。可选值:[all, -1, 0, 1] |
compression.type | 生产者生成数据的压缩格式。默认是none(没有压缩)。允许的 值:none,gzip,snappy和lz4。压缩是对整个消息批次来讲 的。消息批的效率也影响压缩的比例。消息批越大,压缩效率越 好。字符串类型的值。默认是none。 |
retries | 设置该属性为一个大于1的值,将在消息发送失败的时候重新发送消 息。该重试与客户端收到异常重新发送并无二至。允许重试但是不 设置max.in.flight.requests.per.connection为1,存在消息 乱序的可能,因为如果两个批次发送到同一个分区,第一个失败了 重试,第二个成功了,则第一个消息批在第二个消息批后。int类型 的值,默认:0,可选值:[0,...,2147483647] |
1.1.3 序列化器
数据的序列化一般生产中使用avro。 自定义序列化器需要实org.apache.kafka.common.serialization.Serializer接口,并实现其 中的 serialize 方法。
实体
public class InfoEntity {
int id;
String msg;
public void setId(int id) {
this.id = id;
}
public void setMsg(String msg) {
this.msg = msg;
}
public int getId() {
return id;
}
public String getMsg() {
return msg;
}
}
序列化实现
@Override
public void configure(Map<String, ?> map, boolean b) {
//获取配置
}
@Override
public byte[] serialize(String topic, InfoEntity infoEntity) {
try {
if (infoEntity == null) {
return null;
} else {
Integer id = infoEntity.getId();
String msg = infoEntity.getMsg();
int length = 0;
byte[] bytes = null;
if (msg != null) {
bytes = msg.getBytes("utf-8");
length = bytes.length;
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
buffer.putInt(id);
buffer.putInt(length);
buffer.put(bytes);
return buffer.array();
}
} catch (Exception e) {
e.printStackTrace();
throw new SerializationException("序列化InfoEntity失败");
}
}
@Override
public void close() {
}
调用
public static void main(String[] args) {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "lew1:9092");
//key的反序列化器
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//value的反序列化器
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, InfoSerializer.class);
KafkaProducer<String, InfoEntity> producer = new KafkaProducer<>(config);
InfoEntity infoEntity = new InfoEntity();
infoEntity.setId(111);
infoEntity.setMsg("张三");
ProducerRecord<String, InfoEntity> record = new ProducerRecord<>("gc_info", infoEntity.getMsg(), infoEntity);
producer.send(record, (recordMetadata, exception) -> {
System.out.println(recordMetadata.topic() +
"\\t" + recordMetadata.partition() +
"\\t" + recordMetadata.offset());
});
producer.close();
}
1.1.4 自定义分区器
如果要自定义分区器,则需要 1. 首先开发Partitioner接口的实现类 2. 在KafkaProducer中进行设置:configs.put("partitioner.class", "xxx.xx.Xxx.class")
默认分区代码 org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = this.nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return ((PartitionInfo)availablePartitions.get(part)).partition();
} else {
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
分析:
- 若有key,经过murmur2 算法算出hash再取绝对值,在对全部分区numPartitions取余得到分区数
- 若无key
- 可用分区availablePartitions大于0,再根据自增nextValue取绝对值再对可用分区取余得到分区
- 若无可用分区,则根据自增nextValue取绝对值再对全部分区取余得到分区
1.1.5 拦截器
Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要 用于实现Client端的定制化控制逻辑。 对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等。
同时,Producer允许用户指定多个Interceptor按序作用于同一条 消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
- onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线 程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息 做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
- onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发 送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在 Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发 送效率。
- close:关闭Interceptor,主要用于执行一些资源清理工作。
如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。 另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个 Interceptor可能抛出的异常记录到错误日志中而非在向上传递。
自定义拦截器:
- 实现ProducerInterceptor接口
-
- 在KafkaProducer的设置中设置自定义的拦截器
1.1.6 原理图
以上是关于kafka08-生产者的主要内容,如果未能解决你的问题,请参考以下文章
2021年大数据Kafka:❤️Kafka的java API编写❤️