Kafka快速入门——Kafka集群部署
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka快速入门——Kafka集群部署相关的知识,希望对你有一定的参考价值。
Kafka快速入门(六)——Kafka集群部署
一、Kafka集群部署方案规划
1、操作系统选择
通常,生产环境应该将Kafka集群部署在Linux操作系统上,原因如下:
(1)Kafka客户端底层使用了Java的selector,selector在Linux上的实现机制是epoll,而在Windows平台上的实现机制是select,因此Kafka部署在Linux上能够获得更高效的I/O性能。
(2)网络传输效率的差别。Kafka需要在磁盘和网络间进行大量数据传输,在Linux部署Kafka能够享受到零拷贝(Zero Copy)技术所带来的快速数据传输特性。
(3)社区的支持度。Apache Kafka社区目前对Windows平台上发现的Kafka Bug不做任何承诺。
2、磁盘
(1)Kafka实现了冗余机制来提供高可靠性,并通过分区机制在软件层面实现负载均衡,因此Kafka的磁盘存储可以不使用磁盘阵列(RAID),使用普通磁盘组成存储空间即可。
(2)使用机械磁盘能够胜任Kafka线上环境,但SSD显然性能更好。
3、磁盘容量
规划磁盘容量时需要考虑:新增消息数、消息留存时间、平均消息大小、备份数、是否启用压缩等因素。
假设公司业务每天需要向Kafka集群发送100000000条消息,每条消息保存两份以防止数据丢失,消息默认保存7天时间,消息的平均大小是1KB,Kafka的数据压缩比是0.75。
每天100000000条1KB大小的消息,保存两份,压缩比0.75,占用空间大小就等于150GB(100000000*1KB*2/1000/1000*0.75
),考虑到Kafka集群的索引数据等,需要预留出10%的磁盘空间,因此每天总存储容量是165GB。数据留存7天,因此规划磁盘容量为1155GB(165GB*7
)。
4、网络带宽
假设公司的机房环境是千兆网络,即1Gbps,业务需要在1小时内处理1TB的业务数据。假设Kafka Broker会用到70%的带宽资源,超过70%的阈值可能网络丢包,单台Kafka Broker最多能使用大约700Mb的带宽资源,但通常需要再额外为其它服务预留出2/3的资源,即Kafka Broker可以为Kafka服务分配带宽240Mbps(700Mb/3)。1小时处理1TB数据,则每秒需要处理2336Mb(1024*1024*8/3600
)数据,除以240,约等于10台服务器。如果还需要额外复制两份,那么服务器台数还要乘以3,即30台。
二、Kafka集群参数配置
1、Broker端参数
Broker端参数也被称为静态参数(Static Configs),静态参数只能在Kafka的配置文件server.properties中进行设置,必须重启Broker进程才能生效。
log.dirs:指定Broker需要使用的若干个文件目录路径,没有默认值,必须指定。在生产环境中一定要为log.dirs配置多个路径,如果条件允许,需要保证目录被挂载到不同的物理磁盘上。优势在于,提升读写性能,多块物理磁盘同时读写数据具有更高的吞吐量;能够实现故障转移(Failover),Kafka 1.1版本引入Failover功能,坏掉磁盘上的数据会自动地转移到其它正常的磁盘上,而且Broker还能正常工作,基于Failover机制,Kafka可以舍弃RAID方案。
zookeeper.connect:CS格式参数,可以指定值为zk1:2181,zk2:2181,zk3:2181,不同Kafka集群可以指定:zk1:2181,zk2:2181,zk3:2181/kafka1,chroot只需要写一次。
listeners:设置内网访问Kafka服务的监听器。
advertised.listeners:设置外网访问Kafka服务的监听器。
auto.create.topics.enable:是否允许自动创建Topic。
unclean.leader.election.enable:是否允许Unclean Leader 选举。
auto.leader.rebalance.enable:是否允许定期进行Leader选举,生产环境中建议设置成false。
log.retention.{hours|minutes|ms}:控制一条消息数据被保存多长时间。优先级:ms设置最高、minutes次之、hours最低。
log.retention.bytes:指定Broker为消息保存的总磁盘容量大小。message.max.bytes:控制Broker能够接收的最大消息大小。
2、Topic级别参数
如果同时设置了Topic级别参数和全局Broker参数,Topic级别参数会覆盖全局Broker参数,而每个Topic都能设置自己的参数值。
生产环境中,应当允许不同部门的Topic根据自身业务需要,设置自己的留存时间。如果只能设置全局Broker参数,那么势必要提取所有业务留存时间的最大值作为全局参数值,此时设置Topic级别参数对Broker参数进行覆盖就是一个不错的选择。
retention.ms:指定Topic消息被保存的时长,默认是7天,只保存最近7天的消息,会覆盖掉Broker端的全局参数值。
retention.bytes:指定为Topic预留多大的磁盘空间。通常在多租户的Kafka集群中使用,默认值是 -1,表示可以无限使用磁盘空间。
max.message.bytes:指定Kafka Broker能够正常接收Topic 的最大消息大小。
Topic级别参数可以在创建Topic时进行设置,也可以在修改Topic 时设置,推荐在修改Topic时进行设置,Apache Kafka社区未来可能统一使用kafka-configs脚本来设置Topic级别参数。
3、JVM参数
Kafka 2.0.0版本已经正式摒弃对Java 7的支持。
Kafka Broker在与客户端进行交互时会在JVM堆上创建大量的Byte Buffer实例,因此JVM端设置的Heap Size不能太小,建议设置6GB。export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
JVM端配置的一个重要参数是垃圾回收器的设置。对于Java 7,如果Broker所在机器的CPU资源非常充裕,建议使用CMS收集器。启用方法是指定-XX:+UseCurrentMarkSweepGC。否则,使用吞吐量收集器,开启方法是指定-XX:+UseParallelGC。对于Java 9,用默认的G1收集器,在没有任何调优的情况下,G1表现得要比CMS出色,主要体现在更少的Full GC,需要调整的参数更少等,所以使用G1就好。export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
4、操作系统参数
件描述符限制:ulimit -n。建议设置成一个超大的值,如ulimit -n 1000000。
文件系统类型:文件系统类型的选择。根据官网的测试报告,XFS 的性能要强于ext4。
Swappiness:推荐设置为一个较小值,如1。如果将swap设置为0,将会完全禁止Kafka Broker进程使用swap空间;当物理内存耗尽时,操作系统会触发OOM killer组件,随机挑选一个进程kill掉,不给用户任何预警。如果设置一个比较小值,当开始使用swap空间时,Broker性能会出现急剧下降,从而给进一步调优和诊断问题的时间。
提交时间:提交时间(Flush落盘时间)。向Kafka发送数据并不是真要等数据被写入磁盘才会认为成功,而是只要数据被写入到操作系统的页缓存(Page Cache)上就认为写入成功,随后操作系统根据LRU算法会定期将页缓存上的脏数据落盘到物理磁盘上。页缓存数据写入磁盘的周期由提交时间来确定,默认是5秒,可以适当地增加提交间隔来降低物理磁盘的写操作。如果在页缓存中的数据在写入到磁盘前机器宕机,数据会丢失,但鉴于Kafka在软件层面已经提供了多副本的冗余机制,拉大提交间隔换取性能是一个合理的做法。
三、Docker镜像选择
1、安装docker
安装Docker:sudo yum install docker
启动Docker:sudo systemctl start docker
docker版本检查:docker version
2、docker-compose安装
docker-compose下载:sudo curl -L https://github.com/docker/compose/releases/download/1.23.0-rc3/docker-compose-
uname -s-
uname -m-o /usr/local/bin/docker-compose
docker-compose安装:sudo chmod +x /usr/local/bin/docker-compose
docker-compose版本检查:docker-compose version
3、docker镜像选择
zookeeper镜像选择:docker search zookeeper
选择star最多的镜像:docker.io/zookeeper
Kafka镜像选择:docker search kafka
选择star最多的镜像:docker.io/wurstmeister/kafka
kafka-manager镜像选择:docker search kafka-manager
选择镜像:kafkamanager/kafka-manager
四、Kafka单机部署方案
1、编写docker-compose.yml文件
# 单机 zookeeper + kafka + kafka-manager集群
version: ‘2‘
services:
# 定义zookeeper服务
zookeeper-test:
image: zookeeper # zookeeper镜像
restart: always
hostname: zookeeper-test
ports:
- "12181:2181" # 宿主机端口:docker内部端口
container_name: zookeeper-test # 容器名称
# 定义kafka服务
kafka-test:
image: wurstmeister/kafka # kafka镜像
restart: always
hostname: kafka-test
ports:
- "9092:9092" # 对外暴露端口号
- "9999:9999" # 对外暴露JMX_PORT
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.0.105 #
KAFKA_ADVERTISED_PORT: 9092 #
KAFKA_ZOOKEEPER_CONNECT: zookeeper-test:2181 # zookeeper服务
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 # zookeeper连接超时
KAFKA_LOG_CLEANUP_POLICY: "delete"
KAFKA_LOG_RETENTION_HOURS: 120 # 设置消息数据保存的最长时间为120小时
KAFKA_MESSAGE_MAX_BYTES: 10000000 # 消息体的最大字节数
KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 #
KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 #
KAFKA_NUM_PARTITIONS: 1 # 分区数量
KAFKA_DELETE_RETENTION_MS: 10000 #
KAFKA_BROKER_ID: 1 # kafka的ID
KAFKA_COMPRESSION_TYPE: lz4
KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.0.105 -Dcom.sun.management.jmxremote.rmi.port=9999" # 导入KAFKA_JMX_OPTS环境变量
JMX_PORT: 9999 # 导入JMX_PORT环境变量
depends_on:
- zookeeper-test # 依赖
container_name: kafka-test
# 定义kafka-manager服务
kafka-manager-test:
image: kafkamanager/kafka-manager # kafka-manager镜像
restart: always
container_name: kafka-manager-test
hostname: kafka-manager-test
ports:
- "9000:9000" # 对外暴露端口,提供web访问
depends_on:
- kafka-test # 依赖
environment:
ZK_HOSTS: zookeeper-test:2181 # 宿主机IP
KAFKA_BROKERS: kafka-test:9090 # kafka
KAFKA_MANAGER_AUTH_ENABLED: "true" # 开启安全认证
KAFKA_MANAGER_USERNAME: kafka-manager # Kafka Manager登录用户
KAFKA_MANAGER_PASSWORD: 123456 # Kafka Manager登录密码
需要确认相应端口是否被占用。
2、启动服务
创建kafka目录,将docker-compose.yml文件放入kafka目录,在kafka目录执行命令。
启动:docker-compose up -d
关闭:docker-compose down
3、kafka服务查看
进入docker容器:docker exec -it kafka /bin/bash
创建Topic:kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic test
查看Topic:kafka-topics.sh --list --zookeeper zookeeper:2181
生产消息:kafka-console-producer.sh --broker-list kafka:9092 --topic test
消费消息:kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
打开两个Terminal,一个执行生产消息的命令,一个执行消费消息的命令,每生产一条消息时消费消息Terminal就会显示一条消息,实现消息队列。
4、Kafka版本查询
wurstmeister/kafka镜像中,kafka安装在/opt目录下,进入/opt目录,kafka_2.12-2.4.0目录即为kafka安装目录。
Scala版本:2.12
Kafka版本:2.4
5、kafka-manager监控
Web方式访问:http://127.0.0.1:9000
五、错误解决
1、容器删除失败
docker rm -f $(docker ps -a --filter status=dead -q |head -n 1)
报错信息:ERROR: for f78856fb92e9_zoo1 Driver overlay2 failed to remove root filesystem f78856fb92e97f75ff4c255077de544b39351a4a2a3319737ada2a54df568032: remove /var/lib/docker/overlay2/2c257b8071b6a3d79e216838522f76ba7263d466a470dc92cdbef25c4dd04dc3/merged: device or resource busy
grep docker /proc/*/mountinfo|grep containerid | awk -F ":" ‘{print $1}‘ | awk -F "/" ‘{print $3}‘
sudo kill -9 3119
2、kafka服务一直重启
报错信息:Error response from daemon: Container 9b3f9af8a1196f2ad3cf74fe2b1eeb7ccbd231fe2a93ec09f594d3a0fbb5783c is restarting, wait until the container is running
错误原因:
docker-compose.yml文件对kafka服务配置restart: always,如果kafka服务启动失败会一直重启,可以通过docker logs kafka查看kafka服务启动的日志信息,查找错误原因。
六、Kafka集群参数配置
############################# System ######################
# 唯一标识在集群中的ID,要求是正数。
broker.id = 0
# 服务端口,默认9092
port = 9092
# 监听地址,不设为所有地址
host.name = debugo01
# 处理网络请求的最大线程数
num.network.threads = 2
# 处理磁盘I/O的线程数
num.io.threads = 8
# 一些后台线程数
background.threads = 4
# 等待IO线程处理的请求队列最大数
queued.max.requests = 500
# socket的发送缓冲区(SO_SNDBUF)
socket.send.buffer.bytes = 1048576
# socket的接收缓冲区 (SO_RCVBUF)
socket.receive.buffer.bytes = 1048576
# socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于
socket.request.max.bytes = 104857600
############################# Topic ########################
# 每个topic的分区个数,更多的partition会产生更多的segment file
num.partitions = 2
# 是否允许自动创建topic ,若是false,就需要通过命令创建topic
auto.create.topics.enable = true
# 一个topic ,默认分区的replication个数 ,不能大于集群中broker的个数。
default.replication.factor = 1
# 消息体的最大大小,单位是字节
message.max.bytes = 1000000
############################# ZooKeeper ####################
# Zookeeper quorum设置。如果有多个使用逗号分割
zookeeper.connect = debugo01:2181, debugo02, debugo03
# 连接zk的超时时间
zookeeper.connection.timeout.ms = 1000000
# ZooKeeper集群中leader和follower之间的同步实际
zookeeper.sync.time.ms = 2000
############################# Log #########################
# 日志存放目录,多个目录使用逗号分割
log.dirs = / var / log / kafka
# 当达到下面的消息数量时,会将数据flush到日志文件中。默认10000
# log.flush.interval.messages=10000
# 当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms
# log.flush.interval.ms=1000
# 检查是否需要将日志flush的时间间隔
log.flush.scheduler.interval.ms = 3000
# 日志清理策略(delete|compact)
log.cleanup.policy = delete
# 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。
log.retention.hours = 168
# 日志数据存储的最大字节数。超过这个时间会根据policy处理数据。
# log.retention.bytes=1073741824
# 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
log.segment.bytes = 536870912
# 当达到下面时间,会强制新建一个segment
log.roll.hours = 24 * 7
# 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)
log.retention.check.interval.ms = 60000
# 是否开启压缩
log.cleaner.enable = false
# 对于压缩的日志保留的最长时间
log.cleaner.delete.retention.ms = 1
day
# 对于segment日志的索引文件大小限制
log.index.size.max.bytes = 10 * 1024 * 1024
# y索引计算的一个缓冲区,一般不需要设置。
log.index.interval.bytes = 4096
############################# replica #######################
# partition management controller 与replicas之间通讯的超时时间
controller.socket.timeout.ms = 30000
# controller-to-broker-channels消息队列的尺寸大小
controller.message.queue.size = 10
# replicas响应leader的最长等待时间,若是超过这个时间,就将replicas排除在管理之外
replica.lag.time.max.ms = 10000
# 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
controlled.shutdown.enable = false
# 控制器关闭的尝试次数
controlled.shutdown.max.retries = 3
# 每次关闭尝试的时间间隔
controlled.shutdown.retry.backoff.ms = 5000
# 如果relicas落后太多,将会认为此partition relicas已经失效。而一般情况下,因为网络延迟等原因,总会导致replicas中消息同步滞后。如果消息严重滞后,leader将认为此relicas网络延迟较大或者消息吞吐能力有限。在broker数量较少,或者网络不足的环境中,建议提高此值.
replica.lag.max.messages = 4000
# leader与relicas的socket超时时间
replica.socket.timeout.ms = 30 * 1000
# leader复制的socket缓存大小
replica.socket.receive.buffer.bytes = 64 * 1024
# replicas每次获取数据的最大字节数
replica.fetch.max.bytes = 1024 * 1024
# replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.wait.max.ms = 500
# 每一个fetch操作的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会等待直到数据达到这个大小
replica.fetch.min.bytes = 1
# leader中进行复制的线程数,增大这个数值会增加relipca的IO
num.replica.fetchers = 1
# 每个replica将最高水位进行flush的时间间隔
replica.high.watermark.checkpoint.interval.ms = 5000
# 是否自动平衡broker之间的分配策略
auto.leader.rebalance.enable = false
# leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.per.broker.percentage = 10
# 检查leader是否不平衡的时间间隔
leader.imbalance.check.interval.seconds = 300
# 客户端保留offset信息的最大空间大小
offset.metadata.max.bytes = 1024
#############################Consumer #####################
# Consumer端核心的配置是group.id、zookeeper.connect
# 决定该Consumer归属的唯一组ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group.
group.id
# 消费者的ID,若是没有设置的话,会自增
consumer.id
# 一个用于跟踪调查的ID ,最好同group.id相同
client.id = < group_id >
# 对于zookeeper集群的指定,必须和broker使用同样的zk配置
zookeeper.connect = debugo01:2182, debugo02: 2182, debugo03: 2182
# zookeeper的心跳超时时间,查过这个时间就认为是无效的消费者
zookeeper.session.timeout.ms = 6000
# zookeeper的等待连接时间
zookeeper.connection.timeout.ms = 6000
# zookeeper的follower同leader的同步时间
zookeeper.sync.time.ms = 2000
# 当zookeeper中没有初始的offset时,或者超出offset上限时的处理方式 。
# smallest :重置为最小值
# largest:重置为最大值
# anything else:抛出异常给consumer
auto.offset.reset = largest
# socket的超时时间,实际的超时时间为max.fetch.wait + socket.timeout.ms.
socket.timeout.ms = 30 * 1000
# socket的接收缓存空间大小
socket.receive.buffer.bytes = 64 * 1024
# 从每个分区fetch的消息大小限制
fetch.message.max.bytes = 1024 * 1024
# true时,Consumer会在消费消息后将offset同步到zookeeper,这样当Consumer失败后,新的consumer就能从zookeeper获取最新的offset
auto.commit.enable = true
# 自动提交的时间间隔
auto.commit.interval.ms = 60 * 1000
# 用于消费的最大数量的消息块缓冲大小,每个块可以等同于fetch.message.max.bytes中数值
queued.max.message.chunks = 10
# 当有新的consumer加入到group时,将尝试reblance,将partitions的消费端迁移到新的consumer中, 该设置是尝试的次数
rebalance.max.retries = 4
# 每次reblance的时间间隔
rebalance.backoff.ms = 2000
# 每次重新选举leader的时间
refresh.leader.backoff.ms
# server发送到消费端的最小数据,若是不满足这个数值则会等待直到满足指定大小。默认为1表示立即接收。
fetch.min.bytes = 1
# 若是不满足fetch.min.bytes时,等待消费端请求的最长等待时间
fetch.wait.max.ms = 100
# 如果指定时间内没有新消息可用于消费,就抛出异常,默认-1表示不受限
consumer.timeout.ms = -1
#############################Producer######################
# 核心的配置包括:
# metadata.broker.list
# request.required.acks
# producer.type
# serializer.class
# 消费者获取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也可以在外面设置一个vip
metadata.broker.list
# 消息的确认模式
# 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP
# 1:发送消息,并会等待leader 收到确认后,一定的可靠性
# -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性
request.required.acks = 0
# 消息发送的最长等待时间
request.timeout.ms = 10000
# socket的缓存大小
send.buffer.bytes = 100 * 1024
# key的序列化方式,若是没有设置,同serializer.class
key.serializer.class
# 分区的策略,默认是取模
partitioner.class =kafka.producer.DefaultPartitioner
# 消息的压缩模式,默认是none,可以有gzip和snappy
compression.codec = none
# 可以针对默写特定的topic进行压缩
compressed.topics = null
# 消息发送失败后的重试次数
message.send.max.retries = 3
# 每次失败后的间隔时间
retry.backoff.ms = 100
# 生产者定时更新topic元信息的时间间隔 ,若是设置为0,那么会在每个消息发送后都去更新数据
topic.metadata.refresh.interval.ms = 600 * 1000
# 用户随意指定,但是不能重复,主要用于跟踪记录消息
client.id = ""
# 异步模式下缓冲数据的最大时间。例如设置为100则会集合100ms内的消息后发送,这样会提高吞吐量,但是会增加消息发送的延时
queue.buffering.max.ms = 5000
# 异步模式下缓冲的最大消息数,同上
queue.buffering.max.messages = 10000
# 异步模式下,消息进入队列的等待时间。若是设置为0,则消息不等待,如果进入不了队列,则直接被抛弃
queue.enqueue.timeout.ms = -1
# 异步模式下,每次发送的消息数,当queue.buffering.max.messages或queue.buffering.max.ms满足条件之一时producer会触发发送。
batch.num.messages = 200
以上是关于Kafka快速入门——Kafka集群部署的主要内容,如果未能解决你的问题,请参考以下文章