kakfa从入门到放弃: kafka入门,环境搭建,命令行操作

Posted 浅弋、璃鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kakfa从入门到放弃: kafka入门,环境搭建,命令行操作相关的知识,希望对你有一定的参考价值。

文章目录

一、消息队列:

1. 消息队列:

消息队列, 英文名: Message Queue, 常缩写为: MQ;
消息队列是一种用来存储消息的队列;
很多时候消息队列不是一个永久性的存储, 是作为临时存储的

// 1. 创建一个保存字符串的队列
strQueue := make(chan string, 20)

// 2. 往消息队列里放入消息
strQueue <- "hello"

// 3. 从消息队列里取出数据
fmt.Println(<- strQueue)

2. 消息队列中间:

消息队列中间件就是用来存储消息的软件(组件);
常用的消息中间件: kafka, RabbitMQ, ActiveMQ, RocketMQ, ZeroMQ等;

3. 消息队列的应用场景:

  • 异步处理:

    • 可以将一些比较耗时的操作放在其他系统中, 通过消息队列将需要进行处理的消息进行存储, 其他系统可以消费消息队列中的数据;
    • 比较常见: 发送短信, 发送邮件;
  • 系统解耦:

    • 原先一个微服务通过接口(http)调用另一个微服务, 此时耦合很严重, 只要接口发生变化就会导致不可用;
    • 使用消息队列可以将系统进行解耦合, 现在第一个微服务可以将消息放到消息队列中, 另一个服务从消息队列中取出数据再进行处理;
  • 流量削峰;

    • 消息队列是低延迟, 高可靠, 高吞吐的, 可以应对大量并发;
  • 日志处理: (大数据领域常见)

    • 用户行为分析:

4. 生产者, 消费者模型:

5. 消息队列的两种模式:

5.1 点对点模式


消息发送者生产消息发送到消息队列中, 然后消息接收者从消息队列中取出来并且消费消息. 消息被消费后, 消息队列不再有存储, 所以消息接收者不可能消费到已经被消费的消息;
点对点模式的特点:

  • 每个消息只有一个接收者(Consumer), 一旦被消费后, 消息就不再在消息队列中;
  • 发送者和接收者之间没有依赖, 发送者发送消息后, 不管有没有接收者, 都不会影响到发送者发送下一条消息;
  • 接收者在成功接收消息后需向队列应答成功, 以便消息队列删除当前接收的消息;

5.2 发布订阅模式


发布/订阅模式特点:

  • 每个消息可以有多个订阅者;
  • 发布者和订阅者之间有时间上的依赖; 针对某个主题(topic)的订阅者, 它必须创建订阅者之后, 才能消费发布者的消息;
  • 为了消费消息, 订阅者需要提前订阅该角色, 并保持在线运行;

二、Kafka简介:

1. 什么是kafka


Kafka是由Apache软件基金会开发的一个开源流平台, 由Scala和java编写;

  • 发布和订阅数据流, 类似于消息队列或者是企业消息传递系统;
  • 以为容错的持久化方式存储数据流;
  • 处理数据流; – kafka streams

2, 应用场景:

  • 建立实时数据通道, 以可靠地在系统或应用之间获取数据;
  • 构建实时流应用程序, 以转换或响应数据流;

    其中:
  • Producers: 可以有很多的应用程序, 将消息数据放入到kafka集群中;
  • Consumers: 可以有很多的应用程序, 将消息数据从kafka集群中拉取出来;
  • Connectors: kafka的连接器可以将数据库中的数据导入到kafka, 也可以将kafka的数据导出到数据库中;
  • Stream Processors: 流处理器可以从kafka拉取数据, 也可以将数据写入到kafka;
特性ActiveMQRabbitMQKafkaRocketMQ
所属ApacheMoailla pubilc licenseApacheApache/Ali
成熟度成熟成熟成熟比较成熟
生产者-消费者模式
Request-Reply(请求-响应模式)x
Api完备度低(静态配置)
多语言支持支持java优先语言无关支持, java优先支持
单击吞吐量万级(最差)万级十万级十万级(最多)
消息延迟微秒级毫秒级
可用性高(主从)高(主从)非常高(分布式)
消息丢失理论上不会丢失
消息重复控制理论上不会重复
事务x
文档完备
首次部署难度

3. kafka生态圈

生态圈官网: https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem

三、kafka环境部署

kafka版本号: kafka_2.12-2.4.1
其中: 2.12 是scala的版本; 2.4.1 是kafka版本;
发版记录: https://kafka.apache.org/downloads

1. 搭建集群环境

准备kafka安装包

tar -xvzf kafka_2.12-2.4.1.tgz -C ./
cd ./kafka_2.12-2.4.1/

修改server.properties

cd ./kafka_2.12-2.4.1/config
vi server.properties
# 指定broker的id
broker.id=0
# 指定kafka数据的位置
log.dirs=/path/to/kafka/data

将安装好的kafka复制到另外的服务上, 并修改broker.id=n

配置KAFKA_HOME环境变量:
/etc/profile

export KAFKA_HOME=/path/to/kafka/
export PATH=:$PATH:$KAFKA_HOME

source /etc.portfile

启动服务器:

# 启动zookeeper
nohup bin/zookeeper-server-start.sh config/zookeep.properties &

# 启动kafka
cd /path/to/kafka/
nohup bin/kafka-server-start.sh config/server.properties &

# 测试kafka集群是否启动成功
bin/kafka-topic.sh --bootstrap-server localhost:9092 --list

2.目录结构

目录名称说明
bunkafka的所有执行脚本都在这里; 例如: 启动kafka服务, 创建topic, 生产者, 消费者
config所有配置文件
libs运行kafka所需要的所有jar包
logskafak的所有日志文件
site-docs帮助文档

3.onekey脚本

准备集群节点列表文件
/export/onekey/slave

192.168.100.1
192.168.100.2
192.168.100.3

3.1 一键启动

cat /export/onekey/slave | while read line
do

	echo $line
	ssh $line "source /etc/profile; export JMX_PORT=9988; nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties >/dev/nul* 2>&1 &" &
&
wait
done

3.2 一键关闭

cat /export/onekey/slave | while read line
do

	echo $line
	ssh $line "source /etc/profile;; jps |grep kafka | cut -d ' ' -f1 | xargs kill -s 9"
&
wait
done

4. docker启动:

相关文档: https://hub.docker.com/r/bitnami/kafka
./docker-compose.yml

version: "2"

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: docker.io/bitnami/kafka:latest
    ports:
      # - "9092:9092"
      - "9093:9093"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

四、基础操作:

1. 创建topic

创建一个topic, kafka中所有的消息都是保存在主题中, 要生产消息到kafka, 首先必须要有一个确定的主题:

# 创建名为test的topic
bin/kafka-topic.sh --create --bootstrap-server host:9092 --topic test

# 查看目前kafka中的主题
bin/kafka-topic.sh --list --bootstrap-server host:9092

2. 生产/消费消息

使用kafka内置的测试, 生产一些消息到kafka的test topic中

bin/kafka-console-producer.sh --broker-list host:9092 --topic test

使用下边的命令来消费test topic中的消息

bin/fakfa-console-consumer.sh --bootstrap-server host:9092 --topic test --from-beginning

3. 使用kafka tools操作kafka

官方网站: kafkaTools

  • 浏览kafka集群节点, 多少个topic, 多少个分区;
  • 创建topic/ 删除topic;
  • 浏览zookeeper中的数据;

4. 基准测试;

基准测试(benchmark testing)是这一种测量和评估软件性能指标的活动; 我们可以通过基准测试了解到软件、硬件的性能水平; 主要测试负载的执行时间, 传输速度, 吞吐量,资源占用等;

4.1 基于1个分区1个副本的基准测试

4.1.1 测试步骤:

  • 启动kafka集群;
  • 创建一个分区一个副本的tocip benchmark
  • 同时运行生产者, 消费者基准测试程序;
  • 观察结果;

创建topic:

bin/kafka-topic.sh --zookeeper host:2081 --create --topic benchmark --partitions 1 --replication-factor 1

生产者基准测试
在生产环境中, 推荐使用生产5000w消息, 这样性能数据会更准确些;

bin/kafka-producer-perf-test.sh --topic benchmark --num-records 50000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=host1:9091,host2:9092,host3:9092 acks=1

bin/kafka-producer-perf-test.sh
–topic topic的名字
–num-records 总共指定生产数据量 (默认5000w)
–throughput 指定吞吐量–限流 (-1:不指定)
–record-size record数据大小(字节)
–producer-props 生产者的一些配置
bootstrap.servers=host1:9091,host2:9092,host3:9092 指定kafak集群
acks=1 ACK模式


测试结果:

吞吐量93092.533979 records/sec (每秒9.3W条记录)
吞吐速率(88.78 MB/sec)每秒约89MB数据
平均延迟时间346.62 ms avg latency
最大延迟时间1003.00 ms max latency

消费者基准测试

bin/kafka-consumer-perf-test.sh --broker-list host1:9091,host2:9092,host3:9092  ---topic benchmark --f etch-size 1048576 --massage 50000000

bin/kafka-consumer-perf-test.sh
–broker-list host1:9091,host2:9092,host3:9092 指定kafka集群地址
—topic benchmark 指定topic的名称
–fetch-size 1048576 每次拉取的数据大小
–massage 50000000 总共要消费的消费数量

data.consumed.in.MB(共计消费的数据)4768.3716MB
(MB.sec)每秒消费的字节数量445.6006(每秒445MB)
data.consumed.in.nMsg(共计消费的数量)5000000
nMsg.sec(每秒的消息数量)467246.0518(每秒46.7W条)

4.2 基于3个分区1个副本的基准测试

4.3 基于1个分区3个副本的基准测试

以上是关于kakfa从入门到放弃: kafka入门,环境搭建,命令行操作的主要内容,如果未能解决你的问题,请参考以下文章

kakfa从入门到放弃: kafka入门

kakfa从入门到放弃: golang编程操作kafka

kakfa从入门到放弃: golang编程操作kafka

kakfa从入门到放弃: golang编程操作kafka

kakfa从入门到放弃: 基础操作

kakfa从入门到放弃: 相关概念,幂等性&事务