kakfa从入门到放弃: kafka入门,环境搭建,命令行操作
Posted 浅弋、璃鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kakfa从入门到放弃: kafka入门,环境搭建,命令行操作相关的知识,希望对你有一定的参考价值。
文章目录
一、消息队列:
1. 消息队列:
消息队列, 英文名: Message Queue, 常缩写为: MQ;
消息队列是一种用来存储消息的队列;
很多时候消息队列不是一个永久性的存储, 是作为临时存储的
// 1. 创建一个保存字符串的队列
strQueue := make(chan string, 20)
// 2. 往消息队列里放入消息
strQueue <- "hello"
// 3. 从消息队列里取出数据
fmt.Println(<- strQueue)
2. 消息队列中间:
消息队列中间件就是用来存储消息的软件(组件);
常用的消息中间件: kafka, RabbitMQ, ActiveMQ, RocketMQ, ZeroMQ等;
3. 消息队列的应用场景:
-
异步处理:
- 可以将一些比较耗时的操作放在其他系统中, 通过消息队列将需要进行处理的消息进行存储, 其他系统可以消费消息队列中的数据;
- 比较常见: 发送短信, 发送邮件;
-
系统解耦:
- 原先一个微服务通过接口(http)调用另一个微服务, 此时耦合很严重, 只要接口发生变化就会导致不可用;
- 使用消息队列可以将系统进行解耦合, 现在第一个微服务可以将消息放到消息队列中, 另一个服务从消息队列中取出数据再进行处理;
-
流量削峰;
- 消息队列是低延迟, 高可靠, 高吞吐的, 可以应对大量并发;
- 消息队列是低延迟, 高可靠, 高吞吐的, 可以应对大量并发;
-
日志处理: (大数据领域常见)
- 用户行为分析:
- 用户行为分析:
4. 生产者, 消费者模型:
5. 消息队列的两种模式:
5.1 点对点模式
消息发送者生产消息发送到消息队列中, 然后消息接收者从消息队列中取出来并且消费消息. 消息被消费后, 消息队列不再有存储, 所以消息接收者不可能消费到已经被消费的消息;
点对点模式的特点:
- 每个消息只有一个接收者(Consumer), 一旦被消费后, 消息就不再在消息队列中;
- 发送者和接收者之间没有依赖, 发送者发送消息后, 不管有没有接收者, 都不会影响到发送者发送下一条消息;
- 接收者在成功接收消息后需向队列应答成功, 以便消息队列删除当前接收的消息;
5.2 发布订阅模式
发布/订阅模式特点:
- 每个消息可以有多个订阅者;
- 发布者和订阅者之间有时间上的依赖; 针对某个主题(topic)的订阅者, 它必须创建订阅者之后, 才能消费发布者的消息;
- 为了消费消息, 订阅者需要提前订阅该角色, 并保持在线运行;
二、Kafka简介:
1. 什么是kafka
Kafka是由Apache软件基金会开发的一个开源流平台, 由Scala和java编写;
- 发布和订阅数据流, 类似于消息队列或者是企业消息传递系统;
- 以为容错的持久化方式存储数据流;
- 处理数据流; – kafka streams
2, 应用场景:
- 建立实时数据通道, 以可靠地在系统或应用之间获取数据;
- 构建实时流应用程序, 以转换或响应数据流;
其中: - Producers: 可以有很多的应用程序, 将消息数据放入到kafka集群中;
- Consumers: 可以有很多的应用程序, 将消息数据从kafka集群中拉取出来;
- Connectors: kafka的连接器可以将数据库中的数据导入到kafka, 也可以将kafka的数据导出到数据库中;
- Stream Processors: 流处理器可以从kafka拉取数据, 也可以将数据写入到kafka;
特性 | ActiveMQ | RabbitMQ | Kafka | RocketMQ |
---|---|---|---|---|
所属 | Apache | Moailla pubilc license | Apache | Apache/Ali |
成熟度 | 成熟 | 成熟 | 成熟 | 比较成熟 |
生产者-消费者模式 | √ | √ | √ | √ |
Request-Reply(请求-响应模式) | √ | √ | x | √ |
Api完备度 | 高 | 高 | 高 | 低(静态配置) |
多语言支持 | 支持java优先 | 语言无关 | 支持, java优先 | 支持 |
单击吞吐量 | 万级(最差) | 万级 | 十万级 | 十万级(最多) |
消息延迟 | 微秒级 | 毫秒级 | ||
可用性 | 高(主从)高(主从) | 非常高(分布式) | 高 | |
消息丢失 | 低 | 理论上不会丢失 | ||
消息重复 | 控制 | 理论上不会重复 | ||
事务 | √ | x | √ | √ |
文档完备 | 高 | 高 | 高 | 中 |
首次部署难度 | 低 | 中 | 高 |
3. kafka生态圈
生态圈官网: https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem
三、kafka环境部署
kafka版本号: kafka_2.12-2.4.1
其中: 2.12 是scala的版本; 2.4.1 是kafka版本;
发版记录: https://kafka.apache.org/downloads
1. 搭建集群环境
准备kafka安装包
tar -xvzf kafka_2.12-2.4.1.tgz -C ./
cd ./kafka_2.12-2.4.1/
修改server.properties
cd ./kafka_2.12-2.4.1/config
vi server.properties
# 指定broker的id
broker.id=0
# 指定kafka数据的位置
log.dirs=/path/to/kafka/data
将安装好的kafka复制到另外的服务上, 并修改broker.id=n
配置KAFKA_HOME环境变量:
/etc/profile
export KAFKA_HOME=/path/to/kafka/
export PATH=:$PATH:$KAFKA_HOME
source /etc.portfile
启动服务器:
# 启动zookeeper
nohup bin/zookeeper-server-start.sh config/zookeep.properties &
# 启动kafka
cd /path/to/kafka/
nohup bin/kafka-server-start.sh config/server.properties &
# 测试kafka集群是否启动成功
bin/kafka-topic.sh --bootstrap-server localhost:9092 --list
2.目录结构
目录名称 | 说明 |
---|---|
bun | kafka的所有执行脚本都在这里; 例如: 启动kafka服务, 创建topic, 生产者, 消费者 |
config | 所有配置文件 |
libs | 运行kafka所需要的所有jar包 |
logs | kafak的所有日志文件 |
site-docs | 帮助文档 |
3.onekey脚本
准备集群节点列表文件
/export/onekey/slave
192.168.100.1
192.168.100.2
192.168.100.3
3.1 一键启动
cat /export/onekey/slave | while read line
do
echo $line
ssh $line "source /etc/profile; export JMX_PORT=9988; nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties >/dev/nul* 2>&1 &" &
&
wait
done
3.2 一键关闭
cat /export/onekey/slave | while read line
do
echo $line
ssh $line "source /etc/profile;; jps |grep kafka | cut -d ' ' -f1 | xargs kill -s 9"
&
wait
done
4. docker启动:
相关文档: https://hub.docker.com/r/bitnami/kafka
./docker-compose.yml
version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:latest
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:latest
ports:
# - "9092:9092"
- "9093:9093"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
四、基础操作:
1. 创建topic
创建一个topic, kafka中所有的消息都是保存在主题中, 要生产消息到kafka, 首先必须要有一个确定的主题:
# 创建名为test的topic
bin/kafka-topic.sh --create --bootstrap-server host:9092 --topic test
# 查看目前kafka中的主题
bin/kafka-topic.sh --list --bootstrap-server host:9092
2. 生产/消费消息
使用kafka内置的测试, 生产一些消息到kafka的test topic中
bin/kafka-console-producer.sh --broker-list host:9092 --topic test
使用下边的命令来消费test topic中的消息
bin/fakfa-console-consumer.sh --bootstrap-server host:9092 --topic test --from-beginning
3. 使用kafka tools操作kafka
官方网站: kafkaTools
- 浏览kafka集群节点, 多少个topic, 多少个分区;
- 创建topic/ 删除topic;
- 浏览zookeeper中的数据;
4. 基准测试;
基准测试(benchmark testing)是这一种测量和评估软件性能指标的活动; 我们可以通过基准测试了解到软件、硬件的性能水平; 主要测试负载的执行时间, 传输速度, 吞吐量,资源占用等;
4.1 基于1个分区1个副本的基准测试
4.1.1 测试步骤:
- 启动kafka集群;
- 创建一个分区一个副本的tocip benchmark
- 同时运行生产者, 消费者基准测试程序;
- 观察结果;
创建topic:
bin/kafka-topic.sh --zookeeper host:2081 --create --topic benchmark --partitions 1 --replication-factor 1
生产者基准测试
在生产环境中, 推荐使用生产5000w消息, 这样性能数据会更准确些;
bin/kafka-producer-perf-test.sh --topic benchmark --num-records 50000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=host1:9091,host2:9092,host3:9092 acks=1
bin/kafka-producer-perf-test.sh
–topic topic的名字
–num-records 总共指定生产数据量 (默认5000w)
–throughput 指定吞吐量–限流 (-1:不指定)
–record-size record数据大小(字节)
–producer-props 生产者的一些配置
bootstrap.servers=host1:9091,host2:9092,host3:9092 指定kafak集群
acks=1 ACK模式
测试结果:
吞吐量 | 93092.533979 records/sec (每秒9.3W条记录) |
---|---|
吞吐速率 | (88.78 MB/sec)每秒约89MB数据 |
平均延迟时间 | 346.62 ms avg latency |
最大延迟时间 | 1003.00 ms max latency |
消费者基准测试
bin/kafka-consumer-perf-test.sh --broker-list host1:9091,host2:9092,host3:9092 ---topic benchmark --f etch-size 1048576 --massage 50000000
bin/kafka-consumer-perf-test.sh
–broker-list host1:9091,host2:9092,host3:9092 指定kafka集群地址
—topic benchmark 指定topic的名称
–fetch-size 1048576 每次拉取的数据大小
–massage 50000000 总共要消费的消费数量
data.consumed.in.MB(共计消费的数据) | 4768.3716MB |
---|---|
(MB.sec)每秒消费的字节数量 | 445.6006(每秒445MB) |
data.consumed.in.nMsg(共计消费的数量) | 5000000 |
nMsg.sec(每秒的消息数量) | 467246.0518(每秒46.7W条) |
4.2 基于3个分区1个副本的基准测试
4.3 基于1个分区3个副本的基准测试
以上是关于kakfa从入门到放弃: kafka入门,环境搭建,命令行操作的主要内容,如果未能解决你的问题,请参考以下文章