spring-kafka整合:KafkaTemplate-kafka模板类介绍

Posted PacosonSWJTU

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring-kafka整合:KafkaTemplate-kafka模板类介绍相关的知识,希望对你有一定的参考价值。

【README】

1,本文主要关注 KafkaTemplate的重点方法,并非全部方法;

2,KafkaTemplate  底层依赖于 DefaultKafkaProducerFactory , 关于 DefaultKafkaProducerFactory 的介绍,refer2 

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍_PacosonSWJTU的博客-CSDN博客【1】 类描述类描述:单例共享 Producer 实例的 ProducerFactory 实现。此实现将为每次 createProducer() 调用时提供的 Map 配置和可选的 Serializer 实现返回相同的 Producer 实例(如果未启用事务)。如果您使用的序列化器没有参数构造函数并且不需要设置,那么最简单的方法是在传递给 DefaultKafkaProducerFactory 构造函数的配置中针对 ProducerConfig.KEY_SERIALIZER_CLASS_Chttps://blog.csdn.net/PacosonSWJTU/article/details/121306370


【1】KafkaTemplate 类说明

用于执行高级操作的模板。 当与 DefaultKafkaProducerFactory 一起使用时,模板是线程安全的。 生产者工厂和 org.apache.kafka.clients.producer.KafkaProducer 确保这一点;

public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
		ApplicationListener<ContextStoppedEvent>, DisposableBean {

 【1.1】构造方法

使用提供的生产者工厂和 autoFlush 设置创建一个实例。


如果您已将生产者的 linger.ms 配置为非默认值并希望立即在此模板上发送操作,无论该设置如何, 又或者您希望阻塞直到服务器根据acs属性确认已收到消息, 需要把autoFlush设置为true

如果 configOverrides 不为 null 或不为空,则将使用合并的生产者属性创建一个新的 DefaultKafkaProducerFactory,这些属性在提供的工厂属性之后进行覆盖。

public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
			@Nullable Map<String, Object> configOverrides) {

    Assert.notNull(producerFactory, "'producerFactory' cannot be null");
    this.autoFlush = autoFlush;
    this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;
    // 是否自定义生产者工厂 
    this.customProducerFactory = configOverrides != null && configOverrides.size() > 0;
    if (this.customProducerFactory) {
	Map<String, Object> configs = new HashMap<>(producerFactory.getConfigurationProperties()); 
// 覆盖工厂属性 
	configs.putAll(configOverrides); 

// 创建新的 DefaultKafkaProducerFactory
	DefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<>(configs, producerFactory.getKeySerializerSupplier(), producerFactory.getValueSerializerSupplier());
// 设置物理关闭生产者的超时时间  
	newFactory.setPhysicalCloseTimeout((int) producerFactory.getPhysicalCloseTimeout().getSeconds());
// 设置是否分区 
       newFactory.setProducerPerConsumerPartition(
            producerFactory.isProducerPerConsumerPartition());
// 设置是否 每个线程创建一个 生产者; 
        newFactory.setProducerPerThread(producerFactory.isProducerPerThread());
// 新工厂赋值
	this.producerFactory = newFactory;
    } else {
        this.producerFactory = producerFactory;
    }
// 是否开启kafka事务 
    this.transactional = this.producerFactory.transactionCapable();
}

【1.2】发送消息方法(非常重要)

发送消息有很多方法,大致分为两类;

  • send();
  • doSend();

【1.2.1】send() 发送消息

有4个外观方法,使用的都是默认topic;

@Override
public ListenableFuture<SendResult<K, V>> sendDefault(@Nullable V data) {
	return send(this.defaultTopic, data);
}

@Override
public ListenableFuture<SendResult<K, V>> sendDefault(K key, @Nullable V data) {
	return send(this.defaultTopic, key, data);
}

@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, @Nullable V data) {
	return send(this.defaultTopic, partition, key, data);
}

@Override
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, @Nullable V data) {
	return send(this.defaultTopic, partition, timestamp, key, data);
}
@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
	ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
	return doSend(producerRecord);
}

可以看到,最后还是调用了 底层的 doSend() 方法;


【1.2.2】doSend() 方法

5个 doSend() 方法的外观方法 ,这5个方法对 topic ,分区, 消息key,时间戳,消息value  进行了重载

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
	ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data);
	return doSend(producerRecord);
}

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, K key, @Nullable V data) {
	ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
	return doSend(producerRecord);
}

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, @Nullable V data) {
	ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, key, data);
	return doSend(producerRecord);
}

@Override
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key,
		@Nullable V data) {

	ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, partition, timestamp, key, data);
	return doSend(producerRecord);
}

@Override
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record) {
	Assert.notNull(record, "'record' cannot be null");
	return doSend(record);
}

底层 doSend() 定义如下:

protected ListenableFuture<SendResult<K, V>>

        doSend(final ProducerRecord<K, V> producerRecord)

protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
	// 获取生产者 
	final Producer<K, V> producer = getTheProducer(producerRecord.topic());
	this.logger.trace(() -> "Sending: " + producerRecord);
	final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
	Object sample = null;
	if (this.micrometerEnabled && this.micrometerHolder == null) {
		this.micrometerHolder = obtainMicrometerHolder();
	}
	if (this.micrometerHolder != null) {
		sample = this.micrometerHolder.start();
	}
	
	// 发送消息 
	Future<RecordMetadata> sendFuture =
			producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample));
	// May be an immediate failure (注意,这里可能马上失败,或有运行时异常抛出)
	if (sendFuture.isDone()) { 
		try {
			sendFuture.get(); // 这里调用get会阻塞,如果发送没有完成的话 
		}
		catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			throw new KafkaException("Interrupted", e);
		}
		catch (ExecutionException e) {
			throw new KafkaException("Send failed", e.getCause()); // NOSONAR, stack trace
		}
	}
	if (this.autoFlush) { // 自动刷新 
		flush();
	}
	this.logger.trace(() -> "Sent: " + producerRecord);
	return future;
}

【代码解说】

step1, 调用了 getTheProducer() 获取生产者 ;

关于 DefaultKafkaProducerFactory.createProducer() 可以参见 以下博文,因篇幅,本文不再赘述;

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍_PacosonSWJTU的博客-CSDN博客

step2,producer.send(producerRecord, buildCallback(producerRecord, producer, future, sample)) ; 调用了 buildCallback(...) 构建回调对象;

 

非事务模式,则关闭生产者

由 DefaultKafkaProducerFactory 可知, 生产者是  CloseSafeProducer, 其包裹了 原生 kafka生产者; 所以 调用了 CloseSafeProducer.close() 方法;

 

 step3,自动刷新缓存 flush(); 

如果 ProducerFactory 提供单例生产者(例如 DefaultKafkaProducerFactory),则调用此方法才有意义。

public void flush() {
	Producer<K, V> producer = getTheProducer();
	try {
		producer.flush();
	}
	finally {
		closeProducer(producer, inTransaction());
	}
}

protected void closeProducer(Producer<K, V> producer, boolean inTx) {
	if (!inTx) { // 非事务才关闭 
		producer.close(this.closeTimeout);
	}
}


【2】KafkaTemplate 发送消息与生产者复用 

 我们再次 follow了 DefaultKafkaProducerFactory的 doCreateProducer() 方法;

第1次因为发送消息 新建了 producer;

第2次再发送消息时,因为producer 不为null;所以直接取走;

同时 synchronized同步块可以避免并发问题;

发送消息后,是否关闭生产者,可以参考 【小结】


【小结】

通过分析 KafkaTemplate.doSend() 消息发送分发, 我们可以看到,

每发送一条消息,如果抛出异常的话,则会关闭kafka生产者,否则不会关闭生产者;原因参见  

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍_PacosonSWJTU的博客-CSDN博客https://blog.csdn.net/PacosonSWJTU/article/details/121306370中的章节 【4.5.1】;

以上是关于spring-kafka整合:KafkaTemplate-kafka模板类介绍的主要内容,如果未能解决你的问题,请参考以下文章

springboot整合kafka

springboot 整合kafka

Spring与Kafka整合

spring boot整合kafka

SpringBoot整合Kafka消息队列并实现发布订阅和消费

springboot2.x整合kafka