Kafka connector Elasticsearch topics.regex
Posted: 2020-11-24 08:54:54

Question: The version I am using is Confluent Kafka 5.1.14.
I am trying to run the Kafka Connect Elasticsearch connector to send data from Kafka to Elasticsearch. I have already tested this configuration in standalone mode and everything works. Here is the standalone configuration for the Elasticsearch sink:
name=elasticsearch-sink-standalone
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics.regex=^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$
connection.url=http://elasticsearch:9200
type.name=_doc
key.ignore=true
schema.ignore=true
But when I create the sink in distributed mode with the same settings, it fails. Here is the POST request body used to create the sink:
"name" : "connector-test",
"config" :
"connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max" : "1",
"topics.regex" : "^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$",
"connection.url" : "http://elasitcsearch:9200",
"type.name" : "_doc",
"key.ignore" : "true",
"schema.ignore" : "true"
Here is the error I get:
[2020-08-04 18:18:35,911] ERROR WorkerSinkTask{id=connector-test-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:512)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:492)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:344)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:512)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'status': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"status-task-connector-test-0"; line: 1, column: 8]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'status': was expecting ('true', 'false' or 'null')
at [Source: (byte[])"status-task-connector-test-0"; line: 1, column: 8]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:703)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3532)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2627)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:832)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:729)
at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4043)
at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2572)
at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:342)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:512)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:512)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:492)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-08-04 18:18:35,912] ERROR WorkerSinkTask{id=connector-test-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
I can use distributed mode with other regular expressions, so I think the error is caused by this regex. What confuses me is that standalone mode works fine with it.
Both standalone and distributed mode run with the following converter settings (I am not sure whether this information helps):
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
Let me know if I need to provide more information. Thanks!
Answer 1: I don't think the problem here is the regex. The stack trace clearly shows that the messages polled from the source system (Kafka in this case, since the connector is a sink) are failing while being converted, and a bytes-to-JSON error is occurring. You need to find out what format those records have that is causing the conversion error.
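A quick way to see this: the payload quoted in the stack trace, "status-task-connector-test-0", is a bare string rather than a JSON document, so the JsonConverter cannot turn it into Connect data. A minimal illustration of the same failure, using plain Python and the standard json module just to mirror what Jackson rejects:

import json

payload = b"status-task-connector-test-0"  # value taken from the stack trace above
try:
    json.loads(payload)
except json.JSONDecodeError as e:
    # Fails for the same reason Jackson does: 'status' is not the start of a JSON value
    print("not valid JSON:", e)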
The regex is only used to determine which topics the connector should read records from, and that part appears to be working as expected.
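As for the regex itself, topics.regex is compiled by the Connect runtime as a Java regular expression, but this particular pattern behaves the same under Python's re module, so a quick sanity check of what it matches can look like this (the topic names below are made up purely for illustration):

import re

# The topics.regex from the connector config above.
pattern = re.compile(r"^[a-zA-Z0-9]((?!-raw$)[a-zA-Z0-9-_])+[a-zA-Z0-9]$")

for topic in ["orders", "orders-enriched", "orders-raw", "my_topic-2", "a"]:
    # fullmatch: accepts ordinary names, rejects anything ending in -raw
    # and anything shorter than three characters.
    print(topic, bool(pattern.fullmatch(topic)))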