如何检查Apache Kafka服务运行状态
Posted 梦想画家
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何检查Apache Kafka服务运行状态相关的知识,希望对你有一定的参考价值。
客户端应用程序使用Kafka主要分为两种类型,即生产者和消费者。无论哪种都需要kafka服务处于运行状态,本文介绍几种方法检查kafka服务器的运行状态。
使用Zookeeper 命令
检查活动的kafka服务的最快方法之一是使用zookeeper的dump命令。dump是管理zookeeper服务器的一个4LW命令。
现在我们使用nc命令发送dump给zookeeper服务器,默认为2181端口:
$ echo dump | nc localhost 2181 | grep -i broker | xargs
/brokers/ids/0
执行上面命令,可以在zookeeper上看到已注册的临时代理ID,如果没有临时id存在,则说明没有代理节点正在运行。
需要提醒的是,dump命令需要显示配置才允许执行。需要在 zookeeper.properties 或 zoo.cfg增加白名单命令列表:
lw.commands.whitelist=dump
当然也可以是使用Zookeeper APIs 查看活动代理服务器。
使用Apache kafka AdminClient
如果生产者和消费者是java应用程序,那么可以使用Apache Kafka AdminClent类检查kafka服务器是否处于运行状态。
下面首先定义KafkaAdminClient类包装AdminClient实例,这样我们就能够快速进行测试:
public class KafkaAdminClient
private final AdminClient client;
public KafkaAdminClient(String bootstrap)
Properties props = new Properties();
props.put("bootstrap.servers", bootstrap);
props.put("request.timeout.ms", 3000);
props.put("connections.max.idle.ms", 5000);
this.client = AdminClient.create(props);
接下来在KafkaAdminClient类中定义verifyConnection() 方法验证客户端是否能连接上正在运行的kafka服务端:
public boolean verifyConnection() throws ExecutionException, InterruptedException
Collection<Node> nodes = this.client.describeCluster()
.nodes()
.get();
return nodes != null && nodes.size() > 0;
最后通过连接kafka集群测试代码:
@Test
void givenKafkaIsRunning_whenCheckedForConnection_thenConnectionIsVerified() throws Exception
boolean alive = kafkaAdminClient.verifyConnection();
assertThat(alive).isTrue();
使用kcat 工具
kcat,又称为 kafkacat,是一款非JVM的Kafka消息生产和消费的命令行工具,适用于 Kafka 0.8 及以上版本。ubuntu 上安装方式:
sudo apt-get update
sudo apt-get install kafkacat
可以通过kafkacat命令查看kafka服务运行状态,使用-L选项显示已经存在主题的元数据:
kafkacat -b localhost:29092 -t demo-topic -L
Metadata for demo-topic (from broker -1: localhost:29092/bootstrap):
3 brokers:
broker 2 at localhost:29092
broker 3 at localhost:39092
broker 1 at localhost:19092
1 topics:
topic "demo-topic" with 0 partitions: Broker: Leader not available (try again)
从输出信息可以看到正在运行的kafka代理。
使用UI工具
对于开发环境的验证应用我们可以使用UI工具,如Offset Explorer,但这种方法不建议在生产环境使用。下面使用Offset Explorer连接kafka集群,输入zookeeper主机和端口:
我们能在左侧面板上看到正在运行的代理服务,点击节点可以查看更多细节。
总结
本文介绍了几种命令行方法:zookeeper命令、kafka的AdminClient、kcat工具以及UI方法查看kafka服务的运行状态。
如何检查 Kafka Server 是不是正在运行?
【中文标题】如何检查 Kafka Server 是不是正在运行?【英文标题】:How to check whether Kafka Server is running?如何检查 Kafka Server 是否正在运行? 【发布时间】:2016-10-21 14:45:15 【问题描述】:我想在开始生产和消费作业之前确保 kafka 服务器是否正在运行。它在windows环境中,这是我在eclipse中的kafka服务器代码......
Properties properties = new Properties();
properties.setProperty("broker.id", "1");
properties.setProperty("port", "9092");
properties.setProperty("log.dirs", "D://workspace//");
properties.setProperty("zookeeper.connect", "localhost:2181");
Option<String> option = Option.empty();
KafkaConfig config = new KafkaConfig(properties);
KafkaServer kafka = new KafkaServer(config, new CurrentTime(), option);
kafka.startup();
在这种情况下if (kafka != null)
是不够的,因为它总是正确的。那么有什么方法可以知道我的 kafka 服务器正在运行并准备好用于生产者。我有必要检查一下,因为它会导致一些起始数据包丢失。
【问题讨论】:
【参考方案1】:必须为所有 Kafka 代理分配一个broker.id
。启动时,代理将在 Zookeeper 中创建一个临时节点,路径为/broker/ids/$id
。由于节点是短暂的,它将在代理断开连接后立即被删除,例如通过关闭。
您可以像这样查看临时代理节点的列表:
echo dump | nc localhost 2181 | grep brokers
ZooKeeper 客户端接口公开了许多命令; dump
列出集群的所有会话和临时节点。
注意,以上假设:
您在localhost
的默认端口 (2181
) 上运行 ZooKeeper,而 localhost
是集群的领导者
您的 zookeeper.connect
Kafka 配置没有为您的 Kafka 集群指定 chroot 环境,即它只是 host:port
而不是 host:port/path
【讨论】:
所以,这实际上检查zookeeper
是否有至少一个 kafka
连接。它不会测试 your kafka
是否正在运行。在OP的情况下它是正确的,但这是一个间接测试。可能必须研究在端口 9092 上可以做什么以进行直接测试。【参考方案2】:
我使用了AdminClient api。
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = KafkaAdminClient.create(properties))
ListTopicsResult topics = client.listTopics();
Set<String> names = topics.names().get();
if (names.isEmpty())
// case: if no topic found.
return true;
catch (InterruptedException | ExecutionException e)
// Kafka is not available
【讨论】:
这似乎不正确,因为它在只有一个代理启动时返回 true。 @Leon 你能详细说明你的评论吗? 如果您有 >=3 个代理且副本为 3,则如果 1 个代理启动而另一个 >=2 关闭,则代码返回 true。实际上它只需要 1 个 zk 节点,您可以在没有任何代理运行的情况下获取主题名称列表。 @Leon 我觉得这个答案很有价值。当然,您需要了解您实际“监控”的是什么。像这里一样探测 Zookeeper 和探测引导服务器将回答两组不同的问题。我认为从客户端的角度来看,连接到引导服务器是正确的做法。即使您应该对 Kafka 集群(ZK 和节点)进行专门的监控,但验证特定客户端是否可以实际连接到集群是有意义的。 这应该被标记为接受的答案!【参考方案3】:您可以在您的机器上安装 Kafkacat 工具
例如在 Ubuntu 上您可以使用安装它
apt-get install kafkacat
kafkacat 安装完成后,可以使用以下命令进行连接
kafkacat -b <your-ip-address>:<kafka-port> -t test-topic
用你的机器ip替换一旦你运行了上面的命令,如果 kafkacat 能够建立连接,那么这意味着 kafka 已经启动并运行了
【讨论】:
kafkacat -b localhost:9092 -L // 按照docs.confluent.io/platform/current/app-development/…,-L 显示Kafka集群的当前状态及其主题、分区、副本...【参考方案4】:对于 Linux,“ps aux | grep kafka”查看结果中是否显示 kafka 属性。例如。 /path/to/kafka/server.properties
【讨论】:
他们在 Windows 中没有指定它吗?【参考方案5】:Paul 的回答非常好,从经纪人的角度来看,这实际上是 Kafka 和 Zk 如何协同工作的。
我想说检查 Kafka 服务器是否正在运行的另一个简单方法是创建一个简单的 KafkaConsumer 指向集群并尝试一些操作,例如 listTopics()。如果 kafka 服务器没有运行,你会得到一个TimeoutException,然后你可以使用try-catch
语句。
def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit =
val props = new Properties()
props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
props.put("group.id", kafkaParams.get("group.id").get.toString)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val simpleConsumer = new KafkaConsumer[String, String](props)
simpleConsumer.listTopics()
【讨论】:
我只启动了zookeeper而不是Kafka,然后执行了kafka-topics.bat --list
,我得到了所有的主题。似乎只列出主题无法验证 Kafka 是否正在运行。
@MohammadFaisal 该命令将从 Zookeeper (github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/…) 获取信息元数据。我提供的解决方案是从 Kafka Cluster 获取信息(如果你按照源代码,你会得到这个:github.com/apache/kafka/blob/0.10.0/clients/src/main/java/org/…)。那是因为当 Kafka 集群关闭时,您没有收到任何错误。【参考方案6】:
最好的选择是在开始生成或使用消息之前使用AdminClient,如下所示
private static final int ADMIN_CLIENT_TIMEOUT_MS = 5000;
try (AdminClient client = AdminClient.create(properties))
client.listTopics(new ListTopicsOptions().timeoutMs(ADMIN_CLIENT_TIMEOUT_MS)).listings().get();
catch (ExecutionException ex)
LOG.error("Kafka is not available, timed out after ms", ADMIN_CLIENT_TIMEOUT_MS);
return;
【讨论】:
请注意,AdminClient 仅从 0.11 版本开始可用【参考方案7】:首先你需要创建 AdminClient bean:
@Bean
public AdminClient adminClient()
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
StringUtils.arrayToCommaDelimitedString(new Object[]"your bootstrap server address));
return AdminClient.create(configs);
然后,你可以使用这个脚本:
while (true)
Map<String, ConsumerGroupDescription> groupDescriptionMap =
adminClient.describeConsumerGroups(Collections.singletonList(groupId))
.all()
.get(10, TimeUnit.SECONDS);
ConsumerGroupDescription consumerGroupDescription = groupDescriptionMap.get(groupId);
log.debug("Kafka consumer group () state: ",
groupId,
consumerGroupDescription.state());
if (consumerGroupDescription.state().equals(ConsumerGroupState.STABLE))
boolean isReady = true;
for (MemberDescription member : consumerGroupDescription.members())
if (member.assignment() == null || member.assignment().topicPartitions().isEmpty())
isReady = false;
if (isReady)
break;
log.debug("Kafka consumer group () is not ready. Waiting...", groupId);
TimeUnit.SECONDS.sleep(1);
此脚本将每秒检查消费者组的状态,直到状态为STABLE。因为所有消费者都分配到主题分区,所以您可以断定服务器正在运行并准备就绪。
【讨论】:
【参考方案8】:如果服务器正在运行,您可以使用以下代码检查可用的代理。
import org.I0Itec.zkclient.ZkClient;
public static boolean isBrokerRunning()
boolean flag = false;
ZkClient zkClient = new ZkClient(endpoint.getZookeeperConnect(), 10000);//, kafka.utils.ZKStringSerializer$.MODULE$);
if(zkClient!=null)
int brokersCount = zkClient.countChildren(ZkUtils.BrokerIdsPath());
if(brokersCount > 0)
logger.info("Following Broker(s) is/are available on Zookeeper.",zkClient.getChildren(ZkUtils.BrokerIdsPath()));
flag = true;
else
logger.error("ERROR:No Broker is available on Zookeeper.");
zkClient.close();
return flag;
【讨论】:
这里,如果我已经有多个代理在运行,那么会返回错误的结果。您的解决方案可能适用于单一代理环境。 是的,对于单一经纪人来说,这是完美的。顺便说一句,您没有提到多个。让我试一试。您可以检查任何一个经纪人是否已启动,不要检查其他经纪人,因此只要任何经纪人已启动,您就可以跳过其他经纪人。另一方面,您仍然需要检查最后运行的代理,因为在非常不幸的情况下,前 N 个代理可能已关闭。【参考方案9】:我在confluent Kafka中发现了一个事件OnError
:
consumer.OnError += Consumer_OnError;
private void Consumer_OnError(object sender, Error e)
Debug.Log("connection error: "+ e.Reason);
ConsumerConnectionError(e);
及其代码文档:
//
// Summary:
// Raised on critical errors, e.g. connection failures or all brokers down. Note
// that the client will try to automatically recover from errors - these errors
// should be seen as informational rather than catastrophic
//
// Remarks:
// Executes on the same thread as every other Consumer event handler (except OnLog
// which may be called from an arbitrary thread).
public event EventHandler<Error> OnError;
【讨论】:
以上是关于如何检查Apache Kafka服务运行状态的主要内容,如果未能解决你的问题,请参考以下文章
apache kafka监控系列-KafkaOffsetMonitor
kafka学习,kafka踩坑,使用java项目连接kafka发送消息报错连接超时问题
Apache-Kafka-Connect , Confluent-HDFS-Connector , Unknown-magic-byte