Kafka connector Elasticsearch topics.regex

Posted: 2020-11-24 08:54:54

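I am using Confluent Kafka 5.1.14.

I am trying to run the Kafka Connect Elasticsearch connector to send data from Kafka to Elasticsearch. I have already tested this configuration in standalone mode, and everything works. This is the standalone configuration for the Elasticsearch sink: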

name=elasticsearch-sink-standalone
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics.regex=^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$
connection.url=http://elasticsearch:9200
type.name=_doc
key.ignore=true
schema.ignore=true

But when I create the sink in distributed mode with the same settings, it fails. This is the POST body of the request that creates the sink:

{
 "name" : "connector-test",
 "config" : {
  "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max" : "1",
  "topics.regex" : "^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$",
  "connection.url" : "http://elasitcsearch:9200",
  "type.name" : "_doc",
  "key.ignore" : "true",
  "schema.ignore" : "true"
 }
}
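The step from the standalone properties file to the distributed-mode request body can be sketched roughly as follows. This is only an illustration: `properties_to_payload` is a hypothetical helper, and it assumes a simple `key=value` file without escapes or line continuations. The resulting body would typically be sent with a POST to the `/connectors` endpoint of the Connect REST API; the worker address is not given in the question, so nothing is sent here.

```python
import json

# The standalone configuration from the question, copied verbatim.
STANDALONE_PROPERTIES = """\
name=elasticsearch-sink-standalone
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics.regex=^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$
connection.url=http://elasticsearch:9200
type.name=_doc
key.ignore=true
schema.ignore=true
"""


def properties_to_payload(text, name=None):
    """Turn simple key=value lines into a {"name": ..., "config": {...}} body.

    Hypothetical helper: it ignores blank lines and '#' comments and splits
    each remaining line on the first '=' only.
    """
    config = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    # Use the explicit name if given, otherwise the one from the file.
    connector_name = name or config.pop("name")
    config.pop("name", None)
    return {"name": connector_name, "config": config}


payload = properties_to_payload(STANDALONE_PROPERTIES, name="connector-test")
print(json.dumps(payload, indent=2))
```

Generating the body from the file that already works in standalone mode rules out copy errors between the two configurations, so any remaining difference in behaviour has to come from the environment rather than from the settings.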

This is the error I get:

[2020-08-04 18:18:35,911] ERROR WorkerSinkTask{id=connector-test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:492)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'status': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"status-task-connector-test-0"; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'status': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"status-task-connector-test-0"; line: 1, column: 8]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4043)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2572)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:512)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:492)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-08-04 18:18:35,912] ERROR WorkerSinkTask{id=connector-test-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

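I can use distributed mode with other regular expressions, so I think the error is caused by the regex. What confuses me is that standalone mode works with this same regex.

Both standalone and distributed mode run with the following converter settings (I'm not sure whether this information helps):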

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

Let me know if I need to provide more information. Thanks!


Answer 1:

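I don't think the regex is the problem here. The stack trace clearly shows that, after polling messages from the source system (Kafka in this case, since the connector is a sink), the task tries to convert them to Connect data with JsonConverter, and that conversion fails: the record content shown in the trace, status-task-connector-test-0, is not valid JSON. You need to find out what format the records that cause the conversion error are in.

The regex only determines which topics the connector reads records from, and that part seems to work as expected.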
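To make the conversion failure concrete, here is a minimal sketch that feeds the bytes quoted in the stack trace to a JSON parser. It uses Python's `json` module as a stand-in for the Jackson parser that JsonConverter uses, so the error message differs, but the outcome for this input is the same: a bare token such as `status-task-connector-test-0` is not a JSON value. `is_json` is a hypothetical helper name.

```python
import json


def is_json(raw: bytes) -> bool:
    """Return True if raw decodes as UTF-8 and parses as a JSON value."""
    try:
        json.loads(raw.decode("utf-8"))
        return True
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False


# Bytes copied from the "[Source: (byte[])...]" line of the stack trace.
payload_from_trace = b"status-task-connector-test-0"

print(is_json(payload_from_trace))   # False
print(is_json(b'{"status": "ok"}'))  # True
```

Running a check like this against a sample of records from each matched topic would show which topic carries non-JSON keys or values.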
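One way to see which topics the pattern selects is to run it against a list of candidate names. The sketch below is an illustration under stated assumptions: the candidate list is hypothetical (`connect-configs`, `connect-offsets`, and `connect-status` are the names used in the sample distributed worker configuration, and the asker's actual topic names are unknown), and Python's `re` stands in for the Java regex engine that Kafka uses. The payload in the trace, `status-task-connector-test-0`, resembles the keys Kafka Connect writes to its internal status topic, so it may be worth checking whether the pattern also matches the worker's internal topics; the source does not confirm that this is the cause.

```python
import re

# The topics.regex value from the question, copied verbatim.
TOPICS_REGEX = r"^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$"


def matching_topics(pattern, topics):
    """Return the topics whose full name matches the pattern."""
    compiled = re.compile(pattern)
    return [topic for topic in topics if compiled.fullmatch(topic)]


# Hypothetical topic names, for illustration only.
candidates = [
    "orders",
    "orders-raw",
    "connect-configs",
    "connect-offsets",
    "connect-status",
    "__consumer_offsets",
]

for topic in matching_topics(TOPICS_REGEX, candidates):
    print(topic)
```

With these names the pattern excludes `orders-raw` and `__consumer_offsets` but accepts the three `connect-*` names. A standalone worker does not create a status topic, which would be consistent with the same regex behaving differently in the two modes.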
