Kafka Connect 使用带有 Strimzi 的 REST API 和种类:KafkaConnector
Posted
技术标签:
【中文标题】Kafka Connect 使用带有 Strimzi 的 REST API 和种类:KafkaConnector【英文标题】:Kafka Connect using REST API with Strimzi with kind: KafkaConnector 【发布时间】:2021-10-02 07:13:39 【问题描述】:我正在尝试使用 Kafka Connect REST API 来管理连接器,为简单起见,请考虑以下 pause
实现:
def pause(): Unit =
logger.info(s"pause() Triggered")
val response = HttpClient.newHttpClient.send(
HttpRequest
.newBuilder(URI.create(config.connectUrl + s"/connectors/$config.connectorName/pause"))
.PUT(BodyPublishers.noBody)
.timeout(Duration.ofMillis(config.timeout.toMillis.toInt))
.build()
, BodyHandlers.ofString)
if (response.statusCode() != HTTPStatus.Accepted)
throw new Exception(s"Could not pause connector: $response.body")
由于我使用 KafkaConnector
作为资源,我不能使用 Kafka Connect REST API,因为连接器操作员将 KafkaConnetor 资源作为其单一事实来源,手动更改(例如 pause
)直接使用 Kafka Connect REST API 由 Cluster Operator 还原。
所以要暂停连接器,我需要以某种方式编辑资源。
我正在努力改变当前函数的逻辑,如果有一些关于如何处理 KafkaConnetor 资源的实际例子会很棒。
我查看了Using Strimzi 文档,但找不到任何实际示例
谢谢!
在@Jakub 的帮助下,我设法创建了我的新客户:
class KubernetesService(config: Configuration) extends StrictLogging
private[this] val client = new DefaultKubernetesClient(Config.autoConfigure(config.connectorContext))
def setPause(pause: Boolean): Unit =
logger.info(s"[KubernetesService] - setPause($pause) Triggered")
val connector = getConnector()
connector.getSpec.setPause(pause)
Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).replace(connector)
Crds.kafkaConnectorOperation(client)
.inNamespace(config.connectorNamespace)
.withName(config.connectorName)
.waitUntilCondition(connector =>
connector != null &&
connector.getSpec.getPause == pause &&
val desiredState = if (pause) "Paused" else "Running"
connector.getStatus.getConditions.stream().anyMatch(_.getType.equalsIgnoreCase(desiredState))
, config.timeout.toMillis, TimeUnit.MILLISECONDS)
def delete(): Unit =
logger.info(s"[KubernetesService] - delete() Triggered")
Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).delete
Crds.kafkaConnectorOperation(client)
.inNamespace(config.connectorNamespace)
.withName(config.connectorName)
.waitUntilCondition(_ == null, config.timeout.toMillis, TimeUnit.MILLISECONDS)
def create(oldKafkaConnect: KafkaConnector): Unit =
logger.info(s"[KubernetesService] - create($oldKafkaConnect.getMetadata) Triggered")
Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).create(oldKafkaConnect)
Crds.kafkaConnectorOperation(client)
.inNamespace(config.connectorNamespace)
.withName(config.connectorName)
.waitUntilCondition(connector =>
connector != null &&
connector.getStatus.getConditions.stream().anyMatch(_.getType.equalsIgnoreCase("Running"))
, config.timeout.toMillis, TimeUnit.MILLISECONDS)
def getConnector(): KafkaConnector =
logger.info(s"[KubernetesService] - getConnector() Triggered")
Try
Crds.kafkaConnectorOperation(client).inNamespace(config.connectorNamespace).withName(config.connectorName).get
match
case Success(connector) => connector
case Failure(_: NullPointerException) => throw new NullPointerException(s"Failure on getConnector($config.connectorName) on ns: $config.connectorNamespace, context: $config.connectorContext")
case Failure(exception) => throw exception
【问题讨论】:
【参考方案1】:要暂停连接器,您可以编辑KafkaConnector
资源并将.spec
中的pause
字段设置为true
(参见docs)。有几种方法可以做到这一点。您可以使用 kubectl
并从文件 (kubectl apply
) 应用新的 YAML,或者使用 kubectl edit
以交互方式执行此操作。
如果您想以编程方式执行此操作,则需要使用 Kubernetes 客户端来编辑资源。在 Java 中,您还可以使用 Strimzi 的 api
模块,该模块具有编辑资源的所有结构。我整理了一个使用 Fabric8 Kubernetes 客户端和 api
模块在 Java 中暂停 Kafka 连接器的简单示例:
package cz.scholz.strimzi.api.examples;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.KafkaConnectorList;
import io.strimzi.api.kafka.model.KafkaConnector;
public class PauseConnector
public static void main(String[] args)
String namespace = "myproject";
String crName = "my-connector";
KubernetesClient client = new DefaultKubernetesClient();
MixedOperation<KafkaConnector, KafkaConnectorList, Resource<KafkaConnector>> op = Crds.kafkaConnectorOperation(client);
KafkaConnector connector = op.inNamespace(namespace).withName(crName).get();
connector.getSpec().setPause(true);
op.inNamespace(namespace).withName(crName).replace(connector);
client.close();
(有关完整项目,请参阅 https://github.com/scholzj/strimzi-api-examples)
我不是 Scala 用户 - 但我认为它也应该可以在 Scala 中使用,但我将把它从 Java 重写为 Scala 留给你。
【讨论】:
非常感谢@Jakub,strimzi-api
有什么好的参考吗?例如使用新的config
进行delete
和recreate
等操作?
以及是否可以找到特定 groupId
的所有当前偏移量,因为目前我正在使用 adminClient (org.apache.kafka.clients.admin
) 来查找偏移量
"如果有可能找到特定 groupId 的所有当前偏移量,因为目前我正在使用 adminClient" ...嗯,管理客户端是正确的方法为了这。但我不确定这有多容易或多难。
"非常感谢@Jakub,srimzi-api 有什么好的参考吗?例如使用新配置进行删除和重新创建等操作?" ...好吧,对于创建,您可以只使用create
命令而不是replace
。您可以使用KafkaConnectorBuilder
来准备资源。与delete
类似或更新配置等。您只需操纵自定义资源。
为什么我要用新的连接器替换旧的连接器,op.inNamespace(namespace).withName(crName).get().getSpec().setPause(true)
不应该工作吗?另外,如果我给 K8sClient 命名空间,我还需要在 .inNamespace
方法上指定它吗?以上是关于Kafka Connect 使用带有 Strimzi 的 REST API 和种类:KafkaConnector的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Connect with MSSQL 不适用于删除操作
Debezium MongoDB 连接器错误:org.apache.kafka.connect.errors.ConnectException:错误处理程序中超出公差
使用 cp-kafka-connect-base 为 snowflake-kafka-connector 构建一个组合的 docker 映像,以部署在 kafka connect 集群上