如何将 OpenShift 上的 KSQLDB 集群连接到本地 Kerberized Kafka 集群

Posted

技术标签:

【中文标题】如何将 OpenShift 上的 KSQLDB 集群连接到本地 Kerberized Kafka 集群【英文标题】:How to connect your KSQLDB-Cluster on OpenShift to an on-premise kerberized Kafka-cluster 【发布时间】:2020-09-01 02:22:31 【问题描述】:

我想要达到的目标: 我们有一个本地 Kafka 集群。我想在 OpenShift 中设置 KSQLDB 并将其连接到本地 Kafka 集群的代理。

问题: 当我尝试使用命令“/usr/bin/ksql-server-start /etc/ksqldb/ksql-server.properties”启动 KSQLDB 服务器时,我收到错误消息:

[2020-05-14 15:47:48,519] ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:60)
io.confluent.ksql.util.KsqlServerException: Could not get Kafka cluster configuration!
        at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:90)
        at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.isKafkaAuthorizerEnabled(KsqlAuthorizationValidatorFactory.java:81)
        at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.create(KsqlAuthorizationValidatorFactory.java:51)
        at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:624)
        at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:544)
        at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:98)
        at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:56)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1589471268517) timed out at 1589471268518 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:60)
        ... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1589471268517) timed out at 1589471268518 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

我的配置: 我根据这个镜像设置了我的Dockerfile:https://hub.docker.com/r/confluentinc/ksqldb-server,端口9092、9093、8080、8082和443是开放的。

我的 service-yaml 看起来像这样:

kind: Service
apiVersion: v1
metadata:
  name: social-media-dev
  namespace: abc
  selfLink: xyz
  uid: xyz
  resourceVersion: '1'
  creationTimestamp: '2020-05-14T09:47:15Z'
  labels:
    app: social-media-dev
  annotations:
    openshift.io/generated-by: OpenShiftNewApp
spec:
  ports:
    - name: social-media-dev
      protocol: TCP
      port: 9092
      targetPort: 9092
      nodePort: 31364
  selector:
    app: social-media-dev
    deploymentconfig: social-media-dev
  clusterIP: XX.XX.XXX.XXX
  type: LoadBalancer
  externalIPs:
    - XXX.XX.XXX.XXX
  sessionAffinity: None
  externalTrafficPolicy: Cluster
status:
  loadBalancer:
    ingress:
      - ip: XX.XX.XXX.XXX

我的 ksql-server.properties 文件包含以下信息: 听众:http://0.0.0.0:8082 bootstrap.servers: X.X.X.X:9092, X.X.X.Y:9092, X.X.X.Z:9092

到目前为止我所尝试的:

我尝试从我的 pod 内连接到代理,并且成功了:(timeout 1 bash -c '</dev/tcp/X.X.X.X/9092 && echo PORT OPEN || echo PORT CLOSED') 2>/dev/null 结果:端口打开

我也玩过监听器,但错误消息变得更短,只是显示“无法获取 Kafka 集群配置!”的信息。并且没有超时错误。

我尝试将 LoadBalancer 交换到 Nodeport,但也没有成功。

你有什么想法我接下来可以尝试吗?

更新:通过升级到 Cloudera CDH6,Cloudera Kafka 集群现在也可以与 Kafka Streams 一起使用。因此,我现在能够从 Openshift 中的 KSQLDB 集群连接到本地 Kafka 集群。

【问题讨论】:

在运行 ksqlDB 的地方使用 kafkacat,您能否成功连接到代理并返回元数据(使用 -L 标志)? 谢谢@RobinMoffatt,我会试试的,但是让 librdkafka-devel for kafkacat 在 RHEL8 上运行有点挑战。 如果您将 ksqlDB 作为 Docker 容器运行,您也可以将 kafkacat 作为 Docker 容器运行 :) 谢谢,我刚刚开始运行它。所以使用 kafkacat 我可以看到我们所有的主题、分区、领导者等。所以这很有效。 我猜@RobinMoffatt 可能是 kafka 监听器的问题?我没有在默认端口上运行它,因为它没有打开,而是使用了 8083。8083 没有被另一个进程使用,我已经检查过了。 【参考方案1】:

更新: 通过升级到 Cloudera CDH6,Cloudera Kafka 集群现在也可以与 Kafka Streams 一起使用。因此,我现在能够从 Openshift 中的 KSQLDB 集群连接到本地 Kafka 集群。

我还将在这里描述我连接到 kerberized Kafka 集群的最终方式,因为我一直在努力让它运行:

    获取 Kerberos 票证并通过 SSL 建立连接

ksql-server.properties(其中的 sasl_ssl 部分):

security.protocol=SASL_SSL
sasl.mechanism=GSSAPI

ssl.truststore.location=truststore.jks
ssl.truststore.password=password
ssl.truststore.type=JKS

ssl.ca.location=cert

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka"  principal="myprincipal";
serviceName="kafka"

producer.ssl.endpoint.identification.algorithm=HTTPS
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=truststore.jks
producer.ssl.truststore.password=password
producer.sasl.mechanism=GSSAPI
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka"  principal="myprincipal";

consumer.ssl.endpoint.identification.algorithm=HTTPS
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=truststore.jks
consumer.ssl.truststore.password=password
consumer.sasl.mechanism=GSSAPI
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka"  principal="myprincipal";`
    因此设置 Sentry 规则

HOST=[HOST]->CLUSTER=kafka-cluster->action=idempotentwrite

HOST=[HOST]->TRANSACTIONALID=[ID]->action=describe

HOST=[HOST]->TRANSACTIONALID=[ID]->action=write

【讨论】:

以上是关于如何将 OpenShift 上的 KSQLDB 集群连接到本地 Kerberized Kafka 集群的主要内容,如果未能解决你的问题,请参考以下文章

将 Openshift 上的 PostgreSQL 连接到 Amazon S3

如何从kafka主题为ksqldb创建主题

将 Kafka 连接嵌入 Ksqldb-server 时挂载(卷)不起作用

一文读懂OpenShift总体架构设计 | 五一送福利

与 OpenShift 上的 Java 程序的 RMI 或套接字连接

Kafka Connect 重新读取整个文件以进行 KSQLDB 调试或 KSQLDB 是不是可以在创建查询后插入所有事件?