How to use Kafka Connect to deliver Protobuf messages to Elasticsearch?
Posted: 2021-09-19 07:03:19

I am using this Confluent Protobuf example:
https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/Protobuf
to produce messages of type "Term" based on this Protobuf file:
syntax = "proto3";

package confluent.kafka.examples.protobuf;

message Section {
    string Id = 1;
    string SectionName = 2;
}

message Term {
    string TermId = 1;
    string TermName = 2;
    repeated Section Section = 3;
}
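For context, the linked example is C#; a rough Python equivalent of the producer is sketched below. The module name term_pb2, the broker address, and the key value are assumptions for illustration, not taken from the original example; it only shows the shape of a Schema-Registry-framed Protobuf produce.

# Sketch of a producer equivalent to the linked .NET example.
# Assumes confluent-kafka[protobuf] is installed and term_pb2 was generated
# via: protoc --python_out=. term.proto  (module name is hypothetical).
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import StringSerializer

import term_pb2

schema_registry = SchemaRegistryClient({"url": "http://schema-registry:8081"})

producer = SerializingProducer({
    "bootstrap.servers": "kafka:9092",            # assumed broker address
    "key.serializer": StringSerializer("utf_8"),  # keys are plain strings
    "value.serializer": ProtobufSerializer(
        term_pb2.Term, schema_registry, {"use.deprecated.format": False}
    ),
})

term = term_pb2.Term(TermId="123", TermName="Term-123")
term.Section.add(Id="0", SectionName="Section-123")

producer.produce(topic="protobuftopic", key="term", value=term)
producer.flush()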
These messages need to find their way into Elasticsearch, so I added this ES sink connector:
PUT localhost:8083/connectors/protobuftopic/config
{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "_doc",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enable": "true",
    "tasks.max": "1",
    "topics": "protobuftopic",
    "value.converter.schemas.enable": "true",
    "name": "protobuftopic",
    "connection.url": "http://elasticsearch:9200",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.ignore": "true",
    "schema.ignore": "true"
}
Now we have a running connector:
GET localhost:8083/connectors/protobuftopic/status
{
    "name": "protobuftopic",
    "connector": {
        "state": "RUNNING",
        "worker_id": "kafka-connect:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "kafka-connect:8083"
        }
    ],
    "type": "sink"
}
Then I produce some messages and print them with KSQL:
ksql> print protobuftopic from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: PROTOBUF or KAFKA_STRING
rowtime: 2021/07/08 19:05:08.436 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "0" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.914 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "1" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.923 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "2" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.932 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "3" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.942 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "4" SectionName: "Section-123" }, partition: 0
When I check the status of my ES connector, I now see the following:
"name": "protobuftopic",
"connector":
"state": "RUNNING",
"worker_id": "kafka-connect:8083"
,
"tasks": [
"id": 0,
"state": "FAILED",
"worker_id": "kafka-connect:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\t
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)\n\t
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic protobuftopic to Protobuf: \n\tat io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:131)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:535)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:498)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1\n
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"
],
"type": "sink"
What am I doing wrong here?
Answer 1:

In this particular case, the cause of the "Unknown magic byte" error was the lack of an explicit key converter. With none defined, the connector fell back to Protobuf for the key as well and failed (notice that the stack trace above fails in WorkerSinkTask.convertKey, i.e. on the key, not the value). The problem was solved by specifying
"key.converter": "org.apache.kafka.connect.storage.StringConverter
So the complete connector configuration looks like this:
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "_doc",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "true",
"tasks.max": "1",
"topics": "protobuftopic",
"value.converter.schemas.enable": "true",
"name": "protobuftopic",
"connection.url": "http://elasticsearch:9200",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
"key.ignore": "true",
"schema.ignore": "true",
"errors.tolerance": "all",
"errors.log.enable":true,
"errors.log.include.messages":true