没有模式注册表的 Kafka-connect

Posted

技术标签:

【中文标题】没有模式注册表的 Kafka-connect【英文标题】:Kafka-connect without schema registry 【发布时间】:2021-04-13 08:23:33 【问题描述】:

我有一个 kafka 主题,我想为它提供 AVRO 数据(目前为 JSON)。我知道“正确”的做法是使用模式注册表,但出于测试目的,我想让它在没有它的情况下工作。

所以我将 AVRO 数据发送为Array[Byte],而不是常规的 Json 对象:

    val writer = new SpecificDatumWriter[GenericData.Record]("mySchema.avsc")
    val out = new ByteArrayOutputStream
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    writer.write(myAvroData, encoder)
    encoder.flush
    out.close
    out.toByteArray

架构在每个数据中展开;我怎样才能使它与kafka-connect一起工作? kafka-connect配置目前表现出以下属性(数据以json.gz文件写入s3),我想写Parquet文件:



  "name": "someName",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "tasks.max": "120",
  "topics": "user_sync",
  "s3.region": "someRegion",
  "s3.bucket.name": "someBucket",
  "s3.part.size": "5242880",
  "s3.compression.type": "gzip",
  "filename.offset.zero.pad.width": "20",
  "flush.size": "5000",
  "rotate.interval.ms": "600000",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "YYYY/MM/dd/HH",
  "timezone" : "UTC",
  "locale": "en",
  "partition.duration.ms": "600000",
  "timestamp.extractor": "RecordField",
  "timestamp.field" : "ts",
  "schema.compatibility": "NONE"

我想我需要更改"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat?但够了吗?

非常感谢!

【问题讨论】:

【参考方案1】:

JsonConverter 将无法使用 Avro 编码数据,因为二进制格式包含注册表中的模式 ID,需要在转换器确定数据的外观之前提取该模式 ID

您需要使用registryless-avro-converter,它将创建一个结构化对象,然后应该能够转换为 Parquet 记录。

【讨论】:

感谢您的回答!这是唯一的方法吗?架构已嵌入数据中。 是的,这个转换器假定“写入器模式”在数据中,但 Avro 仍然需要“读取器模式”来反序列化 非常感谢!如果我使用你的库,我唯一要做的就是将“value.converter”从“org.apache.kafka.connect.json.JsonConverter”切换到“me.frmr.kafka.connect.RegistrylessAvroConverter”?如您所见,我没有 key.converter 字段。非常感谢! 更改它并添加指向连接工作程序文件路径上的架构文件的属性。如果您没有为连接器指定密钥转换器,则密钥转换器默认为工作器属性中的那个

以上是关于没有模式注册表的 Kafka-connect的主要内容,如果未能解决你的问题,请参考以下文章

使用本地 kafka-connect 集群连接远程数据库的连接超时

无法在启用 SSL 的 Kafka 集群中注册 Debezium (Kafka-Connect) 连接器

使用独立模式 Kafka-connect 将 Postgresql 的数据捕获更改为 kafka 主题

有没有办法将融合模式注册表与 kafka-node 模块一起使用?

Kafka-Connect:在分布式模式下创建新连接器就是创建新组

Kafka 连接到 Bigquery 连接器而没有模式注册表给出错误