Sinking topic data from a Java producer to MongoDB

Posted 2020-12-05 07:32:17

I produce data with Java and sink it into a Kafka topic; from there I want the data to be sunk into MongoDB. When I send the data as JSON from Java, it is not stored in MongoDB because of the following error:

[2020-08-15 18:42:19,164] ERROR WorkerSinkTask{id=Kafka_ops-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JSON reader was expecting a value but found 'siqdj'. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.bson.json.JsonParseException: JSON reader was expecting a value but found 'siqdj'.
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:270)
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at com.mongodb.kafka.connect.sink.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
        at com.mongodb.kafka.connect.sink.converter.SinkConverter.convert(SinkConverter.java:44)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$buildWriteModel$6(MongoSinkTask.java:229)
        at java.util.ArrayList.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.buildWriteModel(MongoSinkTask.java:228)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:169)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)
        at java.util.ArrayList.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)
        at java.util.HashMap.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
[2020-08-15 18:42:19,166] ERROR WorkerSinkTask{id=Kafka_ops-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.bson.json.JsonParseException: JSON reader was expecting a value but found 'siqdj'.
        at org.bson.json.JsonReader.readBsonType(JsonReader.java:270)
        at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:680)
        at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:722)
        at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:450)
        at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:81)
        at org.bson.BsonDocument.parse(BsonDocument.java:62)
        at com.mongodb.kafka.connect.sink.converter.JsonRawStringRecordConverter.convert(JsonRawStringRecordConverter.java:34)
        at com.mongodb.kafka.connect.sink.converter.SinkConverter.convert(SinkConverter.java:44)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$buildWriteModel$6(MongoSinkTask.java:229)
        at java.util.ArrayList.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.buildWriteModel(MongoSinkTask.java:228)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:169)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)
        at java.util.ArrayList.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)
        at java.util.HashMap.forEach(Unknown Source)
        at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        ... 10 more

Here is the data I sent from my Java program, as it appears in the Kafka consumer:

"name":"This is a test","dept":"siqdj","studentId":1
"name":"This is another","dept":"siqdj","studentId":2

Each line represents one record.
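For reference, a producer along the lines described would publish each document as a single-line JSON string. This is a minimal sketch with an assumed class name; the asker's actual producer code is not shown in the question:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class StudentProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each record value is one complete JSON document on a single line.
            producer.send(new ProducerRecord<>("TestTopic4",
                    "{\"name\":\"This is a test\",\"dept\":\"siqdj\",\"studentId\":1}"));
            producer.send(new ProducerRecord<>("TestTopic4",
                    "{\"name\":\"This is another\",\"dept\":\"siqdj\",\"studentId\":2}"));
        }
    }
}

With the StringSerializer the value reaches the topic as a raw string, which Connect then hands to the configured value.converter.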

Here are my configuration files:

connect-standalone.properties

bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
plugin.path=/plugins
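(For reference, in standalone mode both of these files are typically passed to the Connect worker together, e.g. bin/connect-standalone.sh connect-standalone.properties MongoSinkConnector.properties; the paths are assumptions, adjust them to your installation.)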

MongoSinkConnector.properties

name=Kafka_ops
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
topics=TestTopic4
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=student_kafka
collection=students
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
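Note that the converter settings here repeat the worker-level settings from connect-standalone.properties; when a connector supplies its own key.converter/value.converter, the connector-level values take precedence for that connector.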

Comments:

Please show the output from kafka-console-consumer --from-beginning

Answer 1:

Tariq - I'm not an expert on this topic, but I have tried something similar using a JDBC sink adapter with an Oracle database.

The format of the data you are sending to the topic doesn't look right to me, which is probably why you are getting the error. Since you are using the JsonConverter, each record in the topic should be in the format below so that the sink adapter can parse it and write it to the data store. At the moment your data carries no schema alongside the payload, hence the error.

Please send the content below to the topic and see whether it sinks into MongoDB:


    "schema": 
        "type": "struct",
        "fields": [
            
                "type": "string",
                "optional": false,
                "field": "name"
            ,
            
                "type": "string",
                "optional": true,
                "field": "dept"
            ,
            
                "type": "int64",
                "optional": true,
                "field": "studentId"
            
        ],
        "optional": false,
        "name": "YOUR_TABLE_NAME"
    ,
    "payload": 
        "name": "This is a test",
        "dept": "siqdj",
        "studentId": 1
    

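One thing worth checking before trying this: for the JsonConverter to interpret a schema/payload envelope like the one above, value.converter.schemas.enable must be set to true (the properties shown earlier have it set to false, in which case the converter expects a bare JSON document with no envelope). The envelope also has to be produced as a single record, not split across multiple lines.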