Kafka 动态配置

Posted cpuCode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 动态配置相关的知识,希望对你有一定的参考价值。

Kafka 动态配置

动态配置 : 修改参数后,无需重启 Broker 就能生效

Kafka 1.1 后的 Dynamic Update Mode 列 :

  • read-only : 只有重启 Broker,才能生效
  • per-broker : 动态参数,修改后,就能生效
  • cluster-wide : 动态参数,修改后,整个集群范围内生效

动态 Broker :

  • 动态调整 Broker 端各种线程池大小,实时应对突发流量
  • 动态调整 Broker 端连接信息或安全配置信息
  • 动态更新 SSL Keystore 有效期
  • 动态调整 Broker 端 Compact 操作性能
  • 实时变更 JMX 指标收集器 (JMX Metrics Reporter)

配置保存

Kafka 将动态 Broker 参数保存在 ZooKeeper 中

  • changes : 实时监测动态参数变更的,不会保存参数值
  • topics : 保存 Kafka 主题级别参数的。虽然它们不属于动态 Broker 端参数,但其实它们也是能够动态变更的
  • users 和 clients 则是用于动态调整客户端配额(Quota)的 znode 节点。所谓配额,是指 Kafka 运维人员限制连入集群的客户端的吞吐量或者是限定它们使用的 CPU 资源

/config/brokers znode 保存动态 Broker 参数, 分两大类子节点 :

  • < default > : 保存 cluster-wide 范围的动态参数
  • broker.id : 保存特定 Broker 的 per-broker 范围参数

参数的优先级:

  • per-broker > cluster-wide > static > Kafka 默认值

持久化节点 : ephemeralOwner = 0x0 。ZooKeeper 重启,也不会丢失

配置

设置 cluster-wide 范围值 :

  • 指定 entity-default
bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-default --alter --add-config unclean.leader.election.enable=true

设置 per-broker 范围参数 :

bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-name 1 \\
--alter --add-config unclean.leader.election.enable=false

删除 cluster-wide 范围参数 :

bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-default \\
--alter --delete-config unclean.leader.election.enable

删除 per-broker 范围参数 :

bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-name 1 \\
--alter --delete-config unclean.leader.election.enable

查看 cluster-wide 范围参数 :

bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-default --describe

查看 per-broker 范围参数 :

bin/kafka-configs.sh \\
--bootstrap-server kafka-host:port \\
--entity-type brokers \\
--entity-name 1 --describe

动态调整参数 :

  • log.retention.ms : 修改日志留存时间
  • num.io.threads : IO 处理线程数
  • num.network.threads : 网络处理线程数
  • ssl.keystore.type , ssl.keystore.location , ssl.keystore.password , ssl.key.password : 更新 Keystore
  • num.replica.fetchers : Follower 副本拉取速度

动态从zookeeper读取kafka信息

连接kafka时,经常遇到配置kafka连接信息连接失败,程序后台一直打印连接失败信息,或者由于连接不上kafka程序启动直接失败情况,考虑一种方案如下:

从从zookeeper中读取kafka集群信息,如果kafka集群信息中有配置的kafka连接信息,则说明kafka正常启动,已注册到zookeeper,可正常连接

贴上代码如下:

    public static void main(String[] args) throws Exception {
        getNodes();
    }


    public static void getNodes() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory
                .newClient(connectString, 1000*60, 1000*15, new RetryNTimes(10,5000));
        client.start();//开始连接
        CuratorFrameworkState st = client.getState();
        System.out.println(st);
        List<String> children = client.getChildren().usingWatcher(new CuratorWatcher() {
            @Override
            public void process(WatchedEvent event) throws Exception {
                System.out.println("监控: " + event);
            }
        }).forPath("/brokers/ids");
        if (ValidateUtil.isNotEmpty(children)) {
            for (String id : children) {
                //String brokerInfo = new String(client.getData("/brokers/ids/" + id));
                System.out.println("current Id:"+id);
                //返回的字节数组
              byte[] o= client.getData().usingWatcher(new CuratorWatcher() {
                    @Override
                    public void process(WatchedEvent event) throws Exception {
                        LOGGER.info("触发watcher, path:{}", event.getPath());
                    }
                }).forPath("/brokers/ids/" + id);
                System.out.println("o:"+o);
                String b = new String(o);
                System.out.println("current node content:"+b);
            }

        }
        System.out.println(children);
        String result = client.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/test", "Data".getBytes());
        System.out.println(result);
        // 设置节点数据
        client.setData().forPath("/test", "111".getBytes());
        client.setData().forPath("/test", "222".getBytes());
        // 删除节点
        System.out.println(client.checkExists().forPath("/test"));
        client.delete().withVersion(-1).forPath("/test");
        System.out.println(client.checkExists().forPath("/brokers"));
        client.close();
        System.out.println("OK!");
        client.getCuratorListenable().addListener(new CuratorListener()
        {
            @Override
            public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
            {
                System.out.println("事件: " + event);
            }
        });
        client.getConnectionStateListenable().addListener(new ConnectionStateListener()
        {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState)
            {
                System.out.println("连接状态事件: " + newState);
            }
        });
        client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener()
        {
            @Override
            public void unhandledError(String message, Throwable e)
            {
                System.out.println("错误事件:" + message);
            }
        });
    }

参照资料:

1、动态从zookeeper中读取kafka集群

2、Spring Boot 使用 Curator 操作 ZooKeeper

3、Springboot2(29)集成zookeeper的增删改查、节点监听、分布式读写锁、分布式计数器

4、springboot搭建连接zookeeper

 

以上是关于Kafka 动态配置的主要内容,如果未能解决你的问题,请参考以下文章

kafka server.properties怎样配置

zookeeper+KAFKA 集群搭建

基于Kafka实现的Spring Cloud消息总线

docker 配置 kafka+zookeeper,golang接入示例

spring boot集成kafka开发,接收kafka消息,Java

spring boot集成kafka开发,接收kafka消息,Java