如何将 kubernetes 服务端点 IP 传递到 KAFKA 广告监听器

Posted

技术标签:

【中文标题】如何将 kubernetes 服务端点 IP 传递到 KAFKA 广告监听器【英文标题】:How to pass down kubernetes service endpoints IP into KAFKA advertised listener 【发布时间】:2018-10-09 18:30:21 【问题描述】:

我想在 Kubernetes 中配置 Kafka 代理。我使用的 docker 镜像是confluentinc/cp-kafka:latest。它需要KAFKA_ADVERTISED_LISTENERS 环境变量,它允许Kafka 客户端与代理通信。

问题在于难以将服务端点 IP 分配给KAFKA_ADVERTISED_LISTENERS。如果我使用localhost 作为这个值,它只在本地 Kafka 代理 pod 中工作,但对于 kubernetes 集群中的某些 Kafka 客户端 pod 与它进行通信将不起作用。如果我使用来自kubectl get endpoints -l app=kafka 的服务端点 IP,这是可行的,但每次使用一些审计脚本设置此动态值的开销很小。

我想知道有没有更好的方法可以在 Kubernetes yaml 文件中动态设置这个值,所以我不需要每次都以编程方式设置这个 IP。

这里是yaml文件:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
  selector:
    app: kafka

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-broker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      hostname: broker
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:latest
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://DYNAMIC_ENDPOINT_IP:9092"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181

提前致谢。

编辑: 我尝试使用服务器名称、服务主机环境变量、服务源 IP 和 Pod IP。不幸的是,我仍然在 kafka 日志中收到错误: java.lang.IllegalArgumentException: Error creating broker listeners from 'PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092': Unable to parse PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092 to a broker endpoint

如果我使用kubectl exec -it kafa-broker-ssfjks env,则这些环境变量实际上已在此 pod 中正确设置。我想这可能与 Kafka 代理配置问题有关?

【问题讨论】:

【参考方案1】:

@Jakub 让我走上了正轨,所以对于像 cp-kafka-connect 这样的东西,我的 Dockerfile 看起来像:

FROM confluentinc/cp-kafka-connect:5.4.0
ENV CONNECT_GROUP_ID='kafkatosql'
ENV CONNECT_CONFIG_STORAGE_TOPIC="kafkatosql-config"
ENV CONNECT_OFFSET_STORAGE_TOPIC="kafkatosql-offsets"
ENV CONNECT_STATUS_STORAGE_TOPIC="kafkatosql-status"
ENV CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter"
ENV CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter"
ENV CONNECT_LOG4J_ROOT_LOGLEVEL="ERROR"
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.4.0
WORKDIR /app
COPY start.sh .
CMD exec ./start.sh

然后 start.sh 看起来像:

kafka_connect_host=localhost:8083

export CONNECT_REST_ADVERTISED_HOST_NAME=$(hostname -I)

/etc/confluent/docker/run &

wait_counter=0
echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
while true; do
  status=$(curl -s -o /dev/null -w %http_code http://$kafka_connect_host/connectors)
  if [ $status -eq 000 ]; then
    wait_counter=$((wait_counter+1))
    echo "Kafka Connect listener HTTP status: $status (waiting for 200)"
    if [ $wait_counter = 15 ]; then
      echo 'Waited too long!';
      exit 1;
    else
      echo "Retries: $wait_counter"
      sleep 3
    fi
  else
    break
  fi
done
echo -e "\n--\n+> Creating Kafka Connect Postgresql Sink"
curl -X PUT http://$kafka_connect_host/connectors/jdbc_sink_postgresql_00/config -H "Content-Type: application/json" -d '
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": 1,
  "topics": "users",
  "connection.url": "jdbc:'"$DB_URL"'",
  "auto.create": false
'

# ... other stuff

trap : TERM INT; sleep infinity & wait

【讨论】:

【参考方案2】:

使用服务名称 (kafka-broker) 而不是 IP。 Kube-dns 将为您解决。如果kafka客户端被放置在同一个命名空间,你应该只使用“kafka-broker”,如果没有,你必须使用限定名“kafka-broker.YOURNAMESPACE.svc”

【讨论】:

这似乎不起作用,pod 遇到以下异常(尽管它能够使用 kubernetes 服务与 zookeeper 建立连接):INFO Socket connection established to zookeeper.default.svc.cluster.local /10.103.99.26:2181, 启动会话 (org.apache.zookeeper.ClientCnxn) ... WARN [Controller-1-to-broker-1-send-thread]: 控制器 1 与代理 kafka-broker:9092 的连接 (id :1机架:null)不成功(kafka.controller.RequestSendThread)java.net.SocketTimeoutException:在kafka.controller.RequestSendThread.brokerReady的30000毫秒内连接失败【参考方案3】:

您应该让您的客户端通过该服务进行连接,因此公开该服务的 ip 或 dns 应该可以工作。默认情况下,服务在 pod 中作为变量名公开。如果配置了 dns 插件,则可以使用 dns。更多信息:https://kubernetes.io/docs/concepts/services-networking/service/#environment-variables

【讨论】:

澄清一下,此解决方案适用于仅具有一个副本的部署 - 如问题中所述。 (具有多个代理的 Kafka 集群在负载均衡器后面无法正常工作。服务只能用于引导,但广告地址必须指向特定的代理。) 所以基本上你可以使用 StatefulSet 并对 env 变量中的成员进行硬编码。或者,当另一个副本启动时,您将如何更新侦听器列表? 当您使用 StatefulSet 时,您可以使用无头服务,该服务将为每个 pod 提供一个固定的主机名/DNS 记录,该记录可用作 Kubernetes 集群中的广告主机名。但是您需要弄清楚映像中的主机名(例如在 Docker 入口点脚本中使用 hostname -f)——我从未使用过 Confluent Docker 映像,所以我不确定这是否可行。尽管您会预先知道每个 pod 的主机名,但您不能在环境变量中使用它,因为 StatefulSet 创建的所有 pod 只有一个模板。 我尝试在"PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092" 中使用$KAFKA_BROKER_SERVICE_HOST 环境变量。但是,kafka 会抛出一些错误,例如java.lang.IllegalArgumentException: Error creating broker listeners from 'PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092': Unable to parse PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092 to a broker endpoint。我将编辑这个问题。

以上是关于如何将 kubernetes 服务端点 IP 传递到 KAFKA 广告监听器的主要内容,如果未能解决你的问题,请参考以下文章

[教程,Part 2]如何使用HTTP/REST端点,中间件,Kubernetes等开发Go gRPC微服务

Kubernetes Nginx-Ingress oauth_proxy如何将信息/令牌传递给服务

如何在 Kitura 服务器中获取 Kubernetes 外部 IP

Kubernetes - 如何将信任库路径和密码传递给 JVM 参数

Kubernetes Nginx Ingress 找不到服务端点

开源网络组件总结(kubernetes相关)