Kafka两种配置文件方式

Posted 一个抓手

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka两种配置文件方式相关的知识,希望对你有一定的参考价值。

1.yml配置文件(简单配置)

spring:
  kafka:
    bootstrap-servers: ip:端口
    consumer:
      group-id: group-test
      enable-auto-commit: true
      auto-commit-interval: 1000ms
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2.类配置方式(使用了SASL/PLAINTEXT安全认证协议)

出现[Consumer clientId=consumer-1, groupId=group1] Bootstrap broker xxx (id: -1 rack: null) disconnected报错信息就需要用这种配置方式,加入安全认证

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author 向振华
 * @date 2021/05/10 15:58
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(16);
        // 服务地址
        propsMap.put("bootstrap.servers", "ip:端口");
        // 安全认证协议
        propsMap.put("security.protocol", "SASL_PLAINTEXT");
        propsMap.put("sasl.mechanism", "PLAIN");
        // 填充安全认证用户名和密码
        propsMap.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"usnm\\" password=\\"pwd\\";");
        propsMap.put("group.id", "group-test");
        propsMap.put("enable.auto.commit", true);
        propsMap.put("auto.commit.interval.ms", 1000);
        // latest: 从最新的偏移量开始消费
        propsMap.put("auto.offset.reset", "latest");
        // 反序列化方式
        propsMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        propsMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return propsMap;
    }
}

再加一个配置文件sasl.jaas.config到resource

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafkaadmin"
    password="kafkaadminpwd"
    user_kafkaadmin="kafkaadminpwd"
    user_kafkaclient1="kafkaclient1pwd"
    user_kafkaclient2="kafkaclient2pwd";
}; 

 

以上是关于Kafka两种配置文件方式的主要内容,如果未能解决你的问题,请参考以下文章

Kafka-文件管理

Kafka详解(上)——消息系统分类Kafka安装两种启动基本概念两种架构核心配置文件

配置 kafka 同步刷盘

kafka---broker 保存消息

给kafka配置外部连接

sparkStreaming读取kafka的两种方式