Pseudo-Distributed Kafka Setup and SpringBoot Integration

Posted by 迷途老码


Downloading the Package

# Download the package
wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz

# Extract
tar -zxf kafka_2.12-2.2.0.tgz

# Copy the extracted directory as the first broker
cp -rp kafka_2.12-2.2.0 kafka-broker-1
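The rest of this walkthrough assumes the broker copies live under /usr/local/kafka. A minimal sketch of getting them there, assuming the download happened elsewhere:

# Assumption: move the first broker copy under /usr/local/kafka
mkdir -p /usr/local/kafka
mv kafka-broker-1 /usr/local/kafka/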

Creating the Data and Log Directories

Create the kafka directories

cd /usr/local/kafka

mkdir -p kafka/001/log
mkdir -p kafka/002/log
mkdir -p kafka/003/log

Create the zookeeper directories

cd /usr/local/kafka

mkdir -p zookeeper/001/log
mkdir -p zookeeper/002/log
mkdir -p zookeeper/003/log
mkdir -p zookeeper/001/data
mkdir -p zookeeper/002/data
mkdir -p zookeeper/003/data


# Create the myid files
echo 1 > zookeeper/001/data/myid
echo 2 > zookeeper/002/data/myid
echo 3 > zookeeper/003/data/myid
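A quick sanity check (should print 1, 2 and 3):

# Each myid must match its server.N entry in zookeeper.properties
cat zookeeper/001/data/myid zookeeper/002/data/myid zookeeper/003/data/myid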

Editing the Configuration Files

First broker (path: /usr/local/kafka/kafka-broker-1/config)

Add jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin";
};

Add zk_jaas.conf

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin"
    user_admin="admin";
};

Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="admin";
};

Edit the ZooKeeper configuration

zookeeper.properties

# Data and transaction-log directories
dataDir=/usr/local/kafka/zookeeper/001/data
dataLogDir=/usr/local/kafka/zookeeper/001/log

# Client port
clientPort=2181

# Quorum ports: the first is used by followers to talk to the leader, the second for leader election
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

# SASL authentication
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
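One caveat: the zookeeper.properties shipped with Kafka contains no quorum timing settings, and ZooKeeper refuses to start a multi-node ensemble without them (it fails with an "initLimit is not set" error). A sketch of values to add (these are common defaults, not from the original article); since brokers 2 and 3 are copied from this directory below, the lines carry over automatically:

# Quorum timing, required once server.N entries are present
tickTime=2000
initLimit=10
syncLimit=5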

Edit the ZooKeeper startup script

zookeeper-server-start.sh

# Add the export line below
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-1/config/zk_jaas.conf $KAFKA_OPTS"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"

Edit the Kafka configuration

server.properties

broker.id=0

listeners=SASL_PLAINTEXT://:9092

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

log.dirs=/usr/local/kafka/kafka/001/log

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

Edit the Kafka startup script

kafka-run-class.sh

# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
  #KAFKA_OPTS=""
# Add the line below
  KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/config/jaas.conf"
fi
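If you would rather not patch kafka-run-class.sh, an equivalent sketch is to export KAFKA_OPTS in the shell before starting the broker; kafka-run-class.sh appends it to the JVM arguments:

# Alternative: pass the JAAS file via the environment instead of editing the script
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-1/config/jaas.conf"
/usr/local/kafka/kafka-broker-1/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-1/config/server.properties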

Configure the second broker (path: /usr/local/kafka/kafka-broker-2)

cd /usr/local/kafka
cp -rp kafka-broker-1 kafka-broker-2

Edit the ZooKeeper configuration

zookeeper.properties

# Changed
# Data and transaction-log directories
dataDir=/usr/local/kafka/zookeeper/002/data
dataLogDir=/usr/local/kafka/zookeeper/002/log

# Changed
# Client port
clientPort=2182

# Quorum ports: the first is used by followers to talk to the leader, the second for leader election
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

# SASL authentication
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true

Edit the ZooKeeper startup script

zookeeper-server-start.sh

# Changed
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-2/config/zk_jaas.conf $KAFKA_OPTS"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"

Edit the Kafka configuration

server.properties

# Changed
broker.id=1

# Changed
listeners=SASL_PLAINTEXT://:9093

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# Changed
log.dirs=/usr/local/kafka/kafka/002/log

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

Configure the third broker (path: /usr/local/kafka/kafka-broker-3)

cd /usr/local/kafka
cp -rp kafka-broker-1 kafka-broker-3

Edit the ZooKeeper configuration

zookeeper.properties

# Changed
# Data and transaction-log directories
dataDir=/usr/local/kafka/zookeeper/003/data
dataLogDir=/usr/local/kafka/zookeeper/003/log

# Changed
# Client port
clientPort=2183

# Quorum ports: the first is used by followers to talk to the leader, the second for leader election
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

# SASL authentication
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true

Edit the ZooKeeper startup script

zookeeper-server-start.sh

# Changed
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-3/config/zk_jaas.conf $KAFKA_OPTS"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"

Edit the Kafka configuration

server.properties

# Changed
broker.id=2

# Changed
listeners=SASL_PLAINTEXT://:9094

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# Changed
log.dirs=/usr/local/kafka/kafka/003/log

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

Startup

Start ZooKeeper

# Start commands
nohup /usr/local/kafka/kafka-broker-1/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-1/config/zookeeper.properties &
nohup /usr/local/kafka/kafka-broker-2/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-2/config/zookeeper.properties &
nohup /usr/local/kafka/kafka-broker-3/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-3/config/zookeeper.properties &

# Check the listening ports (expect 2181-2183 and 3887-3889, plus one of 2887/2888/2889: only the current leader binds its quorum port)
ss -lnput | egrep '2181|2182|2183|2887|3887|2888|3888|2889|3889'
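To confirm the ensemble actually formed, query each node's role; the ZooKeeper bundled with Kafka 2.2.0 (3.4.x) answers the stat four-letter command by default, and you should see one leader and two followers:

echo stat | nc 127.0.0.1 2181 | grep Mode
echo stat | nc 127.0.0.1 2182 | grep Mode
echo stat | nc 127.0.0.1 2183 | grep Mode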

Start Kafka

# Start commands
/usr/local/kafka/kafka-broker-1/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-1/config/server.properties
/usr/local/kafka/kafka-broker-2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-2/config/server.properties
/usr/local/kafka/kafka-broker-3/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-3/config/server.properties

# Check the listening ports
ss -lnput | egrep '9092|9093|9094'
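Since the brokers only expose SASL_PLAINTEXT listeners, the CLI tools also need SASL credentials. A sketch of an end-to-end smoke test; client.properties is a helper file introduced here, not part of the original setup:

# client.properties - SASL settings for the command-line tools
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";

# Create a test topic (kafka-topics.sh accepts --bootstrap-server since Kafka 2.2)
/usr/local/kafka/kafka-broker-1/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --command-config client.properties --replication-factor 3 --partitions 3 --topic test-topic

# Produce and consume a few messages
/usr/local/kafka/kafka-broker-1/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test-topic --producer.config client.properties
/usr/local/kafka/kafka-broker-1/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-topic --from-beginning --consumer.config client.properties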

Installing kafka-manager

# Install the yum-utils package
yum install -y yum-utils
# Add the Docker CE repository
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# Install Docker
yum install docker-ce docker-ce-cli containerd.io
# Start Docker
systemctl start docker

# Run kafka-manager
docker run -itd --rm  -p 9000:9000 -e ZK_HOSTS="172.30.129.14:2181" -e APPLICATION_SECRET=letmein sheepkiller/kafka-manager

Then visit http://xxx:9000/.

SpringBoot Integration

Create a SpringBoot project and add the dependencies

pom.xml

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.2</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>

    <!-- spring-boot-starter-web is required for the @RestController below -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

Add the configuration

kafka-jaas.conf

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
};

application.properties

# Port
server.port=8899

# Application name
spring.application.name=spring-boot-test

# Kafka configuration
# Producer settings
spring.kafka.bootstrap-servers=xxx:9092,xxx:9093,xxx:9094
# Number of retries after a send failure; 0 disables retries
spring.kafka.producer.retries=3
# Ack level: how many partition replicas must have the record before the producer gets an ack (0, 1, or all/-1)
spring.kafka.producer.acks=1
# Batch size
spring.kafka.producer.batch-size=16384
# Producer buffer memory
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Enable SASL authentication
spring.kafka.producer.properties.sasl.mechanism=PLAIN
spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT

# Consumer settings
# Default consumer group ID
spring.kafka.consumer.group-id=test-topic
# Whether to auto-commit offsets
spring.kafka.consumer.enable-auto-commit=true
# Auto-commit interval (how long after receiving a message the offset is committed)
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=earliest
# earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning
# latest: if a partition has a committed offset, consume from it; otherwise consume only newly produced data
# none: if every partition has a committed offset, consume from those offsets; if any partition lacks one, throw an exception
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Enable SASL authentication
spring.kafka.consumer.properties.sasl.mechanism=PLAIN
spring.kafka.consumer.properties.security.protocol=SASL_PLAINTEXT
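An alternative worth noting (my suggestion, not how the original article wires it up): the JAAS settings can be embedded directly in the Spring properties instead of pointing the JVM at an external kafka-jaas.conf, which also sidesteps the classpath-file lookup in the startup class below:

# Per-client JAAS config, replacing the external kafka-jaas.conf and the system property
spring.kafka.producer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
spring.kafka.consumer.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";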

Add the Kafka consumer class

KafkaConsumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
    @KafkaListener(id = "test-topic-consumer", topics = "test-topic")
    public void receiveMsg(ConsumerRecord<String, String> consumerRecord) {
        System.out.println("Received message: value=" + consumerRecord.value() + ", offset=" + consumerRecord.offset());
    }
}

Add the Kafka producer

KafkaController

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "kafka")
public class KafkaController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("sendMsg")
    public String sendMsg(String msg) {
        kafkaTemplate.send("test-topic", msg);
        return "success";
    }
}
SpringBoot application class

SpringBootTestApplication

import java.io.File;
import java.io.FileNotFoundException;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.util.ResourceUtils;

@SpringBootApplication
public class SpringBootTestApplication {

    public static void main(String[] args) throws FileNotFoundException {
        // Point the JVM at the SASL JAAS file before the Kafka clients start
        // (note: ResourceUtils.getFile("classpath:...") does not work from inside a packaged jar)
        final File file = ResourceUtils.getFile("classpath:kafka-jaas.conf");
        System.setProperty("java.security.auth.login.config", file.getAbsolutePath());
        SpringApplication.run(SpringBootTestApplication.class, args);
    }
}


Send a request with Postman (or curl, as shown below) to test.
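For example, with curl (port and path follow the controller above; msg is bound from the form parameter):

curl -X POST "http://localhost:8899/kafka/sendMsg" -d "msg=hello-kafka"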
