Kafka Connect S3 sink connector with custom Partitioner strange behavior

Posted: 2020-10-29

I plan to partition my data in S3 with a custom field-and-time-based partitioner, producing paths like /part_&lt;field&gt;=&lt;value&gt;/part_date=YYYY-MM-dd/part_hour=HH/....parquet.

My partitioner itself works correctly: everything in my S3 bucket is laid out as expected.
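For context, a connector configuration along these lines would wire the partitioner in (a sketch only; the connector name, topic, bucket, and field name are placeholders, and `partition.field` is the key this custom partitioner reads):

```properties
# Hypothetical S3 sink configuration for the custom partitioner (placeholder names)
name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
topics=my-topic
s3.bucket.name=my-bucket
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
partitioner.class=test.FieldAndTimeBasedPartitioner
partition.field=my_field
partition.duration.ms=3600000
path.format='part_date'=YYYY-MM-dd/'part_hour'=HH
locale=en-US
timezone=UTC
timestamp.extractor=Record
```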

The problem is linked to the performance of the sink. My input topic carries about 400 kB/s per broker (~1.2 MB/s in total), yet the sink lags behind the traffic peaks and commits only a small number of records at a time.

If I use the classic TimeBasedPartitioner, throughput keeps up (the original post included a throughput screenshot here).

So the problem seems to lie in my custom partitioner. Here is the code:

package test;

import java.util.Locale;
import java.util.Map;

import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.storage.partitioner.TimestampExtractor;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class FieldAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

    private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
    private static final String FIELD_SUFFIX = "part_";
    private static final String FIELD_SEP = "=";
    private long partitionDurationMs;
    private DateTimeFormatter formatter;
    private TimestampExtractor timestampExtractor;
    private PartitionFieldExtractor partitionFieldExtractor;
    // Name of the record field used for the first partition level.
    private String partitionFieldName;

    @Override
    protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {
        this.delim = (String) config.get("directory.delim");
        this.partitionDurationMs = partitionDurationMs;

        try {
            this.formatter = getDateTimeFormatter(pathFormat, timeZone).withLocale(locale);
            this.timestampExtractor = newTimestampExtractor((String) config.get("timestamp.extractor"));
            this.timestampExtractor.configure(config);
            // Keep the configured field name; config is not available later in encodePartition.
            this.partitionFieldName = (String) config.get("partition.field");
            this.partitionFieldExtractor = new PartitionFieldExtractor(partitionFieldName);
        } catch (IllegalArgumentException e) {
            ConfigException ce = new ConfigException("path.format", pathFormat, e.getMessage());
            ce.initCause(e);
            throw ce;
        }
    }

    private static DateTimeFormatter getDateTimeFormatter(String str, DateTimeZone timeZone) {
        return DateTimeFormat.forPattern(str).withZone(timeZone);
    }

    public static long getPartition(long timeGranularityMs, long timestamp, DateTimeZone timeZone) {
        long adjustedTimestamp = timeZone.convertUTCToLocal(timestamp);
        long partitionedTime = adjustedTimestamp / timeGranularityMs * timeGranularityMs;
        return timeZone.convertLocalToUTC(partitionedTime, false);
    }

    @Override
    public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
        final Long timestamp = timestampExtractor.extract(sinkRecord, nowInMillis);
        final String partitionFieldValue = partitionFieldExtractor.extract(sinkRecord);
        return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
    }

    @Override
    public String encodePartition(SinkRecord sinkRecord) {
        final Long timestamp = timestampExtractor.extract(sinkRecord);
        final String partitionFieldValue = partitionFieldExtractor.extract(sinkRecord);
        return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
    }

    private String encodedPartitionForFieldAndTime(SinkRecord sinkRecord, Long timestamp, String partitionFieldValue) {
        if (timestamp == null) {
            String msg = "Unable to determine timestamp using timestamp.extractor " + timestampExtractor.getClass().getName() + " for record: " + sinkRecord;
            log.error(msg);
            throw new ConnectException(msg);
        } else if (partitionFieldValue == null) {
            String msg = "Unable to determine partition field using partition.field '" + partitionFieldName + "' for record: " + sinkRecord;
            log.error(msg);
            throw new ConnectException(msg);
        } else {
            DateTime recordTime = new DateTime(getPartition(partitionDurationMs, timestamp, formatter.getZone()));
            return FIELD_SUFFIX
                    + partitionFieldName
                    + FIELD_SEP
                    + partitionFieldValue
                    + delim
                    + recordTime.toString(formatter);
        }
    }

    static class PartitionFieldExtractor {

        private final String fieldName;

        PartitionFieldExtractor(String fieldName) {
            this.fieldName = fieldName;
        }

        String extract(ConnectRecord<?> record) {
            Object value = record.value();
            if (value instanceof Struct) {
                Struct struct = (Struct) value;
                return (String) struct.get(fieldName);
            } else {
                log.error("Record value is not a Struct; cannot extract partition field.");
                throw new PartitionException("Error encoding partition.");
            }
        }
    }

    public long getPartitionDurationMs() {
        return partitionDurationMs;
    }

    public TimestampExtractor getTimestampExtractor() {
        return timestampExtractor;
    }
}

It is more or less a merge of the stock FieldPartitioner and TimeBasedPartitioner.
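The time bucketing in getPartition is plain integer arithmetic: divide by the granularity, then multiply back to floor the timestamp to the start of its window. A self-contained sketch of the same idea (UTC only, without the Joda-Time zone conversion):

```java
public class TimeBucketDemo {
    // Floor a timestamp to the start of its partition window (UTC, no zone shift).
    static long getPartition(long timeGranularityMs, long timestampMs) {
        return timestampMs / timeGranularityMs * timeGranularityMs;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;               // 1-hour granularity
        long ts = 7_260_000L;                 // 2h 1min after the epoch
        long bucket = getPartition(hour, ts); // floors to the 2-hour mark
        System.out.println(bucket);           // 7200000
    }
}
```

Every record whose timestamp falls inside the same window therefore encodes to the same path, which is what lets the sink group records into one output file per window.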

Any clue why the sink performs poorly while consuming messages this way? Could deserializing each record and extracting the field from the message cause this? Since I have around 80 distinct field values, could it be a memory issue, because the sink would maintain 80 times as many buffers on the heap?
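On the memory question: the S3 sink keeps one in-progress multipart buffer per open encoded partition, sized by s3.part.size. A rough back-of-envelope, assuming the default of 25 MB and all 80 field values open in the same time window:

```java
public class BufferEstimate {
    // Rough heap consumed by in-progress S3 multipart upload buffers.
    static long bufferHeapBytes(long partSizeBytes, int openPartitions) {
        return partSizeBytes * openPartitions;
    }

    public static void main(String[] args) {
        long partSize = 25L * 1024 * 1024; // assumed default s3.part.size (26214400 bytes)
        int openPartitions = 80;           // one open file per distinct field value
        long heap = bufferHeapBytes(partSize, openPartitions);
        System.out.println(heap / (1024 * 1024) + " MB"); // 2000 MB
    }
}
```

So with 80 concurrently open partitions the buffers alone could approach 2 GB of heap, which is worth checking against the Connect worker's -Xmx before blaming the partitioner's CPU cost.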

Thanks for your help.

Comments:

Editing the message to start with "hello" didn't work, I don't know why... so: hello! FYI, there is already an open PR with test cases for a field-and-time-based partitioner.

Answer 1:

FYI, the problem was the partitioner itself. My partitioner needs to decode the whole message to get the field value, and since I have a lot of messages, processing all of these events takes time.
