无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON

Posted

技术标签:

【中文标题】无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON【英文标题】:Unable to convert Kafka topic data into structured JSON with Confluent Elasticsearch sink connector 【发布时间】:2018-10-01 23:05:37 【问题描述】:

我正在使用 Kafka 构建数据管道。数据流程如下:在mongodb中捕获数据变化并发送到elasticsearch。

MongoDB

3.6 版 分片集群

卡夫卡

Confuent 平台 4.1.0 mongoDB 源连接器:debezium 0.7.5 elasticserach 水槽连接器

弹性搜索

版本 6.1.0

由于我仍在测试,因此与 Kafka 相关的系统正在单个服务器上运行。

启动动物园管理员

$ bin/zookeeper-server-start etc/kafka/zookeeper.properties

启动引导服务器

$ bin/kafka-server-start etc/kafka/server.properties

启动注册表架构

$ bin/schema-registry-start etc/schema-registry/schema-registry.properties

启动mongodb源连接器

$ bin/connect-standalone \ 
  etc/schema-registry/connect-avro-standalone.properties \ 
  etc/kafka/connect-mongo-source.properties

$ cat etc/kafka/connect-mongo-source.properties
>>> 
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee

$ cat etc/schema-registry/connect-avro-standalone.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8083

启动 elasticsearch sink 连接器

$ bin/connect-standalone \ 
  etc/schema-registry/connect-avro-standalone2.properties  \ 
  etc/kafka-connect-elasticsearch/elasticsearch.properties

$ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
>>>
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=higee.higee.higee
key.ignore=true
connection.url=''
type.name=kafka-connect

$ cat etc/schema-registry/connect-avro-standalone2.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.\ 
                      JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084

上述系统一切正常。 Kafka 连接器捕获数据更改 (CDC) 并通过接收器连接器成功将其发送到 elasticsearch。问题是我无法将字符串类型消息数据转换为结构化数据类型。例如,让我们在对 mongodb 进行一些更改后使用 topic-data。

    $ bin/kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic higee.higee.higee --from-beginning | jq

然后,我得到以下结果。

    "after": null,
      "patch": 
        "string": "\"_id\" : \"$oid\" : \"5ad97f982a0f383bb638ecac\",\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\""
      ,
      "source": 
        "version": 
          "string": "0.7.5"
        ,
        "name": "higee",
        "rs": "172.31.50.13",
        "ns": "higee",
        "sec": 1524214412,
        "ord": 1,
        "h": 
          "long": -2379508538412995600
        ,
        "initsync": 
          "boolean": false
        
      ,
      "op": 
        "string": "u"
      ,
      "ts_ms": 
        "long": 1524214412159
      
    

然后,如果我去弹性搜索,我会得到以下结果。

    
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": 
          "after": null,
          "patch": """"_id" : "$oid" : "5ad97f982a0f383bb638ecac", 
                       "name" : "higee", 
                       "salary" : 100,
                       "origin" : "South Korea"""",
          "source": 
            "version": "0.7.5",
            "name": "higee",
            "rs": "172.31.50.13",
            "ns": "higee",
            "sec": 1524214412,
            "ord": 1,
            "h": -2379508538412995600,
            "initsync": false
          ,
          "op": "u",
          "ts_ms": 1524214412159
        
      

我想要实现的目标如下

    
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": 
          "oid" : "5ad97f982a0f383bb638ecac",
          "name" : "higee", 
          "salary" : 100,
          "origin" : "South Korea"
         "
     

我一直在尝试并仍在考虑的一些选项如下。

日志存储

案例1:不知道如何解析这些字符(/u0002,/u0001)

logstash.conf

input 
  kafka 
    bootstrap_servers => ["localhost:9092"]
    topics => ["higee.higee.higee"]
    auto_offset_reset => "earliest"
    codec => json 
      charset => "UTF-8"
    
  


filter 
  json 
    source => "message"
  
 

output 
  stdout 
    codec => rubydebug
  

结果


"message" => "H\u0002�\u0001\"_id\" : \
    \"$oid\" : \"5adafc0e2a0f383bb63910a6\", \
     \"name\" : \"higee\", \
     \"salary\" : 101, \
     \"origin\" : \"South Korea\" \
     \u0002\n0.7.5\nhigee \ 
     \u0018172.31.50.13\u001Ahigee.higee2 \ 
     ��ح\v\u0002\u0002��̗���� \u0002\u0002u\u0002�����X",
"tags" => [[0] "_jsonparsefailure"]

案例2

logstash.conf

input 
  kafka 
    bootstrap_servers => ["localhost:9092"]
    topics => ["higee.higee.higee"]
    auto_offset_reset => "earliest"
    codec => avro 
      schema_uri => "./test.avsc"
    
  


filter 
  json 
    source => "message"
  


output 
  stdout 
    codec => rubydebug
  

test.avsc


    "namespace": "example",
    "type": "record",
    "name": "Higee",
    "fields": [
      "name": "_id", "type": "string",
      "name": "name", "type": "string",
      "name": "salary",  "type": "int",
      "name": "origin", "type": "string"
    ]
 

结果

An unexpected error occurred! :error=>#<NoMethodError: 
undefined method `type_sym' for nil:NilClass>, :backtrace=> 
["/home/ec2-user/logstash- 
6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:224:in `match_schemas'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:280:in `read_data'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:376:in `read_union'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:309:in `read_data'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:384:in `block in read_record'", 
"org/jruby/RubyArray.java:1734:in `each'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:382:in `read_record'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:310:in `read_data'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
1.8.2/lib/avro/io.rb:275:in `read'", "/home/ec2- 
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/ 
logstash-codec-avro-3.2.3-java/lib/logstash/codecs/ 
avro.rb:77:in `decode'", "/home/ec2-user/logstash-6.1.0/ 
vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
8.0.2/lib/ logstash/inputs/kafka.rb:254:in `block in 
thread_runner'", "/home/ec2-user/logstash- 
6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in 
thread_runner'"]

python 客户端

在进行一些数据操作后使用不同的主题名称生成主题,以便弹性搜索接收器连接器可以仅使用来自 python 操作主题的格式正确的消息

kafka library : 无法解码消息

from kafka import KafkaConsumer

consumer = KafkaConsumer(
             topics='higee.higee.higee',
             auto_offset_reset='earliest'
           )

for message in consumer:
    message.value.decode('utf-8')

>>> 'utf-8' codec can't decode byte 0xe4 in position 6: 
    invalid continuation byte

confluent_kafka 与 python 3 不兼容


知道如何在 elasticsearch 中对数据进行 jsonify 处理吗? 以下是我搜索的来源。

mongodb debezium mongodb event flattening avro converter serializing debizium events debizum tutorial

提前致谢。


一些尝试

1) 我已按如下方式更改了 connect-mongo-source.properties 文件以测试转换。

    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    transforms=unwrap     
    transforms.unwrap.type = io.debezium.connector.mongodbtransforms.UnwrapFromMongoDbEnvelope

以下是我得到的错误日志。我还不习惯 Kafka,更重要的是 debezium 平台,我无法调试这个错误。

ERROR WorkerSourceTaskid=mongodb-source-connector-0 Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
    at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
    at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
    at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
    at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
    at org.bson.BsonDocument.parse(BsonDocument.java:62)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2) 这一次,我更改了 elasticsearch.properties,并没有对 connect-mongo-source.properties 进行更改。

$ cat connect-mongo-source.properties

    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee

$ cat elasticsearch.properties

    name=elasticsearch-sink
    connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    transforms=unwrap
    transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

我得到了以下错误。

ERROR WorkerSinkTaskid=elasticsearch-sink-0 Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
    at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
    at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

3) 更改 test.avsc 并运行 logstash。我没有收到任何错误消息,但结果不是我所期望的,因为 originsalaryname 字段都是空的,即使它们被赋予了非空值。我什至能够通过控制台消费者正确读取数据。

$ cat test.avsc
>>>
    
      "type" : "record",
      "name" : "MongoEvent",
      "namespace" : "higee.higee",
      "fields" : [ 
        "name" : "_id",
        "type" : 
          "type" : "record",
          "name" : "HigeeEvent",
          "fields" : [ 
            "name" : "$oid",
            "type" : "string"
          , 
            "name" : "salary",
            "type" : "long"
          , 
            "name" : "origin",
            "type" : "string"
          , 
            "name" : "name",
            "type" : "string"
           ]
        
       ]
    

$ cat logstash3.conf
>>>
    input 
      kafka 
        bootstrap_servers => ["localhost:9092"]
        topics => ["higee.higee.higee"]
        auto_offset_reset => "earliest"
        codec => avro 
          schema_uri => "./test.avsc"
        
      
    

    output 
      stdout 
       codec => rubydebug
      
    

$ bin/logstash -f logstash3.conf
>>>
    
    "@version" => "1",
    "_id" => 
      "salary" => 0,
      "origin" => "",
      "$oid" => "",
      "name" => ""
    ,
    "@timestamp" => 2018-04-25T09:39:07.962Z
    

【问题讨论】:

我认为你需要在调试中包含来自 Mongo 本身的数据......补丁字段显然只是一个字符串,而不是一个对象......关于 Logstash,codec =&gt; json 不起作用对于 Avro 数据,在您的 Avro 编解码器示例中,favorite_numberfavorite_color 应该代表什么? $oid_id 不同 @cricket_007 1) favorite_numberfavorite_color 分别是 salaryorigin。我只是复制粘贴不同的部分。我在上面的问题中编辑了那些。 2) 来自 mongodb 的实际数据是什么意思? patch 字段以字符串格式表示原始数据。这就是我的理解,它们实际上是真实的数据。 3) $oid 是 mongodb 中每个文档的对象 id,默认情况下由 mongodb 给出。有没有办法可以更改/删除该字段? 【参考方案1】:

Python 客户端

必须使用 Avro Consumer,否则你会得到'utf-8' codec can't decode byte

Even this example will not work 因为您仍然需要模式注册表来查找模式。

The prerequisites of Confluent's Python Client says it works with Python 3.x

没有什么能阻止你使用不同的客户端,所以不知道你为什么只尝试 Python 就离开了它。

Logstash Avro 编解码器

    JSON 编解码器无法解码 Avro 数据。我认为 avro 输入编解码器之后的 json 过滤器也不起作用 您的 Avro 架构错误 - 您缺少 $oid 代替 _id “原始 Avro”(包括消息本身中的架构)和 Confluent 的编码版本(仅包含注册表中的架构 ID)之间存在差异。意思是,Logstash 没有与模式注册表集成......至少not without a plugin。

你的 AVSC 实际上应该是这样的


  "type" : "record",
  "name" : "MongoEvent",
  "namespace" : "higee.higee",
  "fields" : [ 
    "name" : "_id",
    "type" : 
      "type" : "record",
      "name" : "HigeeEvent",
      "fields" : [ 
        "name" : "$oid",
        "type" : "string"
      , 
        "name" : "salary",
        "type" : "long"
      , 
        "name" : "origin",
        "type" : "string"
      , 
        "name" : "name",
        "type" : "string"
       ]
    
   ]

但是,Avro doesn't allow for names starting with anything but a regex of [A-Za-z_],所以$oid 将是一个问题。

虽然我不推荐它(也没有实际尝试过),但将 JSON 编码的 Avro 数据从 Avro 控制台消费者获取到 Logstash 的一种可能方法是使用 Pipe 输入插件

input 
  pipe 
    codec => json
    command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning" 
  

地贝齐姆

请注意,after 值始终是一个字符串,按照惯例,它将包含文档的 JSON 表示

http://debezium.io/docs/connectors/mongodb/

我认为这也适用于 patch 值,但我真的不知道 Debezium。

如果不使用简单消息转换 (SMT),Kafka 将不会解析运行中的 JSON。阅读您链接到的文档,您可能应该add these to your Connect Source properties

transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

同样值得指出的是,字段扁平化已在路线图上 - DBZ-561

Kafka Connect Elasticsearch

如果不使用 Logstash 或其JSON Processor 之类的东西,Elasticsearch 不会解析和处理编码的 JSON 字符串对象。相反,它只将它们作为一个完整的字符串体进行索引。

如果我没记错的话,Connect 只会将 Elasticsearch 映射应用于*** Avro 字段,而不是嵌套字段。

换句话说,生成的映射遵循这种模式,

"patch": 
    "string": "...some JSON object string here..."
  ,

你真正需要这样的地方——也许是manually defining your ES index

"patch": 
   "properties": 
      "_id": 
        "properties" 
          "$oid" :   "type": "text" , 
          "name" :   "type": "text" ,
          "salary":   "type": "int"  , 
          "origin":  "type": "text" 
      ,

同样,不确定是否允许使用美元符号。

Kafka Connect MongoDB 源码

如果以上都不起作用,您可以尝试不同的连接器

Option 1 Option 2

【讨论】:

首先,感谢您的反馈。使用以下设置配置 debizium 连接器没有按我预期的方式工作。 transforms=unwrap transforms.unwrap.type = io.debezium.connector.mongodb \ transforms.UnwrapFromMongoDbEnvelope 是的,我读过confluent doc,他们支持Python 3。然而,一旦导入库,我就遇到了一个异常,因为有些代码是写的在 python2 语法中。下面是一个例子。除了 NameError,错误: 我没有手动将它们更改为兼容 python3 的代码,而是使用了 python2 并且它有效。以下是代码 请不要在 cmets 中发布代码或长错误 我明白了,它们根本不可读。那我该如何粘贴错误消息或代码块? 编辑您的帖子,或者如果您已经解决了问题,请发布您自己的答案【参考方案2】:

我能够使用 python kafka 客户端解决这个问题。以下是我的管道的新架构。

尽管 Confluent 文档说支持 python3,但我还是使用了 python 2。主要原因是有一些 python2 语法代码。例如...(不完全按照行,但语法相似)

    except NameError, err:

为了与 Python3 一起使用,我需要将上面的行转换为:

    except NameError as err:

话虽如此,以下是我的python代码。请注意,此代码仅用于原型设计,尚未用于生产。

通过 Confluent Consumer 使用消息

代码

from confluent_kafka.avro import AvroConsumer

c = AvroConsumer( 
       'bootstrap.servers': '',
       'group.id': 'groupid',
       'schema.registry.url': ''
    )

c.subscribe(['higee.higee.higee'])

x = True

while x:
    msg = c.poll(100)
    if msg:
        message = msg.value()
        print(message)
        x = False

c.close()

(在 mongodb 中更新文档后)让我们检查message 变量

u'after': None,
 u'op': u'u',
 u'patch': u'
     "_id" : "$oid" : "5adafc0e2a0f383bb63910a6",
     "name" : "higee",
     "salary" : 100,
     "origin" : "S Korea"',
 u'source': 
     u'h': 5734791721791032689L,
     u'initsync': False,
     u'name': u'higee',
     u'ns': u'higee.higee',
     u'ord': 1,
     u'rs': u'',
     u'sec': 1524362971,
     u'version': u'0.7.5',
 u'ts_ms': 1524362971148
 

处理消费的消息

代码

patch = message['patch']
patch_dict = eval(patch)
patch_dict.pop('_id')

查看patch_dict

'name': 'higee', 'origin': 'S Korea', 'salary': 100

通过 Confluent Producer 生成消息

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    value_schema_str = """
    
       "namespace": "higee.higee",
       "name": "MongoEvent",
       "type": "record",
       "fields" : [
           
               "name" : "name",
               "type" : "string"
           ,
           
              "name" : "origin",
              "type" : "string"
           ,
           
               "name" : "salary",
               "type" : "int32"
           
       ]
    
    """
    AvroProducerConf = 
        'bootstrap.servers': '',
        'schema.registry.url': ''
    

    value_schema = avro.load('./user.avsc')
    avroProducer = AvroProducer(
                       AvroProducerConf, 
                       default_value_schema=value_schema
                   )

    avroProducer.produce(topic='python', value=patch_dict)
    avroProducer.flush()

剩下的唯一一件事就是让elasticsearch sink连接器通过以下格式设置配置来响应新主题'python'。除了topics,一切都保持不变。

    name=elasticsearch-sink
    connector.class= io.confluent.connect \ 
                     elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=python
    key.ignore=true
    connection.url=''
    type.name=kafka-connect

然后运行 ​​elasticsearch sink 连接器并在 elasticsearch 上检查它。

    
        "_index": "zzzz",
        "_type": "kafka-connect",
        "_id": "zzzz+0+3",
        "_score": 1,
        "_source": 
          "name": "higee",
          "origin": "S Korea",
          "salary": 100
        
      

【讨论】:

请记住:eval() 不应在 Python 中使用。导入 json 模块会更安全。另外,我仍然相信使用 Java Kafka SMT 将是该问题的内置解决方案,因为该解决方案不是故障证明【参考方案3】:

+1 对 @cricket_007 的建议 - 使用 io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope 单消息转换。您可以在here 阅读有关 SMT 及其优势的更多信息。

【讨论】:

感谢您的链接。我也读过第 1 部分和第 2 部分。它们有助于理解整体概念。此外,我还查看了另外两个很棒的文档,以更深入地了解转换。然而,此时我的问题是我无法调试发生的一些错误。我将编辑我的 connect-mongo-source.properties 文件并粘贴错误消息。 kafka.apache.org/documentation.html#connect_transforms 和 confluent.io/thank-you/… 这是另一个尝试。根据Configuration 部分的此页面debezium.io/docs/configuration/mongodb-event-flattening,它说我们需要将该配置放在接收器连接器属性文件中。这次,我收到了不同的错误消息。我也会把那个放在我原来的问题上。 @GeeYeolNahm 是否可以提供来自导致错误的主题的消息? @JiriPechanec 我已经包含在我原来的问题中。 “然后,我得到以下结果....”下方的代码块是消息。你能检查一下吗?

以上是关于无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON的主要内容,如果未能解决你的问题,请参考以下文章

Spark Confluent 模式注册表客户端 - 无法识别的字段“schemaType”

无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON

TiCDC+Confluent同步数据到Oracle

如何将 Kafka (Java) 应用程序从 Windows 连接到 Linux 中的 Confluent

C# 无法使用 Kafka 主题的消息?

在没有安装 Confluent 平台的情况下使用 Confluent Hub