Kafka 动态配置
Posted cpuCode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 动态配置相关的知识,希望对你有一定的参考价值。
Kafka 动态配置
动态配置 : 修改参数后,无需重启 Broker 就能生效
Kafka 1.1 后的 Dynamic Update Mode 列 :
- read-only : 只有重启 Broker,才能生效
- per-broker : 动态参数,修改后,就能生效
- cluster-wide : 动态参数,修改后,整个集群范围内生效
动态 Broker :
- 动态调整 Broker 端各种线程池大小,实时应对突发流量
- 动态调整 Broker 端连接信息或安全配置信息
- 动态更新 SSL Keystore 有效期
- 动态调整 Broker 端 Compact 操作性能
- 实时变更 JMX 指标收集器 (JMX Metrics Reporter)
配置保存
Kafka 将动态 Broker 参数保存在 ZooKeeper 中
- changes : 实时监测动态参数变更的,不会保存参数值
- topics : 保存 Kafka 主题级别参数的。虽然它们不属于动态 Broker 端参数,但其实它们也是能够动态变更的
- users 和 clients 则是用于动态调整客户端配额(Quota)的 znode 节点。所谓配额,是指 Kafka 运维人员限制连入集群的客户端的吞吐量或者是限定它们使用的 CPU 资源
/config/brokers znode
保存动态 Broker 参数, 分两大类子节点 :
< default >
: 保存 cluster-wide 范围的动态参数broker.id
: 保存特定 Broker 的 per-broker 范围参数
参数的优先级:
- per-broker > cluster-wide > static > Kafka 默认值
持久化节点 : ephemeralOwner = 0x0
。ZooKeeper 重启,也不会丢失
配置
设置 cluster-wide 范围值 :
- 指定 entity-default
bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-default --alter --add-config unclean.leader.election.enable=true
设置 per-broker 范围参数 :
bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-name 1 \\
--alter --add-config unclean.leader.election.enable=false
删除 cluster-wide 范围参数 :
bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-default \\
--alter --delete-config unclean.leader.election.enable
删除 per-broker 范围参数 :
bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-name 1 \\
--alter --delete-config unclean.leader.election.enable
查看 cluster-wide 范围参数 :
bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-default --describe
查看 per-broker 范围参数 :
bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-name 1 --describe
动态调整参数 :
log.retention.ms
: 修改日志留存时间num.io.threads
: IO 处理线程数num.network.threads
: 网络处理线程数ssl.keystore.type
,ssl.keystore.location
,ssl.keystore.password
,ssl.key.password
: 更新 Keystorenum.replica.fetchers
: Follower 副本拉取速度
动态从zookeeper读取kafka信息
连接kafka时,经常遇到配置kafka连接信息连接失败,程序后台一直打印连接失败信息,或者由于连接不上kafka程序启动直接失败情况,考虑一种方案如下:
从从zookeeper中读取kafka集群信息,如果kafka集群信息中有配置的kafka连接信息,则说明kafka正常启动,已注册到zookeeper,可正常连接
贴上代码如下:
public static void main(String[] args) throws Exception { getNodes(); } public static void getNodes() throws Exception { CuratorFramework client = CuratorFrameworkFactory .newClient(connectString, 1000*60, 1000*15, new RetryNTimes(10,5000)); client.start();//开始连接 CuratorFrameworkState st = client.getState(); System.out.println(st); List<String> children = client.getChildren().usingWatcher(new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { System.out.println("监控: " + event); } }).forPath("/brokers/ids"); if (ValidateUtil.isNotEmpty(children)) { for (String id : children) { //String brokerInfo = new String(client.getData("/brokers/ids/" + id)); System.out.println("current Id:"+id); //返回的字节数组 byte[] o= client.getData().usingWatcher(new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { LOGGER.info("触发watcher, path:{}", event.getPath()); } }).forPath("/brokers/ids/" + id); System.out.println("o:"+o); String b = new String(o); System.out.println("current node content:"+b); } } System.out.println(children); String result = client.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/test", "Data".getBytes()); System.out.println(result); // 设置节点数据 client.setData().forPath("/test", "111".getBytes()); client.setData().forPath("/test", "222".getBytes()); // 删除节点 System.out.println(client.checkExists().forPath("/test")); client.delete().withVersion(-1).forPath("/test"); System.out.println(client.checkExists().forPath("/brokers")); client.close(); System.out.println("OK!"); client.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("事件: " + event); } }); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { System.out.println("连接状态事件: " + newState); } }); client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { @Override public void unhandledError(String message, Throwable e) { System.out.println("错误事件:" + message); } }); }
参照资料:
1、动态从zookeeper中读取kafka集群
2、Spring Boot 使用 Curator 操作 ZooKeeper
3、Springboot2(29)集成zookeeper的增删改查、节点监听、分布式读写锁、分布式计数器
4、springboot搭建连接zookeeper
以上是关于Kafka 动态配置的主要内容,如果未能解决你的问题,请参考以下文章
docker 配置 kafka+zookeeper,golang接入示例