获取所有写入特定主题的 Kafka 源连接器

Posted

技术标签:

【中文标题】获取所有写入特定主题的 Kafka 源连接器【英文标题】:Get all Kafka Source Connectors writing to a specific topic 【发布时间】:2021-10-10 19:50:12 【问题描述】:

我有一个 Kafka 主题的名称。我想知道哪些连接器正在使用这个主题。具体来说,我需要源连接器名称,以便可以修改源查询。我只能访问 Confluent 控制中心。我们有数百个连接器,我无法手动搜索它们。

提前谢谢你!

【问题讨论】:

【参考方案1】:

你需要写一个脚本。

给定:连接 REST 端点

做:(伪代码)

to_find = 'your_topic_name'

topic_used_by = []

connectors = GET /connectors
for each connector_name in connectors:
  connector_config = GET /connectors/connector_name
  connector_config = connector_config['config']

  if 'topics.regex' in connector_config:
     // pattern match against your topic
     if to_find.matches(connector_config['topics.regex']):
        topic_used_by.add(connector_name)
  else if 'topics' in connector_config:
     // split these on commas
     if to_find in connector_config['topics'].split(','):
       topic_used_by.add(connector_name)

print(topic_used_by)

因此,您的 HTTP 客户端应该很容易扩展以更新任何配置值,然后是 POST /connectors/connector_name/config

请记住,'topics''topics.regex' 都不是 连接器属性(它们是接收器连接器属性)。因此,您需要对其进行修改以使用您拥有的任何连接器属性(理想情况下,通过连接器类名称进行过滤)。例如,JDBC Source 的table.whitelist 属性或MongoDB 源中的collection 确定主题名称。

除非您按类名过滤,否则将返回源和接收器,因此除非在连接器类的名称中有 "Source" 或类似名称,否则这是您能做的最好的事情

另外,您需要考虑一些主题名称可能由各种 transform 配置设置,这些配置中可能有任何 JSON 键值,因此没有直接的方法可以搜索那些没有 at最少transform_config_value.contains('your_topic_name')

【讨论】:

【参考方案2】:

您可以使用通过KIP-558: Track the set of actively used topics by connectors in Kafka Connect 实现的功能。

例如从名为“some-source”的连接器获取活动主题集:

curl -s 'http://localhost:8083/connectors/some-source/topics' | jq

  "some-source": 
    "topics": [
      "foo",
      "bar",
      "baz",               
    ]
  

【讨论】:

对不起。我没有连接器名称。我只有主题名称,我正在尝试找到该主题的制作人。

以上是关于获取所有写入特定主题的 Kafka 源连接器的主要内容,如果未能解决你的问题,请参考以下文章

提取和转换 jdbc sink 连接器的 kafka 消息特定字段

为jdbc sink连接器提取和转换kafka消息的特定字段。

覆盖 Mongo 源连接器

如何告诉 debezuim Mysql 源连接器停止重新拍摄 kafka 主题中现有表的快照?

用于 S3 中 PARQUET 格式的 Kafka S3 源连接器

Kafka JDBC 源连接器插入或更新