没有模式注册表的 Kafka-connect
Posted
技术标签:
【中文标题】没有模式注册表的 Kafka-connect【英文标题】:Kafka-connect without schema registry 【发布时间】:2021-04-13 08:23:33 【问题描述】:我有一个 kafka 主题,我想为它提供 AVRO 数据(目前为 JSON)。我知道“正确”的做法是使用模式注册表,但出于测试目的,我想让它在没有它的情况下工作。
所以我将 AVRO 数据发送为Array[Byte]
,而不是常规的 Json 对象:
val writer = new SpecificDatumWriter[GenericData.Record]("mySchema.avsc")
val out = new ByteArrayOutputStream
val encoder = EncoderFactory.get.binaryEncoder(out, null)
writer.write(myAvroData, encoder)
encoder.flush
out.close
out.toByteArray
架构在每个数据中展开;我怎样才能使它与kafka-connect一起工作? kafka-connect配置目前表现出以下属性(数据以json.gz文件写入s3),我想写Parquet文件:
"name": "someName",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "120",
"topics": "user_sync",
"s3.region": "someRegion",
"s3.bucket.name": "someBucket",
"s3.part.size": "5242880",
"s3.compression.type": "gzip",
"filename.offset.zero.pad.width": "20",
"flush.size": "5000",
"rotate.interval.ms": "600000",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "YYYY/MM/dd/HH",
"timezone" : "UTC",
"locale": "en",
"partition.duration.ms": "600000",
"timestamp.extractor": "RecordField",
"timestamp.field" : "ts",
"schema.compatibility": "NONE"
我想我需要更改"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat
?但够了吗?
非常感谢!
【问题讨论】:
【参考方案1】:JsonConverter
将无法使用 Avro 编码数据,因为二进制格式包含注册表中的模式 ID,需要在转换器确定数据的外观之前提取该模式 ID
您需要使用registryless-avro-converter,它将创建一个结构化对象,然后应该能够转换为 Parquet 记录。
【讨论】:
感谢您的回答!这是唯一的方法吗?架构已嵌入数据中。 是的,这个转换器假定“写入器模式”在数据中,但 Avro 仍然需要“读取器模式”来反序列化 非常感谢!如果我使用你的库,我唯一要做的就是将“value.converter”从“org.apache.kafka.connect.json.JsonConverter”切换到“me.frmr.kafka.connect.RegistrylessAvroConverter”?如您所见,我没有 key.converter 字段。非常感谢! 更改它并添加指向连接工作程序文件路径上的架构文件的属性。如果您没有为连接器指定密钥转换器,则密钥转换器默认为工作器属性中的那个以上是关于没有模式注册表的 Kafka-connect的主要内容,如果未能解决你的问题,请参考以下文章
使用本地 kafka-connect 集群连接远程数据库的连接超时
无法在启用 SSL 的 Kafka 集群中注册 Debezium (Kafka-Connect) 连接器
使用独立模式 Kafka-connect 将 Postgresql 的数据捕获更改为 kafka 主题
有没有办法将融合模式注册表与 kafka-node 模块一起使用?