Kafka Connect:没有为连接器创建任务

Posted

技术标签:

【中文标题】Kafka Connect:没有为连接器创建任务【英文标题】:Kafka Connect: No tasks created for a connector 【发布时间】:2020-06-05 15:23:53 【问题描述】:

我们使用 Debezium (MongoDB) 和 Confluent S3 连接器以分布式模式运行 Kafka Connect(Confluent Platform 5.4,即 Kafka 2.4)。 通过 REST API 添加新连接器时,连接器会在 RUNNING 状态下创建,但不会为连接器创建任务。

暂停和恢复连接器没有帮助。 当我们停止所有工作人员然后重新启动它们时,任务就创建好了,一切都按原样运行。

问题不是由连接器插件引起的,因为我们看到 Debezium 和 S3 连接器的行为相同。同样在调试日志中,我可以看到 Debezium 正确地从 Connector.taskConfigs() 方法返回任务配置。

有人能告诉我该怎么做吗?我们可以在不重新启动工作人员的情况下添加连接器吗? 谢谢。

配置详情

集群有 3 个节点,具有以下 connect-distributed.properties

bootstrap.servers=kafka-broker-001:9092,kafka-broker-002:9092,kafka-broker-003:9092,kafka-broker-004:9092
group.id=tdp-QA-connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.topic=connect-offsets-qa
offset.storage.replication.factor=3
offset.storage.partitions=5

config.storage.topic=connect-configs-qa
config.storage.replication.factor=3

status.storage.topic=connect-status-qa
status.storage.replication.factor=3
status.storage.partitions=3

offset.flush.interval.ms=10000

rest.host.name=tdp-QA-kafka-connect-001
rest.port=10083
rest.advertised.host.name=tdp-QA-kafka-connect-001
rest.advertised.port=10083

plugin.path=/opt/kafka-connect/plugins,/usr/share/java/

security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
ssl.truststore.password=<secret>
ssl.endpoint.identification.algorithm=
producer.security.protocol=SSL
producer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
producer.ssl.truststore.password=<secret>
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/etc/kafka/ssl/kafka-connect.truststore.jks
consumer.ssl.truststore.password=<secret>

max.request.size=20000000
max.partition.fetch.bytes=20000000

连接器配置

Debezium 示例:


  "name": "qa-mongodb-comp-converter-task|1",
  "config": 
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
    "mongodb.name": "qa-debezium-comp",
    "mongodb.ssl.enabled": true,
    "collection.whitelist": "converter[.]task",
    "tombstones.on.delete": true
  

S3 示例:


  "name": "qa-s3-sink-task|1",
  "config": 
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "qa-debezium-comp.converter.task",
    "topics.dir": "data/env/qa",
    "s3.region": "eu-west-1",
    "s3.bucket.name": "<bucket-name>",
    "flush.size": "15000",
    "rotate.interval.ms": "3600000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "transforms": "ExtractDocument",
    "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
  


连接器是使用 curl 创建的: curl -X POST -H "Content-Type: application/json" --data @&lt;json_file&gt; http:/&lt;connect_host&gt;:10083/connectors

【问题讨论】:

多次遇到同样的问题 (***.com/q/55622904/7109598),但没有找到解决办法 从来没有遇到过这个问题。请显示所有相关配置、命令、安装详细信息等...... cc @Iskuskov @cricket_007 我添加了配置细节。我们使用 curl -X POST ... 通过 REST API 添加连接器 在issues.apache.org/jira/projects/KAFKA/issues 创建一个jira 问题可能有助于调查。请在票证中包含日志(如果需要,请编辑)。调试级别将在重新平衡期间如何创建和分配任务方面提供更多信息。此外,可能值得检查您是否也遇到了 CP 5.3.2 的问题,或者尝试connect.protocol 的三个选项中的每一个。此外,请确保使用正确的设置创建 Connect 的内部主题(配置主题需要压缩)。 我认为 Kafka Connect 没有记录有关为什么无法创建或维持任务的信息的 WARN 或 ERROR 是疯狂的。我在这里添加了评论:issues.apache.org/jira/browse/KAFKA-9747 【参考方案1】:

删除连接器并使用不同的database.server.id 重新创建它。重复此过程,直到出现任务。

经过 6-7 次试验后,它对我有用,不知道为什么。暂停和恢复,重新启动连接器/任务对我没有帮助。

【讨论】:

【参考方案2】:

我遇到了同样的问题,所以我更改了连接器的名称并创建了一个新连接器,它可以工作,但我不知道这个问题的根源,因为我们在 kafka-connect 日志中没有信息。

【讨论】:

以上是关于Kafka Connect:没有为连接器创建任务的主要内容,如果未能解决你的问题,请参考以下文章

pyflink消费kafka-connect-jdbc消息(带schema)

Kafka-Connect:在分布式模式下创建新连接器就是创建新组

java.lang.NoClassDefFoundError: org/apache/kafka/connect/header/ConnectHeaders

kafka connect - 审计 - 在任务完成时触发事件

Debezium 如何使用 Kafka Connect 正确注册 SqlServer 连接器 - 连接被拒绝

保护对 Kafka Connect 的 REST API 的访问