在 Kafka 连接器中设置分区策略

Posted

技术标签:

【中文标题】在 Kafka 连接器中设置分区策略【英文标题】:Setting Partition Strategy in a Kafka Connector 【发布时间】:2017-12-02 06:35:35 【问题描述】:

我正在使用自定义 Kafka 连接器(用 Java 编写,使用 Kafka Connect 的 Java API)从外部源提取数据并存储在主题中。我需要设置自定义分区策略。我知道通过设置partitioner.class property 可以在 Kafka Producer 中设置自定义partitioner。但是,此属性似乎对 Kafka 连接器没有任何作用。如何配置 Kafka Connect(我正在使用 connect-standalone 脚本运行我的连接器)以使用我编写的自定义 Partitioner

【问题讨论】:

【参考方案1】:

源连接器可以通过SourceRecordpartition 字段控制每个源记录写入的分区。如果这是您自己的连接器,这是最直接的。

但是,如果您想更改源连接器对每条记录进行分区的方式,您可以使用覆盖源记录的 partition 字段的单消息转换 (SMT)。您可能必须通过实现org.apache.kafka.connect.transforms.Transformation 并使用您自己的分区逻辑来编写自定义 SMT,但这实际上比编写自定义 Kafka 分区器要容易一些。

例如,这里有一个名义上的自定义转换,它展示了如何使用配置属性以及如何使用所需的分区号创建一个新的SourceRecord 实例。该示例不完整,因为它实际上没有任何真正的分区逻辑,但它应该是一个很好的起点。

包 io.acme.example; 导入 org.apache.kafka.common.config.AbstractConfig; 导入 org.apache.kafka.common.config.ConfigDef; 导入 org.apache.kafka.common.config.ConfigDef.Importance; 导入 org.apache.kafka.common.config.ConfigDef.Type; 导入 org.apache.kafka.connect.source.SourceRecord; 导入 org.apache.kafka.connect.transforms.Transformation; 导入 java.util.Map; 公共类 CustomPartitioner 实现转换 私有静态最终字符串 MAX_PARTITIONS_CONFIG = "max.partitions"; private static final String MAX_PARTITIONS_DOC = "最大分区数"; 私有静态最终 int MAX_PARTITIONS_DEFAULT = 1; /** * 配置的定义。我们在这里只定义一个配置属性, * 但您可以将多个“定义”方法链接在一起。复杂的配置可能需要 * 将所有与配置相关的东西拉到一个扩展 @link AbstractConfig 的单独类中 * 并添加辅助方法(例如,“getMaxPartitions()”),您将使用此类来解析 * @link #configure(Map) 中的参数而不是 @link AbstractConfig。 */ private static final ConfigDef CONFIG_DEF = new ConfigDef().define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT, Importance.HIGH, MAX_PARTITIONS_DOC); 私有 int 最大分区; @覆盖 公共无效配置(映射配置) // 将任何配置参数存储为字段 ... AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs); maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG); @覆盖 公共 SourceRecord 申请(SourceRecord 记录) // 在此处计算所需的分区 int actualPartition = record.kafkaPartition(); intdesiredPartition = ... // 然后使用除新分区之外的所有现有字段创建新记录... 返回 record.newRecord(record.topic(), desiredPartition, record.keySchema(),record.key(), 记录值模式(),记录值(), 记录.timestamp()); @覆盖 公共配置定义配置() 返回 CONFIG_DEF; @覆盖 公共无效关闭() // 没做什么

ConfigDefAbstractConfig 功能非常有用,可以做更多有趣的事情,包括使用自定义验证器和推荐器,以及具有依赖于其他属性的配置属性。如果您想了解更多信息,请查看一些现有的 Kafka Connect 连接器,它们也使用相同的框架。

最后一件事。在运行 Kafka Connect 独立或分布式工作程序时,请确保将 CLASSPATH 环境变量设置为指向包含您的自定义 SMT 以及您的 SMT 所依赖的任何 JAR 文件的 JAR 文件除了那些由 Kafka 提供的文件。 connect-standalone.shconnect-distributed.sh 命令会自动将 Kafka JAR 添加到类路径中。

【讨论】:

以上是关于在 Kafka 连接器中设置分区策略的主要内容,如果未能解决你的问题,请参考以下文章

如何在不使用 Docker 或 Windows Server 2016 上的 Confluent 平台的情况下在 Kafka 中设置 Debezium SQL Server 连接器?

如何解决配置中设置的 Kafka JDBC Sink 连接器中与 TopicRecordName 与 TopicNameStrategy 的冲突

SMT 通过连接器配置创建 kafka 连接器字符串分区键

我在哪里编写 kafka 连接接收器自定义分区器的代码?

Kafka连接s3 sink多个分区

如何在 groovy sql 中设置连接超时?