Kafka Connect S3 sink connector with custom Partitioner strange behavior
Posted: 2020-10-29 22:26:01

Question: I plan to partition my data in S3 using a custom field together with a time-based partitioner, like this:
/part_
My partitioner works correctly: everything in my S3 bucket looks as expected.
The problem is the performance of the sink. My input topics carry 400 kB/s per broker, about 1.2 MB/s in total, yet the sink processes data in spikes and commits only a small number of records at a time.
If I use the classic TimeBasedPartitioner, the problem does not occur (throughput screenshot omitted).
So the problem seems to come from my custom partitioner. Here is the code:
package test;

import java.util.Locale;
import java.util.Map;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.storage.partitioner.TimestampExtractor;

public final class FieldAndTimeBasedPartitioner<T> extends TimeBasedPartitioner<T> {

    private static final Logger log = LoggerFactory.getLogger(FieldAndTimeBasedPartitioner.class);
    private static final String FIELD_SUFFIX = "part_";
    private static final String FIELD_SEP = "=";

    private long partitionDurationMs;
    private DateTimeFormatter formatter;
    private TimestampExtractor timestampExtractor;
    private PartitionFieldExtractor partitionFieldExtractor;
    // Cached here so encodedPartitionForFieldAndTime() does not need the config map.
    private String partitionFieldName;

    @Override
    protected void init(long partitionDurationMs, String pathFormat, Locale locale, DateTimeZone timeZone, Map<String, Object> config) {
        this.delim = (String) config.get("directory.delim");
        this.partitionDurationMs = partitionDurationMs;
        this.partitionFieldName = (String) config.get("partition.field");
        try {
            this.formatter = getDateTimeFormatter(pathFormat, timeZone).withLocale(locale);
            this.timestampExtractor = newTimestampExtractor((String) config.get("timestamp.extractor"));
            this.timestampExtractor.configure(config);
            this.partitionFieldExtractor = new PartitionFieldExtractor(partitionFieldName);
        } catch (IllegalArgumentException e) {
            ConfigException ce = new ConfigException("path.format", pathFormat, e.getMessage());
            ce.initCause(e);
            throw ce;
        }
    }

    private static DateTimeFormatter getDateTimeFormatter(String str, DateTimeZone timeZone) {
        return DateTimeFormat.forPattern(str).withZone(timeZone);
    }

    public static long getPartition(long timeGranularityMs, long timestamp, DateTimeZone timeZone) {
        long adjustedTimestamp = timeZone.convertUTCToLocal(timestamp);
        long partitionedTime = adjustedTimestamp / timeGranularityMs * timeGranularityMs;
        return timeZone.convertLocalToUTC(partitionedTime, false);
    }

    @Override
    public String encodePartition(SinkRecord sinkRecord, long nowInMillis) {
        final Long timestamp = timestampExtractor.extract(sinkRecord, nowInMillis);
        final String partitionFieldValue = partitionFieldExtractor.extract(sinkRecord);
        return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
    }

    @Override
    public String encodePartition(SinkRecord sinkRecord) {
        final Long timestamp = timestampExtractor.extract(sinkRecord);
        final String partitionFieldValue = partitionFieldExtractor.extract(sinkRecord);
        return encodedPartitionForFieldAndTime(sinkRecord, timestamp, partitionFieldValue);
    }

    private String encodedPartitionForFieldAndTime(SinkRecord sinkRecord, Long timestamp, String partitionFieldValue) {
        if (timestamp == null) {
            String msg = "Unable to determine timestamp using timestamp.extractor " + timestampExtractor.getClass().getName() + " for record: " + sinkRecord;
            log.error(msg);
            throw new ConnectException(msg);
        } else if (partitionFieldValue == null) {
            String msg = "Unable to determine partition field using partition.field '" + partitionFieldName + "' for record: " + sinkRecord;
            log.error(msg);
            throw new ConnectException(msg);
        } else {
            DateTime recordTime = new DateTime(getPartition(partitionDurationMs, timestamp, formatter.getZone()));
            return FIELD_SUFFIX
                    + partitionFieldName
                    + FIELD_SEP
                    + partitionFieldValue
                    + delim
                    + recordTime.toString(formatter);
        }
    }

    static class PartitionFieldExtractor {

        private final String fieldName;

        PartitionFieldExtractor(String fieldName) {
            this.fieldName = fieldName;
        }

        String extract(ConnectRecord<?> record) {
            Object value = record.value();
            if (value instanceof Struct) {
                Struct struct = (Struct) value;
                return (String) struct.get(fieldName);
            } else {
                log.error("Value is not a Struct!");
                throw new PartitionException("Error encoding partition.");
            }
        }
    }

    public long getPartitionDurationMs() {
        return partitionDurationMs;
    }

    public TimestampExtractor getTimestampExtractor() {
        return timestampExtractor;
    }
}
It is more or less a merge of the FieldPartitioner and the TimeBasedPartitioner.
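For reference, a sink configuration wiring in this partitioner could look like the sketch below. All values are hypothetical (topic, bucket, field name, and path format are not stated in the question); only `partitioner.class`, `partition.field`, and `timestamp.extractor` follow from the code above, the rest are standard S3 sink / TimeBasedPartitioner properties:

```json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "topics": "my-topic",
    "s3.bucket.name": "my-bucket",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "partitioner.class": "test.FieldAndTimeBasedPartitioner",
    "partition.field": "myField",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "timestamp.extractor": "Record"
  }
}
```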
Any clue as to why performance degrades when consuming messages? Could deserializing each record and extracting the field from the message cause this? And since I have roughly 80 distinct field values, could it be a memory problem, with the sink maintaining 80 times as many buffers in the heap?
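A rough back-of-envelope check of the memory concern, assuming the S3 sink keeps one in-flight buffer per encoded partition (the flush size and average record size below are hypothetical, only the 80 distinct values come from the question):

```java
public class BufferEstimate {
    // Worst-case bytes buffered if every encoded partition holds a full flush.size batch.
    static long worstCaseBytes(int distinctFieldValues, int flushSize, int avgRecordBytes) {
        return (long) distinctFieldValues * flushSize * avgRecordBytes;
    }

    public static void main(String[] args) {
        // 80 field values x 10,000 records per flush x ~1 kB per record
        System.out.println(worstCaseBytes(80, 10_000, 1_000)); // prints 800000000 (~800 MB)
    }
}
```

So with many distinct field values, lowering `flush.size` or setting a `rotate.interval.ms` keeps the per-partition buffers small.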
Thanks for your help.
Comments:

Editing the message to start with "hello" didn't work and I don't know why... so: hello! FYI, there is already an open PR with test cases for a field-and-time-based partitioner.

Answer 1:

FYI, the problem was the partitioner itself. My partitioner needed to decode the whole message to get the field. Since I have a lot of messages, processing all of those events takes time.