kafka s3融合连接器 - 将json上传为字符串

Posted

技术标签:

【中文标题】kafka s3融合连接器 - 将json上传为字符串【英文标题】:kafka s3 confluent connector - upload json as string 【发布时间】:2021-09-14 07:00:04 【问题描述】:

我正在使用来自 confluent 的 kafka s3 sink 连接器将 json 发送到 s3。

来自 kafka 的每个条目都是一个有效的 json,但是当我在 s3 json 文件中获取数据时,我的字符串看起来像这样:

"\"one\":\"test\"\n\n\"two\":\"none\""

这是我当前的配置:

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1000
rotate.schedule.interval.ms=60000
topics=MYTOPIC
flush.size=1
s3.bucket.name=BUCKETNAME
s3.region=us-east-1
aws.access.key.id=MYSUPERSECRETID
aws.secret.access.key=MYSUPERSECRETSECRET
s3.part.size=5242880
#value.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
format.class=io.confluent.connect.s3.format.json.JsonFormat
#key.converter=org.apache.kafka.connect.storage.StringConverter
#keys.format.class=io.confluent.connect.s3.format.json.JsonFormat
storage.class=io.confluent.connect.s3.storage.S3Storage
schema.compatibility=NONE
timezone=UTC
locale=en-US
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
partition.duration.ms=86400000
path.format=YYYY/MM/dd/

我目前的问题是我不能将这些 json 用于其他类似的处理。

配置中的注释部分是options,试图改变json中的结果字符串无济于事。

【问题讨论】:

【参考方案1】:

format.class 描述了连接器如何将数据序列化到 S3 中。看起来您的 JSON 正在被序列化为 JSON 对象。

如果您想要主题中数据的精确副本,请使用

format.class=io.confluent.connect.s3.format.bytearray.ByteArrayFormat
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

【讨论】:

很好,我现在得到了一个有效的 json,你知道它是否可以生成 json 文件来代替 bin 文件吗? @night-gold 该扩展仅由format.class 控制。如果你想要.json 文件,你会使用JsonFormat

以上是关于kafka s3融合连接器 - 将json上传为字符串的主要内容,如果未能解决你的问题,请参考以下文章

gsutil 无法验证 Kafka Connect S3 上传的文件的哈希值

如何将 Apache Kafka 与 Amazon S3 连接?

Kafka Connect:读取 JSON 序列化的 Kafka 消息,转换为 Parquet 格式并保存在 S3 中

Kafka HDFS 连接器 - 没有完全融合

kafka s3 sink连接器在获取NULL数据时崩溃

在 Confluent S3 Kafka 连接器中压缩 Avro 数据