kafka配置双监听

Posted 1994jinnan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka配置双监听相关的知识,希望对你有一定的参考价值。

kafka配置双监听

由于公司需要在其他城市开通业务所以另外开了一个数据库但是kafka消费想直接使用旧的项目,以免多部署一个项目占用服务器内存空间。

首先先在配置文件中配置2个kafka的ip端口等信息

spring.kafka.xx.bootstrap-servers=xxx.xxx.x.x:xxxx  kafkaip和端口
spring.kafka.xx.group-id=camphor
spring.kafka.xx.enable-auto-commit=false  自动提交设置关闭
spring.kafka.zz.bootstrap-servers=xxx.xxx.x.x:xxxx kafkaip和端口
spring.kafka.zz.group-id=camphor
spring.kafka.zz.enable-auto-commit=false 自动提交设置关闭

然后添加2个kafka的对应的配置类


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class KafkaxxConfig {


    @Value("${spring.kafka.xx.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.xx.group-id}")
    private String groupId;
    @Value("${spring.kafka.xx.enable-auto-commit}")
    private boolean enableAutoCommit;

	// 配置kafka模板
    @Bean(name = "kafkaxxTemplate")
    public KafkaTemplate<String, String> kafkaxxTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

	// 配置kafka工厂类
    @Bean(name = "kafkaxxContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaxxContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        //Listener配置
        factory.getContainerProperties()
                .setPollTimeout(3000);
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties()
                .setPollTimeout(15000);
        return factory;
    }

	// 生产工厂
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
	// 消费工厂
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

	// 生产者配置
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
		// 用于建立到Kafka集群的初始连接的主机/端口对列表。放入ip端口
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		// 生产者发送失败后的重试次数,默认0
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
		/**
         * 在考虑请求完成之前,生产者要求leader收到的确认数量,这将控制发送的记录的持久性。

         acks=0如果设置为零,则生产者不会等待来自服务器的任何确认。该记录将被立即添加到套接字缓冲区并被视为已发送。在这种情况下,retries不能保证服务器已经收到记录,并且配置不会生效(因为客户端通常不会知道任何故障)。为每个记录返回的偏移量将始终设置为-1。

         acks=1这意味着领导者会将记录写入其本地日志中,但会在未等待所有追随者完全确认的情况下作出响应。在这种情况下,如果领导者在承认记录后但在追随者复制之前立即失败,那么记录将会丢失。

         acks=all这意味着领导者将等待全套的同步副本确认记录。这保证只要至少有一个同步副本保持活动状态,记录就不会丢失。这是最强有力的保证。这相当于acks = -1设置。
         */
        props.put(ProducerConfig.ACKS_CONFIG, "all");
		// 只要有多个记录被发送到同一个分区,生产者就会尝试将记录一起分成更少的请求。这有助于客户端和服务器的性能。该配置以字节为单位控制默认的批量大小。不会尝试批量大于此大小的记录。发送给brokers的请求将包含多个批次,每个分区有一个可用于发送数据的分区。小批量大小将使批次不太常见,并可能降低吞吐量(批量大小为零将完全禁用批次)。一个非常大的批量大小可能会更浪费一点使用内存,因为我们将始终为预期的额外记录分配指定批量大小的缓冲区。
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);
		// 生产者可用于缓冲等待发送到服务器的记录的总内存字节数。如果记录的发送速度比发送到服务器的速度快,那么生产者将会阻止max.block.ms它,然后它会抛出异常。
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
		// 用于实现Serializer接口的密钥的串行器类。
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

	// 消费者配置
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
		// 用于建立到Kafka集群的初始连接的主机/端口对列表。放入ip端口
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		// 唯一的指明了consumer的group的名字,group名一样的进程属于同一个consumer group。
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		// 如果设为true,consumer会定时向ZooKeeper发送已经获取到的消息的offset。当consumer进程挂掉时,已经提交的offset可以继续使用,让新的consumer继续工作。
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
		// consumer向ZooKeeper发送offset的时间间隔。
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
		// 用于实现Serializer接口的密钥的串行器类。
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}

第二个配置类

@Configuration
public class KafkazzConfig {

    @Value("${spring.kafka.zz.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.zz.group-id}")
    private String groupId;
    @Value("${spring.kafka.zz.enable-auto-commit}")
    private boolean enableAutoCommit;

    @Bean(name = "kafkazzTemplate")
    public KafkaTemplate<String, String> kafkazzTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean(name = "kafkazzContainerFactory")
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkazzContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        //Listener配置
        factory.getContainerProperties()
                .setPollTimeout(3000);
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties()
                .setPollTimeout(15000);
        return factory;
    }
	
	// 下面的配置跟上一个一样

接下来就是发送消息的工具类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

@Component
public class KafkaSendUtil {
    private static KafkaTemplate kafkaxxTemplate;

    private static KafkaTemplate kafkazzTemplate;

    @Autowired
    public KafkaSendUtil(@Qualifier("kafkaxxTemplate") KafkaTemplate kafkaxxTemplate,
                         @Qualifier("kafkazzTemplate") KafkaTemplate kafkazzTemplate) {
        KafkaSendUtil.kafkaxxTemplate = kafkaxxTemplate;
        KafkaSendUtil.kafkazzTemplate = kafkazzTemplate;
    }

    public static ListenableFuture send(String topic, String type, String msg) {
        String str = "通过配置或其他方式获取发送的对象";
        ListenableFuture listenableFuture = null;
        switch (str) {
            case "xx":
                listenableFuture = kafkaxxTemplate.send(topic, type, msg);
                break;
            case "zz":
                listenableFuture = kafkazzTemplate.send(topic, type, msg);
                break;
            default:
                break;
        }
        return listenableFuture;
    }

}

最后是消费者的监听写法

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    public static final String topic = "test-topic";
    // containerFactory 的值要与配置中 KafkaListenerContainerFactory 的 Bean 名相同
    @KafkaListener(topics = {topic}, containerFactory = "kafkaxxContainerFactory")
    public void listenerOne(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        LOGGER.info(" kafka xx 消费者 接收到消息:{}", record.toString());
        ack.acknowledge(); // 消费提交
    }

    @KafkaListener(topics = {topic}, containerFactory = "kafkazzContainerFactory")
    public void listenerTwo(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        LOGGER.info(" kafka zz消费者 接收到消息:{}", record.toString());
        ack.acknowledge();
    }
}
 来源:锌闻网

以上是关于kafka配置双监听的主要内容,如果未能解决你的问题,请参考以下文章

kafka专栏SpringBoot下`@KafkaListener`消费监听属性详解

kafka flumn sparkstreaming java实现监听文件夹内容保存到Phoenix中

springboot集成kafka详细步骤(发送及监听消息示例)

如何将 kubernetes 服务端点 IP 传递到 KAFKA 广告监听器

linux学习:Nginx--常见功能配置片段与优化-06

Kafka中@KafkaListener不自动监听消费数据了