伪分布式Kafka环境搭建与SpringBoot集成
Posted 迷途老码
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了伪分布式Kafka环境搭建与SpringBoot集成相关的知识,希望对你有一定的参考价值。
安装包下载
下载安装包
wget https://archive.apache.org/dist/kafka/2.2.0/kafka_2.12-2.2.0.tgz
# 解压
tar -zxf kafka_2.12-2.2.0.tgz
# 复制kafka
cp -rp kafka_2.12-2.2.0 kafka-broker-1
新建数据和日志路径
新建kafka目录
cd /usr/local/kafka
mkdir -p kafka/001/log
mkdir -p kafka/002/log
mkdir -p kafka/003/log
新建zookeeper目录
cd /usr/local/kafka
mkdir -p zookeeper/001/log
mkdir -p zookeeper/002/log
mkdir -p zookeeper/003/log
mkdir -p zookeeper/001/data
mkdir -p zookeeper/002/data
mkdir -p zookeeper/003/data
# 新建myid
cat 1 > zookeeper/001/data/myid
cat 2 > zookeeper/002/data/myid
cat 3 > zookeeper/003/data/myid
修改配置文件
第一个broker(路径:/usr/local/kafka/kafka-broker-1/config)
新增jaas.conf
KafkaServer
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin";
;
新增zk_jaas.conf
Server
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin"
user_admin="admin";
;
Client
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin";
;
修改zookeeper配置文件
zookeeper.properties
# 数据文件与日志文件
dataDir=/usr/local/kafka/zookeeper/001/data
dataLogDir=/usr/local/kafka/zookeeper/001/log
# 端口
clientPort=2181
# 服务端口,第一个是给客户端提供服务端口,后面一个是内部通讯端口,例如选举leader
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
# SASL认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
修改zookeeper启动命令
zookeeper-server-start.sh
# 加入export这一行
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-1/config/zk_jaas.conf $KAFKA_OPTS"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
修改kafka配置文件
server.properties
broker.id=0
listeners=SASL_PLAINTEXT://:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
log.dirs=/usr/local/kafka/kafka/001/log
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
修改kafka启动命令
kafka-run-class.sh
# Generic jvm settings you want to add
if [ -z "$KAFKA_OPTS" ]; then
#KAFKA_OPTS=""
# 加入下面这一行
KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/config/jaas.conf"
fi
配置第二个broker(路径:/usr/local/kafka/kafka-broker-2)
cd /usr/local/kafka
cp -rp kafka-broker-1 kafka-broker-2
修改zookeeper配置文件
zookeeper.properties
# 修改
# 数据文件与日志文件
dataDir=/usr/local/kafka/zookeeper/002/data
dataLogDir=/usr/local/kafka/zookeeper/002/log
# 修改
# 端口
clientPort=2182
# 服务端口,第一个是给客户端提供服务端口,后面一个是内部通讯端口,例如选举leader
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
# SASL认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
修改zookeeper启动命令
zookeeper-server-start.sh
# 修改
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-2/config/zk_jaas.conf $KAFKA_OPTS"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
修改kafka配置文件
server.properties
# 修改
broker.id=1
# 修改
listeners=SASL_PLAINTEXT://:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 修改
log.dirs=/usr/local/kafka/kafka/002/log
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
配置第三个broker(路径:/usr/local/kafka/kafka-broker-3)
cd /usr/local/kafka
cp -rp kafka-broker-1 kafka-broker-3
修改zookeeper配置文件
zookeeper.properties
# 修改
# 数据文件与日志文件
dataDir=/usr/local/kafka/zookeeper/003/data
dataLogDir=/usr/local/kafka/zookeeper/003/log
# 修改
# 端口
clientPort=2183
# 服务端口,第一个是给客户端提供服务端口,后面一个是内部通讯端口,例如选举leader
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
# SASL认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
requireClientAuthScheme=sasl
zookeeper.sasl.client=true
修改zookeeper启动命令
zookeeper-server-start.sh
# 修改
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka/kafka-broker-3/config/zk_jaas.conf $KAFKA_OPTS"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
修改kafka配置文件
server.properties
# 修改
broker.id=2
# 修改
listeners=SASL_PLAINTEXT://:9094
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 修改
log.dirs=/usr/local/kafka/kafka/003/log
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
启动
zookeeper启动
# 启动命令
nohup /usr/local/kafka/kafka-broker-1/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-1/config/zookeeper.properties &
nohup /usr/local/kafka/kafka-broker-2/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-2/config/zookeeper.properties &
nohup /usr/local/kafka/kafka-broker-3/bin/zookeeper-server-start.sh /usr/local/kafka/kafka-broker-3/config/zookeeper.properties &
# 查看端口(正常应该有2181、3887、3888、3889和2887|2888|2889中的一个)
ss -lnput | egrep '2181|2887|3887|2888|3888|2889|3889'
kafka启动
# 启动命令
/usr/local/kafka/kafka-broker-1/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-1/config/server.properties
/usr/local/kafka/kafka-broker-2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-2/config/server.properties
/usr/local/kafka/kafka-broker-3/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka-broker-3/config/server.properties
# 查看端口
ss -lnput | egrep '9092|9093|9094'
安装kafka-manager
# 安装yum-utils软件包
yum install -y yum-utils
# 设置docker仓库
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
# 安装docker
yum install docker-ce docker-ce-cli containerd.io
# 启动docker
systemctl start docker
# 安装kafka-manager
docker run -itd --rm -p 9000:9000 -e ZK_HOSTS="172.30.129.14:2181" -e APPLICATION_SECRET=letmein sheepkiller/kafka-manager
访问http://xxx:9000/即可
Springboot项目集成
新建springboot项目,添加依赖
pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
添加配置
kafka-jaas.config
KafkaClient
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
;
application.properties
# 端口
server.port=8899
# 应用名称
spring.application.name=spring-boot-test
# kafka配置
# 生产者配置
spring.kafka.bootstrap-servers=xxx:9092,xxx:9093,xxx:9094
#发送失败后,重试次数,0表示不重试
spring.kafka.producer.retries=3
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 开启sasl认证
spring.kafka.producer.properties.sasl.mechanism=PLAIN
spring.kafka.producer.properties.security.protocol=SASL_PLAINTEXT
# 消费者配置
# 默认的消费组ID
spring.kafka.consumer.group-id=test-topic
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.auto-offset-reset=earliest
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
spring.kafka.consumer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 开启sasl认证
spring.kafka.consumer.properties.sasl.mechanism=PLAIN
spring.kafka.consumer.properties.security.protocol=SASL_PLAINTEXT
添加kafka消费类
KafkaConsumer
@Component
public class KafkaConsumer
@KafkaListener(id="test-topic-consumer", topics = "test-topic")
public void recieveMsg(ConsumerRecord<String, String> consumerRecord)
System.out.println("接收到消息:消息值:" + consumerRecord.value() + ",消息偏移量:" + consumerRecord.offset());
添加kafka生产者
KafkaController
@RestController
@RequestMapping(value = "kafka")
public class KafkaController
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@PostMapping("sendMsg")
public String sendMsg(String msg)
kafkaTemplate.send("test-topic", msg);
return "success";
SpringBoot启动类
SpringBootTestApplication
@SpringBootApplication
public class SpringBootTestApplication
public static void main(String[] args) throws FileNotFoundException
// 启动时配置sasl认证
final File file = ResourceUtils.getFile("classpath:kafka-jaas.conf");
System.setProperty("java.security.auth.login.config", file.getAbsolutePath());
SpringApplication.run(SpringBootTestApplication.class, args);
使用postman发送请求测试即可。
以上是关于伪分布式Kafka环境搭建与SpringBoot集成的主要内容,如果未能解决你的问题,请参考以下文章
kafka环境搭建 02kafka_2.11-2.4.1 基于 zookeeper 搭建高可用伪集群(一台服务器实现三个节点的 kafka 集群)