Kafka Connect 使用带有 Strimzi 的 REST API 和种类:KafkaConnector

Posted

技术标签:

【中文标题】Kafka Connect 使用带有 Strimzi 的 REST API 和种类:KafkaConnector【英文标题】:Kafka Connect using REST API with Strimzi with kind: KafkaConnector 【发布时间】:2021-10-02 07:13:39 【问题描述】:

我正在尝试使用 Kafka Connect REST API 来管理连接器,为简单起见,请考虑以下 pause 实现:

def pause(): Unit = 
      logger.info(s"pause() Triggered")
      val response = HttpClient.newHttpClient.send(
        HttpRequest
          .newBuilder(URI.create(config.connectUrl + s"/connectors/$config.connectorName/pause"))
          .PUT(BodyPublishers.noBody)
          .timeout(Duration.ofMillis(config.timeout.toMillis.toInt))
          .build()
      , BodyHandlers.ofString)
      if (response.statusCode() != HTTPStatus.Accepted) 
        throw new Exception(s"Could not pause connector: $response.body")
      
    

由于我使用 KafkaConnector 作为资源,我不能使用 Kafka Connect REST API,因为连接器操作员将 KafkaConnetor 资源作为其单一事实来源,手动更改(例如 pause)直接使用 Kafka Connect REST API 由 Cluster Operator 还原。

所以要暂停连接器,我需要以某种方式编辑资源。

我正在努力改变当前函数的逻辑,如果有一些关于如何处理 KafkaConnetor 资源的实际例子会很棒。

我查看了Using Strimzi 文档,但找不到任何实际示例

谢谢!


在@Jakub 的帮助下,我设法创建了我的新客户:


class KubernetesService(config: Configuration) extends StrictLogging 

  private[this] val client = new DefaultKubernetesClient(Config.autoConfigure(config.connectorContext))

  def setPause(pause: Boolean): Unit = 
    logger.info(s"[KubernetesService] - setPause($pause) Triggered")

    val connector = getConnector()
    connector.getSpec.setPause(pause)
    Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).replace(connector)

    Crds.kafkaConnectorOperation(client)
      .inNamespace(config.connectorNamespace)
      .withName(config.connectorName)
      .waitUntilCondition(connector => 
        connector != null &&
          connector.getSpec.getPause == pause && 
          val desiredState = if (pause) "Paused" else "Running"
          connector.getStatus.getConditions.stream().anyMatch(_.getType.equalsIgnoreCase(desiredState))
        
      , config.timeout.toMillis, TimeUnit.MILLISECONDS)
  

  def delete(): Unit = 
    logger.info(s"[KubernetesService] - delete() Triggered")

    Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).delete

    Crds.kafkaConnectorOperation(client)
      .inNamespace(config.connectorNamespace)
      .withName(config.connectorName)
      .waitUntilCondition(_ == null, config.timeout.toMillis, TimeUnit.MILLISECONDS)
  

  def create(oldKafkaConnect: KafkaConnector): Unit = 
    logger.info(s"[KubernetesService] - create($oldKafkaConnect.getMetadata) Triggered")

    Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).create(oldKafkaConnect)

    Crds.kafkaConnectorOperation(client)
      .inNamespace(config.connectorNamespace)
      .withName(config.connectorName)
      .waitUntilCondition(connector => 
          connector != null &&
          connector.getStatus.getConditions.stream().anyMatch(_.getType.equalsIgnoreCase("Running"))
      , config.timeout.toMillis, TimeUnit.MILLISECONDS)
  

  def getConnector(): KafkaConnector = 
    logger.info(s"[KubernetesService] - getConnector() Triggered")
    Try 
      Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).get
     match 
      case Success(connector) => connector
      case Failure(_: NullPointerException) => throw new NullPointerException(s"Failure on getConnector($config.connectorName) on ns: $config.connectorNamespace, context: $config.connectorContext")
      case Failure(exception) => throw exception
    
  

【问题讨论】:

【参考方案1】:

暂停连接器,您可以编辑KafkaConnector 资源并将.spec 中的pause 字段设置为true(参见docs)。有几种方法可以做到这一点。您可以使用 kubectl 并从文件 (kubectl apply) 应用新的 YAML,或者使用 kubectl edit 以交互方式执行此操作。

如果您想以编程方式执行此操作,则需要使用 Kubernetes 客户端来编辑资源。在 Java 中,您还可以使用 Strimzi 的 api 模块,该模块具有编辑资源的所有结构。我整理了一个使用 Fabric8 Kubernetes 客户端和 api 模块在 Java 中暂停 Kafka 连接器的简单示例:

package cz.scholz.strimzi.api.examples;

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.KafkaConnectorList;
import io.strimzi.api.kafka.model.KafkaConnector;

public class PauseConnector 
    public static void main(String[] args) 
        String namespace = "myproject";
        String crName = "my-connector";

        KubernetesClient client = new DefaultKubernetesClient();
        MixedOperation<KafkaConnector, KafkaConnectorList, Resource<KafkaConnector>> op = Crds.kafkaConnectorOperation(client);
        KafkaConnector connector = op.inNamespace(namespace).withName(crName).get();
        connector.getSpec().setPause(true);
        op.inNamespace(namespace).withName(crName).replace(connector);

        client.close();
    

(有关完整项目,请参阅 https://github.com/scholzj/strimzi-api-examples)

我不是 Scala 用户 - 但我认为它也应该可以在 Scala 中使用,但我将把它从 Java 重写为 Scala 留给你。

【讨论】:

非常感谢@Jakub,strimzi-api 有什么好的参考吗?例如使用新的config 进行deleterecreate 等操作? 以及是否可以找到特定 groupId 的所有当前偏移量,因为目前我正在使用 adminClient (org.apache.kafka.clients.admin) 来查找偏移量 "如果有可能找到特定 groupId 的所有当前偏移量,因为目前我正在使用 adminClient" ...嗯,管理客户端是正确的方法为了这。但我不确定这有多容易或多难。 "非常感谢@Jakub,srimzi-api 有什么好的参考吗?例如使用新配置进行删除和重新创建等操作?" ...好吧,对于创建,您可以只使用create 命令而不是replace。您可以使用KafkaConnectorBuilder 来准备资源。与delete 类似或更新配置等。您只需操纵自定义资源。 为什么我要用新的连接器替换旧的连接器,op.inNamespace(namespace).withName(crName).get().getSpec().setPause(true) 不应该工作吗?另外,如果我给 K8sClient 命名空间,我还需要在 .inNamespace 方法上指定它吗?

以上是关于Kafka Connect 使用带有 Strimzi 的 REST API 和种类:KafkaConnector的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect with MSSQL 不适用于删除操作

Kafka Connect,插件路径

带有旧数据库的JDBC Kafka连接器

Debezium MongoDB 连接器错误:org.apache.kafka.connect.errors.ConnectException:错误处理程序中超出公差

使用 cp-kafka-connect-base 为 snowflake-kafka-connector 构建一个组合的 docker 映像,以部署在 kafka connect 集群上

使用独立模式 Kafka-connect 将 Postgresql 的数据捕获更改为 kafka 主题