How to use Kafka Connect to deliver Protobuf messages to Elasticsearch?

Posted: 2021-09-19 07:03:19

[Question]

I am using this Confluent Protobuf example:

https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/Protobuf

It produces messages of type "Term" based on this Protobuf file:

syntax = "proto3";
package confluent.kafka.examples.protobuf;

message Section {
  string Id = 1;
  string SectionName = 2;
}

message Term {
  string TermId = 1;
  string TermName = 2;
  repeated Section Section = 3;
}
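
For reference, the producer follows the pattern in the linked example. A minimal sketch (the string key and the localhost broker address are assumptions; Term and Section are the classes protoc generates from the file above):

using Confluent.Kafka;
using Confluent.Kafka.Examples.Protobuf;  // namespace protoc derives from the package above
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://schema-registry:8081" };
var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };

using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
using var producer = new ProducerBuilder<string, Term>(producerConfig)
    // ProtobufSerializer registers the value schema and prefixes each serialized
    // value with the Confluent wire format: magic byte 0x00 + 4-byte schema id.
    // The key, by contrast, is serialized as a plain string with no such prefix.
    .SetValueSerializer(new ProtobufSerializer<Term>(schemaRegistry))
    .Build();

var term = new Term { TermId = "123", TermName = "Term-123" };
term.Section.Add(new Section { Id = "0", SectionName = "Section-123" });

await producer.ProduceAsync("protobuftopic", new Message<string, Term> { Key = "123", Value = term });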

These messages need to find their way into Elasticsearch, so I added this Elasticsearch sink connector:

PUT localhost:8083/connectors/protobuftopic/config
{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "_doc",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "key.converter.schemas.enable": "true",
  "tasks.max": "1",
  "topics": "protobuftopic",
  "value.converter.schemas.enable": "true",
  "name": "protobuftopic",
  "connection.url": "http://elasticsearch:9200",
  "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
  "key.ignore": "true",
  "schema.ignore": "true"
}
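(A config in this form can be submitted with curl; connector-config.json is an assumed local file holding the JSON above:)

curl -X PUT -H "Content-Type: application/json" \
     --data @connector-config.json \
     http://localhost:8083/connectors/protobuftopic/config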

Now we have a running connector:

GET localhost:8083/connectors/protobuftopic/status
{
  "name": "protobuftopic",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "kafka-connect:8083"
    }
  ],
  "type": "sink"
}
Then I produce some messages and print them with KSQL:

ksql> print protobuftopic from beginning;
Key format: KAFKA_INT or KAFKA_STRING
Value format: PROTOBUF or KAFKA_STRING
rowtime: 2021/07/08 19:05:08.436 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "0" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.914 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "1" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.923 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "2" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.932 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "3" SectionName: "Section-123" }, partition: 0
rowtime: 2021/07/08 19:05:08.942 Z, key: 1952805485, value: TermId: "123" TermName: "Term-123" Section { Id: "4" SectionName: "Section-123" }, partition: 0
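
As a sanity check, the value schema registered by the producer can be inspected through the Schema Registry REST API (the subject name below assumes the default topic-name strategy; adjust the host to wherever the registry is reachable):

curl http://schema-registry:8081/subjects/protobuftopic-value/versions/latest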

But when I check the status of my Elasticsearch connector, I see the following:

{
  "name": "protobuftopic",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "kafka-connect:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
          at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
          at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic protobuftopic to Protobuf:
          at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:131)
          at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.convertKey(WorkerSinkTask.java:535)
          at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:498)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
          at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
          ... 13 more
      Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
      Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!"
    }
  ],
  "type": "sink"
}

What am I doing wrong here?


[Answer 1]

In this particular case, the cause of the "Unknown magic byte" error was that no explicit key converter was defined, so the connector tried to read the message keys as Protobuf and failed. The Confluent ProtobufConverter expects every payload to begin with the Schema Registry wire-format prefix (a zero magic byte followed by a 4-byte schema ID), but the keys were written as plain values without that prefix (note that the stack trace above fails in WorkerSinkTask.convertKey). The problem was solved by specifying

"key.converter": "org.apache.kafka.connect.storage.StringConverter"

The complete connector configuration then looks like this:


    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "_doc",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schemas.enable": "true",
    "tasks.max": "1",
    "topics": "protobuftopic",
    "value.converter.schemas.enable": "true",
    "name": "protobuftopic",
    "connection.url": "http://elasticsearch:9200",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.ignore": "true",
    "schema.ignore": "true",
    "errors.tolerance": "all",
    "errors.log.enable":true,
    "errors.log.include.messages":true
   
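
With the configuration updated, the failed task can be restarted and the result verified via the REST APIs (the index name assumes the sink's default of writing to an index named after the topic; adjust hosts as needed):

# Restart the failed sink task
curl -X POST http://localhost:8083/connectors/protobuftopic/tasks/0/restart

# Check that documents are now arriving in Elasticsearch
curl "http://elasticsearch:9200/protobuftopic/_search?pretty"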

