当 Kafka 从 Python 脚本启动时,kafka-server-stop.sh 不起作用

Posted

技术标签:

【中文标题】当 Kafka 从 Python 脚本启动时,kafka-server-stop.sh 不起作用【英文标题】:kafka-server-stop.sh not working when Kafka started from Python script 【发布时间】:2014-10-23 11:01:56 【问题描述】:

在远程节点上部署一些 Apache Kafka 实例后,我发现作为 Kafka 存档一部分的 kafka-server-stop.sh 脚本存在问题。

默认包含:

#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 'print $1' | xargs kill -SIGTERM

如果我将 apache kafka 作为非后台进程执行,则此脚本效果很好,例如:

/var/lib/kafka/bin/kafka-server-start.sh /var/lib/kafka/config/server.properties

当我将它作为后台进程执行时它也可以工作:

/var/lib/kafka/bin/kafka-server-start.sh /var/lib/kafka/config/server.properties &

但是在我的远程节点上,我使用这个 python 脚本执行它(使用 Ansible):

#!/usr/bin/env python
import argparse
import os
import subprocess

KAFKA_PATH = "/var/lib/kafka/"

def execute_command_pipe_output(command_to_call):
  return subprocess.Popen(command_to_call, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

def execute_command_no_output(command_to_call):
  with open(os.devnull, "w") as null_file:
    return subprocess.Popen(command_to_call, stdout=null_file, stderr=subprocess.STDOUT)  

def start_kafka(args):
  command_to_call = ["nohup"]
  command_to_call += [KAFKA_PATH + "bin/zookeeper-server-start.sh"]
  command_to_call += [KAFKA_PATH + "config/zookeeper.properties"]

  proc = execute_command_no_output(command_to_call)

  command_to_call = ["nohup"]
  command_to_call += [KAFKA_PATH + "bin/kafka-server-start.sh"]
  command_to_call += [KAFKA_PATH + "config/server.properties"]

  proc = execute_command_no_output(command_to_call)

def stop_kafka(args):
  command_to_call = [KAFKA_PATH + "bin/kafka-server-stop.sh"]

  proc = execute_command_pipe_output(command_to_call)
  for line in iter(proc.stdout.readline, b''):
    print line,

  command_to_call = [KAFKA_PATH + "bin/zookeeper-server-stop.sh"]

  proc = execute_command_pipe_output(command_to_call)
  for line in iter(proc.stdout.readline, b''):
    print line,


if __name__ == "__main__":
  parser = argparse.ArgumentParser(description="Starting Zookeeper and Kafka instances")
  parser.add_argument('action', choices=['start', 'stop'], help="action to take")

  args = parser.parse_args()

  if args.action == 'start':
    start_kafka(args)
  elif args.action == 'stop':
    stop_kafka(args)
  else:
    parser.print_help()

执行后

manage-kafka.py start
manage-kafka.py stop

Zookeeper 已关闭(应该如此),但 Kafka 仍在运行。

更有趣的是,当我(手动)调用时

nohup /var/lib/kafka/bin/kafka-server-stop.sh

nohup /var/lib/kafka/bin/kafka-server-stop.sh &

kafka-server-stop.sh 正确关闭 Kafka 实例。我怀疑这个问题可能是由一些 Linux/Python 引起的。

【问题讨论】:

只是出于好奇:为什么要使用 python?为什么不使用 upstart 并进行类似“service kafka-broker start/stop”之类的 python 调用? 它是作为第一个解决方案创建的,我只是对为什么观察所描述的行为感兴趣。我将把它转移到主管进行管理,因为我们已经将它用于不同的应用程序。 你找到答案了吗? 【参考方案1】:

Kafka需要在zookeepers关闭之前完成关闭过程。

所以启动 zookeepers,然后 brokers 将重试关闭过程

我也遇到过类似的情况。问题是我的配置没有等待 kafka 代理关闭。希望这可以帮助某人。我花了一段时间才弄清楚...

【讨论】:

有机溶液。 很好的解决方案。我有同样的问题,这里的答案对我来说。使用 2.12 版【参考方案2】:

在想出一种粗鲁的方法来解决这个问题之前,我经常遇到这个问题。 所以发生的事情是 Kafka 突然关闭,但端口仍在使用中。

按照以下步骤操作:

    查找在该端口上运行的进程的进程 ID: lsof -t -i :YOUR_PORT_NUMBER 。 ##这是mac版 终止该进程 kill -9 process_id

【讨论】:

【参考方案3】:

在执行 kafka-zookeeper-stop.sh 管理工具之前,请先执行 kafka-server-stop.sh。它将首先断开服务器与 zookeeper 的连接,然后它会停止 zookeeper 本身。请等待 3-4 秒后再重新开始。

【讨论】:

【参考方案4】:

我的猜测:kafka-server-stop.sh 使用 shell 管道。所以 Popen 需要shell=True 参数。

见https://docs.python.org/2/library/subprocess.html#subprocess.Popen

【讨论】:

【参考方案5】:

kafka-server-stop.sh 中的命令更改为此解决了我的问题:

PIDS=$(ps axww | grep -i 'kafka\.Kafka' | grep java | grep -v grep | nawk 'print $1')

说明: 问题是kafka-server-stop.sh 使用以下命令来杀死 PIDS:

PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk 'print $1')

'ps' 80 列在终端上发出: 问题在于,ps ax 的输出没有显示命令的所有输出,因为它被截断为 xx 列(通常为 80 列,过去的默认终端宽度)。我的是 stty -a 中定义的 168 列。更改为 ps axww 确实如此,简而言之,这会扩大输出。

awk 输入记录长度问题: 另一个问题是 awk 有一个 Characters per input record limitation of 3000 chars,如 here 所述。相反,nawk 不受C long 值的限制。 gawk 也可以。

缺点是我正在修改一个核心脚本,在升级过程中可能会被覆盖。它很快而且可能,但它为我完成了这项工作。

P.S 如果你有兴趣,我找到了一个 jira here。

【讨论】:

以上是关于当 Kafka 从 Python 脚本启动时,kafka-server-stop.sh 不起作用的主要内容,如果未能解决你的问题,请参考以下文章

当debezium连接器从你的sql服务器获取数据时,有没有办法限制kafka连接堆空间

filebeat + kafka +ELK集群实验

filebeat + kafka +ELK集群实验

filebeat + kafka +ELK集群实验

如何在kafka-python和confluent-kafka之间做出选择

filebeat + kafka +ELK集群实验