无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON
Posted
技术标签:
【中文标题】无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON【英文标题】:Unable to convert Kafka topic data into structured JSON with Confluent Elasticsearch sink connector 【发布时间】:2018-10-01 23:05:37 【问题描述】:我正在使用 Kafka 构建数据管道。数据流程如下:在mongodb中捕获数据变化并发送到elasticsearch。
MongoDB
3.6 版 分片集群卡夫卡
Confuent 平台 4.1.0 mongoDB 源连接器:debezium 0.7.5 elasticserach 水槽连接器弹性搜索
版本 6.1.0由于我仍在测试,因此与 Kafka 相关的系统正在单个服务器上运行。
启动动物园管理员
$ bin/zookeeper-server-start etc/kafka/zookeeper.properties
启动引导服务器
$ bin/kafka-server-start etc/kafka/server.properties
启动注册表架构
$ bin/schema-registry-start etc/schema-registry/schema-registry.properties
启动mongodb源连接器
$ bin/connect-standalone \
etc/schema-registry/connect-avro-standalone.properties \
etc/kafka/connect-mongo-source.properties
$ cat etc/kafka/connect-mongo-source.properties
>>>
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee
$ cat etc/schema-registry/connect-avro-standalone.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8083
启动 elasticsearch sink 连接器
$ bin/connect-standalone \
etc/schema-registry/connect-avro-standalone2.properties \
etc/kafka-connect-elasticsearch/elasticsearch.properties
$ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
>>>
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=higee.higee.higee
key.ignore=true
connection.url=''
type.name=kafka-connect
$ cat etc/schema-registry/connect-avro-standalone2.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.\
JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084
上述系统一切正常。 Kafka 连接器捕获数据更改 (CDC) 并通过接收器连接器成功将其发送到 elasticsearch。问题是我无法将字符串类型消息数据转换为结构化数据类型。例如,让我们在对 mongodb 进行一些更改后使用 topic-data。
$ bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic higee.higee.higee --from-beginning | jq
然后,我得到以下结果。
"after": null,
"patch":
"string": "\"_id\" : \"$oid\" : \"5ad97f982a0f383bb638ecac\",\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\""
,
"source":
"version":
"string": "0.7.5"
,
"name": "higee",
"rs": "172.31.50.13",
"ns": "higee",
"sec": 1524214412,
"ord": 1,
"h":
"long": -2379508538412995600
,
"initsync":
"boolean": false
,
"op":
"string": "u"
,
"ts_ms":
"long": 1524214412159
然后,如果我去弹性搜索,我会得到以下结果。
"_index": "higee.higee.higee",
"_type": "kafka-connect",
"_id": "higee.higee.higee+0+3",
"_score": 1,
"_source":
"after": null,
"patch": """"_id" : "$oid" : "5ad97f982a0f383bb638ecac",
"name" : "higee",
"salary" : 100,
"origin" : "South Korea"""",
"source":
"version": "0.7.5",
"name": "higee",
"rs": "172.31.50.13",
"ns": "higee",
"sec": 1524214412,
"ord": 1,
"h": -2379508538412995600,
"initsync": false
,
"op": "u",
"ts_ms": 1524214412159
我想要实现的目标如下
"_index": "higee.higee.higee",
"_type": "kafka-connect",
"_id": "higee.higee.higee+0+3",
"_score": 1,
"_source":
"oid" : "5ad97f982a0f383bb638ecac",
"name" : "higee",
"salary" : 100,
"origin" : "South Korea"
"
我一直在尝试并仍在考虑的一些选项如下。
日志存储
案例1:不知道如何解析这些字符(/u0002,/u0001)
logstash.conf
input
kafka
bootstrap_servers => ["localhost:9092"]
topics => ["higee.higee.higee"]
auto_offset_reset => "earliest"
codec => json
charset => "UTF-8"
filter
json
source => "message"
output
stdout
codec => rubydebug
结果
"message" => "H\u0002�\u0001\"_id\" : \
\"$oid\" : \"5adafc0e2a0f383bb63910a6\", \
\"name\" : \"higee\", \
\"salary\" : 101, \
\"origin\" : \"South Korea\" \
\u0002\n0.7.5\nhigee \
\u0018172.31.50.13\u001Ahigee.higee2 \
��ح\v\u0002\u0002��̗���� \u0002\u0002u\u0002�����X",
"tags" => [[0] "_jsonparsefailure"]
案例2
logstash.conf
input
kafka
bootstrap_servers => ["localhost:9092"]
topics => ["higee.higee.higee"]
auto_offset_reset => "earliest"
codec => avro
schema_uri => "./test.avsc"
filter
json
source => "message"
output
stdout
codec => rubydebug
test.avsc
"namespace": "example",
"type": "record",
"name": "Higee",
"fields": [
"name": "_id", "type": "string",
"name": "name", "type": "string",
"name": "salary", "type": "int",
"name": "origin", "type": "string"
]
结果
An unexpected error occurred! :error=>#<NoMethodError:
undefined method `type_sym' for nil:NilClass>, :backtrace=>
["/home/ec2-user/logstash-
6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:224:in `match_schemas'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:280:in `read_data'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:376:in `read_union'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:309:in `read_data'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:384:in `block in read_record'",
"org/jruby/RubyArray.java:1734:in `each'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:382:in `read_record'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:310:in `read_data'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-
1.8.2/lib/avro/io.rb:275:in `read'", "/home/ec2-
user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/
logstash-codec-avro-3.2.3-java/lib/logstash/codecs/
avro.rb:77:in `decode'", "/home/ec2-user/logstash-6.1.0/
vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-
8.0.2/lib/ logstash/inputs/kafka.rb:254:in `block in
thread_runner'", "/home/ec2-user/logstash-
6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-
8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in
thread_runner'"]
python 客户端
在进行一些数据操作后使用不同的主题名称生成主题,以便弹性搜索接收器连接器可以仅使用来自 python 操作主题的格式正确的消息kafka
library : 无法解码消息
from kafka import KafkaConsumer
consumer = KafkaConsumer(
topics='higee.higee.higee',
auto_offset_reset='earliest'
)
for message in consumer:
message.value.decode('utf-8')
>>> 'utf-8' codec can't decode byte 0xe4 in position 6:
invalid continuation byte
confluent_kafka
与 python 3 不兼容
知道如何在 elasticsearch 中对数据进行 jsonify 处理吗? 以下是我搜索的来源。
mongodb debezium mongodb event flattening avro converter serializing debizium events debizum tutorial提前致谢。
一些尝试
1) 我已按如下方式更改了 connect-mongo-source.properties 文件以测试转换。
$ cat etc/kafka/connect-mongo-source.properties
>>>
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee
transforms=unwrap
transforms.unwrap.type = io.debezium.connector.mongodbtransforms.UnwrapFromMongoDbEnvelope
以下是我得到的错误日志。我还不习惯 Kafka,更重要的是 debezium 平台,我无法调试这个错误。
ERROR WorkerSourceTaskid=mongodb-source-connector-0 Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2) 这一次,我更改了 elasticsearch.properties,并没有对 connect-mongo-source.properties 进行更改。
$ cat connect-mongo-source.properties
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee
$ cat elasticsearch.properties
name=elasticsearch-sink
connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=higee.higee.higee
key.ignore=true
connection.url=''
type.name=kafka-connect
transforms=unwrap
transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
我得到了以下错误。
ERROR WorkerSinkTaskid=elasticsearch-sink-0 Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
3) 更改 test.avsc 并运行 logstash。我没有收到任何错误消息,但结果不是我所期望的,因为 origin
、salary
、name
字段都是空的,即使它们被赋予了非空值。我什至能够通过控制台消费者正确读取数据。
$ cat test.avsc
>>>
"type" : "record",
"name" : "MongoEvent",
"namespace" : "higee.higee",
"fields" : [
"name" : "_id",
"type" :
"type" : "record",
"name" : "HigeeEvent",
"fields" : [
"name" : "$oid",
"type" : "string"
,
"name" : "salary",
"type" : "long"
,
"name" : "origin",
"type" : "string"
,
"name" : "name",
"type" : "string"
]
]
$ cat logstash3.conf
>>>
input
kafka
bootstrap_servers => ["localhost:9092"]
topics => ["higee.higee.higee"]
auto_offset_reset => "earliest"
codec => avro
schema_uri => "./test.avsc"
output
stdout
codec => rubydebug
$ bin/logstash -f logstash3.conf
>>>
"@version" => "1",
"_id" =>
"salary" => 0,
"origin" => "",
"$oid" => "",
"name" => ""
,
"@timestamp" => 2018-04-25T09:39:07.962Z
【问题讨论】:
我认为你需要在调试中包含来自 Mongo 本身的数据......补丁字段显然只是一个字符串,而不是一个对象......关于 Logstash,codec => json
不起作用对于 Avro 数据,在您的 Avro 编解码器示例中,favorite_number
和 favorite_color
应该代表什么? $oid
与 _id
不同
@cricket_007 1) favorite_number
和 favorite_color
分别是 salary
和 origin
。我只是复制粘贴不同的部分。我在上面的问题中编辑了那些。 2) 来自 mongodb 的实际数据是什么意思? patch 字段以字符串格式表示原始数据。这就是我的理解,它们实际上是真实的数据。 3) $oid 是 mongodb 中每个文档的对象 id,默认情况下由 mongodb 给出。有没有办法可以更改/删除该字段?
【参考方案1】:
Python 客户端
你必须使用 Avro Consumer,否则你会得到'utf-8' codec can't decode byte
Even this example will not work 因为您仍然需要模式注册表来查找模式。
The prerequisites of Confluent's Python Client says it works with Python 3.x
没有什么能阻止你使用不同的客户端,所以不知道你为什么只尝试 Python 就离开了它。
Logstash Avro 编解码器
-
JSON 编解码器无法解码 Avro 数据。我认为 avro 输入编解码器之后的 json 过滤器也不起作用
您的 Avro 架构错误 - 您缺少
$oid
代替 _id
“原始 Avro”(包括消息本身中的架构)和 Confluent 的编码版本(仅包含注册表中的架构 ID)之间存在差异。意思是,Logstash 没有与模式注册表集成......至少not without a plugin。
你的 AVSC 实际上应该是这样的
"type" : "record",
"name" : "MongoEvent",
"namespace" : "higee.higee",
"fields" : [
"name" : "_id",
"type" :
"type" : "record",
"name" : "HigeeEvent",
"fields" : [
"name" : "$oid",
"type" : "string"
,
"name" : "salary",
"type" : "long"
,
"name" : "origin",
"type" : "string"
,
"name" : "name",
"type" : "string"
]
]
但是,Avro doesn't allow for names starting with anything but a regex of [A-Za-z_]
,所以$oid
将是一个问题。
虽然我不推荐它(也没有实际尝试过),但将 JSON 编码的 Avro 数据从 Avro 控制台消费者获取到 Logstash 的一种可能方法是使用 Pipe 输入插件
input
pipe
codec => json
command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning"
地贝齐姆
请注意,
after
值始终是一个字符串,按照惯例,它将包含文档的 JSON 表示
http://debezium.io/docs/connectors/mongodb/
我认为这也适用于 patch
值,但我真的不知道 Debezium。
如果不使用简单消息转换 (SMT),Kafka 将不会解析运行中的 JSON。阅读您链接到的文档,您可能应该add these to your Connect Source properties
transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
同样值得指出的是,字段扁平化已在路线图上 - DBZ-561
Kafka Connect Elasticsearch
如果不使用 Logstash 或其JSON Processor 之类的东西,Elasticsearch 不会解析和处理编码的 JSON 字符串对象。相反,它只将它们作为一个完整的字符串体进行索引。
如果我没记错的话,Connect 只会将 Elasticsearch 映射应用于*** Avro 字段,而不是嵌套字段。
换句话说,生成的映射遵循这种模式,
"patch":
"string": "...some JSON object string here..."
,
你真正需要这样的地方——也许是manually defining your ES index
"patch":
"properties":
"_id":
"properties"
"$oid" : "type": "text" ,
"name" : "type": "text" ,
"salary": "type": "int" ,
"origin": "type": "text"
,
同样,不确定是否允许使用美元符号。
Kafka Connect MongoDB 源码
如果以上都不起作用,您可以尝试不同的连接器
Option 1 Option 2【讨论】:
首先,感谢您的反馈。使用以下设置配置 debizium 连接器没有按我预期的方式工作。 transforms=unwrap transforms.unwrap.type = io.debezium.connector.mongodb \ transforms.UnwrapFromMongoDbEnvelope 是的,我读过confluent doc,他们支持Python 3。然而,一旦导入库,我就遇到了一个异常,因为有些代码是写的在 python2 语法中。下面是一个例子。除了 NameError,错误: 我没有手动将它们更改为兼容 python3 的代码,而是使用了 python2 并且它有效。以下是代码 请不要在 cmets 中发布代码或长错误 我明白了,它们根本不可读。那我该如何粘贴错误消息或代码块? 编辑您的帖子,或者如果您已经解决了问题,请发布您自己的答案【参考方案2】:我能够使用 python kafka 客户端解决这个问题。以下是我的管道的新架构。
尽管 Confluent 文档说支持 python3,但我还是使用了 python 2。主要原因是有一些 python2 语法代码。例如...(不完全按照行,但语法相似)
except NameError, err:
为了与 Python3 一起使用,我需要将上面的行转换为:
except NameError as err:
话虽如此,以下是我的python代码。请注意,此代码仅用于原型设计,尚未用于生产。
通过 Confluent Consumer 使用消息
代码
from confluent_kafka.avro import AvroConsumer
c = AvroConsumer(
'bootstrap.servers': '',
'group.id': 'groupid',
'schema.registry.url': ''
)
c.subscribe(['higee.higee.higee'])
x = True
while x:
msg = c.poll(100)
if msg:
message = msg.value()
print(message)
x = False
c.close()
(在 mongodb 中更新文档后)让我们检查message
变量
u'after': None,
u'op': u'u',
u'patch': u'
"_id" : "$oid" : "5adafc0e2a0f383bb63910a6",
"name" : "higee",
"salary" : 100,
"origin" : "S Korea"',
u'source':
u'h': 5734791721791032689L,
u'initsync': False,
u'name': u'higee',
u'ns': u'higee.higee',
u'ord': 1,
u'rs': u'',
u'sec': 1524362971,
u'version': u'0.7.5',
u'ts_ms': 1524362971148
处理消费的消息
代码
patch = message['patch']
patch_dict = eval(patch)
patch_dict.pop('_id')
查看patch_dict
'name': 'higee', 'origin': 'S Korea', 'salary': 100
通过 Confluent Producer 生成消息
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema_str = """
"namespace": "higee.higee",
"name": "MongoEvent",
"type": "record",
"fields" : [
"name" : "name",
"type" : "string"
,
"name" : "origin",
"type" : "string"
,
"name" : "salary",
"type" : "int32"
]
"""
AvroProducerConf =
'bootstrap.servers': '',
'schema.registry.url': ''
value_schema = avro.load('./user.avsc')
avroProducer = AvroProducer(
AvroProducerConf,
default_value_schema=value_schema
)
avroProducer.produce(topic='python', value=patch_dict)
avroProducer.flush()
剩下的唯一一件事就是让elasticsearch sink连接器通过以下格式设置配置来响应新主题'python'。除了topics
,一切都保持不变。
name=elasticsearch-sink
connector.class= io.confluent.connect \
elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=python
key.ignore=true
connection.url=''
type.name=kafka-connect
然后运行 elasticsearch sink 连接器并在 elasticsearch 上检查它。
"_index": "zzzz",
"_type": "kafka-connect",
"_id": "zzzz+0+3",
"_score": 1,
"_source":
"name": "higee",
"origin": "S Korea",
"salary": 100
【讨论】:
请记住:eval()
不应在 Python 中使用。导入 json 模块会更安全。另外,我仍然相信使用 Java Kafka SMT 将是该问题的内置解决方案,因为该解决方案不是故障证明【参考方案3】:
+1 对 @cricket_007 的建议 - 使用 io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
单消息转换。您可以在here 阅读有关 SMT 及其优势的更多信息。
【讨论】:
感谢您的链接。我也读过第 1 部分和第 2 部分。它们有助于理解整体概念。此外,我还查看了另外两个很棒的文档,以更深入地了解转换。然而,此时我的问题是我无法调试发生的一些错误。我将编辑我的 connect-mongo-source.properties 文件并粘贴错误消息。 kafka.apache.org/documentation.html#connect_transforms 和 confluent.io/thank-you/… 这是另一个尝试。根据Configuration
部分的此页面debezium.io/docs/configuration/mongodb-event-flattening,它说我们需要将该配置放在接收器连接器属性文件中。这次,我收到了不同的错误消息。我也会把那个放在我原来的问题上。
@GeeYeolNahm 是否可以提供来自导致错误的主题的消息?
@JiriPechanec 我已经包含在我原来的问题中。 “然后,我得到以下结果....”下方的代码块是消息。你能检查一下吗?以上是关于无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON的主要内容,如果未能解决你的问题,请参考以下文章
Spark Confluent 模式注册表客户端 - 无法识别的字段“schemaType”
无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON