我在哪里编写 kafka 连接接收器自定义分区器的代码?

Posted

技术标签:

【中文标题】我在哪里编写 kafka 连接接收器自定义分区器的代码?【英文标题】:Where do I write the code for kafka connect sink custom partitioner? 【发布时间】:2021-01-27 16:38:58 【问题描述】:

这可能是一个非常简单的问题,所以我会提前道歉。 我正在为一个 kafka 主题添加一个 s3 sink 连接器,conf 文件在这里:


  "schemas.enable": "false",
  "name": "my-s3-sink",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.storage.StringConverter",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "my-topic-name"
  ],
  "errors.deadletterqueue.context.headers.enable": "true",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "flush.size": "2000",
  "rotate.schedule.interval.ms": "600000",
  "s3.bucket.name": "my-bucket-name",
  "s3.object.tagging": "true",
  "s3.region": "region",
  "s3.part.size": "5242880",
  "aws.access.key.id": "****",
  "aws.secret.access.key": "****",
  "s3.ssea.name": "AES256",
  "s3.compression.type": "gzip",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "topics.dir": "",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "partition.duration.ms": "3600000",
  "path.format": "YYYY/MM/dd/HH",
  "locale": "en-GB",
  "timezone": "UTC"

这会以topic_name/YYYY/MM/DD/HH/message 格式输出消息,我希望密钥为YYYY/MM/DD/HH/message。经过一些研究,我发现为了从键中删除主题名称,我必须编写一个自定义分区器来扩展和覆盖 TimeBasedPartitioner 的部分内容。 (这里是一个例子https://github.com/confluentinc/kafka-connect-storage-cloud/issues/321)

我的问题是我现在不知道在哪里为该分区程序编写实际代码,它应该放在哪里?基于时间的分区器似乎链接到 confluent 拥有的某种注册表,但是自定义分区器会去哪里以及如何在连接器的 conf 文件中引用该代码?

【问题讨论】:

【参考方案1】:

您在单独的项目中编写代码,将其编译为 JAR,然后将其放在每个连接工作程序的类路径中。

那你可以参考partitioner.class

【讨论】:

以上是关于我在哪里编写 kafka 连接接收器自定义分区器的代码?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect S3 sink 连接器与自定义 Partitioner 奇怪行为

具有自定义消费者组名称的 Kafka Sink 连接器

Apache Kafka 的自定义连接器

使用自定义目标接收器将日志导出到 BigQuery(表分区)

Java-API+Kafka实现自定义分区

SMT 通过连接器配置创建 kafka 连接器字符串分区键