Cannot send data from Confluent Platform to Elasticsearch with the Elasticsearch sink connector. Exception: Tolerance exceeded in error handler

Posted: 2021-10-19 05:54:26

【Question】

I am trying to run a simple example that sends Kafka data to Elasticsearch using Confluent Platform with the Elasticsearch sink connector. I am running Confluent Platform version 6.0.0 and have installed the latest version of the Elasticsearch sink connector. My connector configuration is as follows:


  "value.converter.schemas.enable": "false",
  "name": "e",
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "topics": [
    "ciao"
  ],
  "connection.url": [
    "http://192.168.x.x:9200"
  ],
  "key.ignore": "true",
  "schema.ignore": "true"

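For reference, a flat configuration like this is usually submitted to the Connect REST API. A minimal sketch, assuming a Connect worker on the default localhost:8083 and the config above saved to a hypothetical file connector.json:

    # PUT creates or updates the connector named "e" (worker host/port assumed)
    curl -X PUT http://localhost:8083/connectors/e/config \
         -H "Content-Type: application/json" \
         -d @connector.json
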
I use kafkacat to send messages to my topic "ciao", but as soon as I send data, my connector fails.
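
As context, a bare-string message like the one in the error below can be produced with something along these lines (the broker address is an assumption):

    # -b broker, -t topic, -P produce mode; the value is the raw string "ciao", not JSON
    echo 'ciao' | kafkacat -b localhost:9092 -t ciao -P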

While trying to find out what the problem might be, I got this exception:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:366)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
    ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ciao': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"ciao"; line: 1, column: 5]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ciao': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"ciao"; line: 1, column: 5]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3560)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2655)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:857)
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:754)
    at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4247)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2734)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:64)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:364)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

What could the problem be? I followed this tutorial, except that instead of running Confluent with Docker I used a local installation of Confluent Platform: https://www.confluent.io/blog/kafka-elasticsearch-connector-tutorial/

【Comments】

【Answer 1】

I have reformatted the stack trace for you, in which you can see:

org.apache.kafka.connect.errors.DataException: 
Converting byte[] to Kafka Connect data failed due to serialization error: 
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:366)
…
org.apache.kafka.common.errors.SerializationException: 
com.fasterxml.jackson.core.JsonParseException: 
Unrecognized token 'ciao': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"ciao"; line: 1, column: 5]

It looks like the messages being read are not valid JSON. If the topic contains only a few malformed messages alongside valid JSON that you do want to process, you could try setting "errors.tolerance": "all".
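
As a sketch, you can combine that with a dead letter queue so the bad records end up somewhere inspectable instead of being silently dropped (the DLQ topic name below is just an example):

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-ciao",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true"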

See also Error Handling in Kafka Connect and Elasticsearch Sink connector.

【Comments】

I tried setting "errors.tolerance": "all", and the connector runs fine, but when I search Elasticsearch the index does not exist.

In that case it sounds like it is not just a few malformed messages, but a fundamental mismatch between your data format and what the connector expects. Could you edit your question to include a sample of the data on the Kafka topic?

【Answer 2】

I spotted the configuration error. With the corrected configuration below, it now works fine.

The configuration is as follows:


  "name": "e",
  "config": 
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "topics": "ciao",
    "connection.url": "http://192.x.x.x:9200",
    "key.ignore": "true",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "value.converter.schemas.enable": "true",
    "key.converter.schema.registry.url": "http://localhost:8081"
  
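
Note that with io.confluent.connect.json.JsonSchemaConverter, the records on the topic must be produced with the Confluent JSON Schema serializer so that Schema Registry is involved; a bare kafkacat string would still fail to deserialize. A sketch of a matching producer, assuming a local Confluent Platform install and Schema Registry on port 8081 (the schema itself is a made-up example):

    # Ships with Confluent Platform; registers the schema and frames values as JSON Schema
    kafka-json-schema-console-producer \
      --bootstrap-server localhost:9092 \
      --topic ciao \
      --property schema.registry.url=http://localhost:8081 \
      --property value.schema='{"type":"object","properties":{"msg":{"type":"string"}}}'
    # then enter records such as: {"msg":"ciao"}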

【Comments】
