Kubernetes 多节点上的 Kafka

Posted

技术标签:

【中文标题】Kubernetes 多节点上的 Kafka【英文标题】:Kafka on Kubernetes multi-node 【发布时间】:2015-11-15 09:08:55 【问题描述】:

所以我的目标是以分布式方式建立一个由几个 kafka-brokers 组成的集群。但我看不到让经纪人相互了解的方法。

据我了解,每个代理都需要在其配置中使用单独的 ID,如果我从 kubernetes 启动容器,我无法保证或配置它?

他们还需要有相同的adverted_host?

是否有任何我遗漏的参数需要更改才能让节点相互发现?

在 Dockerfile 末尾使用脚本进行这样的配置是否可行?和/或共享卷?

我目前正在尝试在 vanilla Kubernetes 上使用具有预配置 zookeeper+kafka 组合的 spotify/kafka-image 来执行此操作。

【问题讨论】:

【参考方案1】:

对此我的解决方案是使用 IP 作为 ID:修剪点,您将获得一个唯一 ID,该 ID 在容器外也可用于其他容器。

使用服务,您可以访问多个容器的 IP(有关如何执行此操作,请参见我的回答: what's the best way to let kubenetes pods communicate with each other?

因此,如果您使用 IP 作为唯一 ID,您也可以获得他们的 ID。 唯一的问题是ID不是连续的或者从0开始,但是zookeeper / kafka似乎并不介意。

编辑 1:

后续关注配置 Zookeeper:

每个 ZK 节点都需要知道其他节点。 Kubernetes 发现服务知道 Service 中的节点,因此想法是使用 ZK 节点启动 Service

此服务需要在创建 Zookeeper pod 的 ReplicationController (RC) 之前启动。

ZK 容器的启动脚本需要:

等待发现服务用它的节点填充 ZK 服务(这需要几秒钟,现在我只是在我的启动脚本的开头添加了一个 sleep 10 但更可靠的是你应该寻找服务其中至少有 3 个节点。) 在发现服务中查找构成服务的容器: 这是通过查询 API 来完成的。 KUBERNETES_SERVICE_HOST 环境变量在每个容器中都可用。 查找服务描述的端点是

URL="http(s)://$USERNAME:$PASSWORD@$KUBERNETES_SERVICE_HOST/api/v1/namespaces/$NAMESPACE/endpoints/$SERVICE_NAME"

其中NAMESPACEdefault,除非您更改它,如果您将服务命名为zookeeper,SERVICE_NAME 将是zookeeper。

你会得到构成服务的容器的描述,它们的 ip 在“ip”字段中。 你可以这样做:

curl -s $URL | grep '\"ip\"' | awk 'print $2' | awk -F\" 'print $2' 

获取服务中的 IP 列表。 这样,使用上面定义的 ID 在节点上填充 zoo.cfg

您可能需要 USERNAMEPASSWORD 才能到达 google 容器引擎等服务的端点。这些需要放入 Secret 卷中(请参阅此处的文档:http://kubernetes.io/v1.0/docs/user-guide/secrets.html

您还需要在 Google Container Engine 上使用 curl -s --insecure,除非您将 CA 证书添加到您的 pod 中

基本上将卷添加到容器中,并从文件中查找值。 (与文档所说的相反,当 base64 编码时,请勿将 \n 放在用户名或密码的末尾:它只会让您在阅读这些内容时变得更加复杂)

编辑 2:

您需要在 Kafka 节点上做的另一件事是获取 IP 和主机名,并将它们放入 /etc/hosts 文件中。 Kafka 似乎需要通过主机名知道节点,并且这些默认情况下不在服务节点中设置

编辑 3:

经过多次尝试和思考,使用 IP 作为 ID 可能不是那么好:这取决于您如何配置存储。 对于任何类型的分布式服务,如 zookeeper、kafka、mongo、hdfs,您可能想要使用 emptyDir 类型的存储,所以它只是在该节点上(安装远程存储类型违背了分发这些服务的目的!) emptyDir 会在同一个节点上重新加载数据,所以使用 NODE ID(节点 IP)作为 ID 似乎更符合逻辑,因为这样在同一个节点上重启的 pod 就会有数据。 这避免了数据的潜在损坏(如果一个新节点开始在同一个实际上不是空的目录中写入,谁知道会发生什么)以及 Kafka,如果代理 id 更改,主题被分配一个 broker.id, zookeeper 不更新主题 broker.id 并且主题看起来可用但指向错误的 broker.id 并且一团糟。

到目前为止,我还没有找到如何获取节点 IP,但我认为可以通过查找服务 pod 名称和部署它们的节点来在 API 中查找。

编辑 4

要获取节点 IP,可以从端点 API 获取 pod hostname == name /api/v1/命名空间/默认/端点/ 如上所述。 然后您可以从 pod 名称中获取节点 IP /api/v1/namespaces/default/pods/

PS:这是受 Kubernetes 存储库中示例的启发(此处为 rethinkdb 示例:https://github.com/kubernetes/kubernetes/tree/master/examples/rethinkdb

【讨论】:

好主意!这似乎是一个可行的解决方案!我现在设法通过启动脚本提供了我所有的经纪人 ID: BROKER_ID=$(ip addr | awk '/inet/ && /eth0/sub(/\/.*$/,"",$2);打印 $2' | sed -r 's/\.//g') 和: sed -r -i "s/(broker.id)=(.*)/\1=$BROKER_ID/g" $KAFKA_HOME/ config/server.properties 我用了 ip=$(hostname -i) 然后 id=$ip//./ 我还可以问一下您如何将服务器添加到/conf/zoo.cfg?与 Kubernetes 的 Kafka-Service 交互的共享卷? 所以想法是这样的,我将编辑更多详细信息的答案:使用 zk 节点启动服务,并使用 kafka 节点启动服务。然后启动 zookeeper 的复制控制器。此时服务将注册 zk 容器......但这需要几秒钟。所以同时你的容器需要等待发现服务填充 zk 容器节点。我在启动 zookeeper 的脚本中暂停了 10 秒来解决这个问题,但这并不理想。然后在运行 zk 之前查找 IP 并填充 zoo.cfg。 感谢您的详尽演练!您指的是哪个用户名和密码?我假设的 API 服务器本身?我使用证书 atm 对服务器进行身份验证。那么我应该查找如何使用证书卷曲?【参考方案2】:

看看 https://github.com/CloudTrackInc/kubernetes-kafka 它允许在 kubernetes 中启动 Kafka,并支持扩展和自动扩展。

【讨论】:

那个链接已经过时了。现在可能更好地找到 Strimzi 或 Confluent Helm Charts。【参考方案3】:

这在我的搜索中很显眼,但包含相当过时的信息。要使用更现代的解决方案对此进行更新,您应该使用 StatefulSet 部署,这将生成名称中包含整数计数器而不是哈希的 pod,例如。卡夫卡控制器-0。

这当然是主机名,所以从那里使用 awk 提取固定不变的代理 ID 很简单:

hostname | awk -F'-' 'print $3'

如今可用于 Kafka 的最流行的容器都有一个代理 ID 命令。

【讨论】:

【参考方案4】:

我使用 docker-compose 完成了这项工作(Kubernetes 的不同之处在于您将通过 service.yaml 传递 ID 并拥有 2 个服务):

kafka1:
  build: kafka-0.8.1/
  ports:
  - 9092
  links:
  - zookeeper
  environment:
  - ID=1
kafka2:
  build: kafka-0.8.1/
  ports:
  - 9092
  links:
  - zookeeper
  environment:
  - ID=2

配置:

broker.id=$ID
port=9092
advertised.host.name=$HOST
advertised.port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-$ID
num.partitions=200
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=$DOCKER_ZOOKEEPER_1_PORT_2181_TCP_ADDR:$DOCKER_ZOOKEEPER_1_PORT_2181_TCP_PORT
zookeeper.connection.timeout.ms=6000

sh:

#!/bin/bash
echo "Running config"
export HOST=`grep $HOSTNAME /etc/hosts | awk 'print $1'`
export ID=$ID:?
perl -p -i -e 's/\$\([^]+)\/defined $ENV$1 ? $ENV$1 : $&/eg' < /broker.template > $KAFKA_HOME/config/server.properties
echo "Done"
echo "starting kafka with:"
echo "$KAFKA_HOME/config/server.properties"
echo ""
cat $KAFKA_HOME/config/server.properties
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties

【讨论】:

您确定可以通过 kubernetes 服务传递环境变量吗?因为除此之外,我认为这是一个解决方案。 这讲的是在 Pod 或 ReplicationControllers 中传递环境变量?我知道这是可能的,但这是否也意味着它可以在服务中完成?

以上是关于Kubernetes 多节点上的 Kafka的主要内容,如果未能解决你的问题,请参考以下文章

Kubernetes多节点二进制部署

Kubernetes多节点二进制部署

如何通过在多个Vagrant管理的虚拟机上部署多节点的Kubernetes集群

Kubernetes 中工作节点上的连接被拒绝错误

NFS 卷位于 kubernetes 节点上的啥位置?

Kubernetes命令kubectl 在Node节点上的使用