Apache Kafka:正好在0.10版本中
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Kafka:正好在0.10版本中相关的知识,希望对你有一定的参考价值。
为了实现Kafka消费者对消息的一次性处理,我一次只提交一条消息,如下所示
public void commitOneRecordConsumer(long seconds) {
KafkaConsumer<String, String> consumer = consumerConfigFactory.getConsumerConfig();
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
try {
for (ConsumerRecord<String, String> record : records) {
processingService.process(record);
consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(),record.partition()), new OffsetAndMetadata(record.offset() + 1)));
System.out.println("Committed Offset" + ": " + record.offset());
}
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}
}
上面的代码将消息的处理异步委托给下面的另一个类。
@Service
public class ProcessingService {
@Async
public void process(ConsumerRecord<String, String> record) throws InterruptedException {
Thread.sleep(5000L);
Map<String, Object> map = new HashMap<>();
map.put("partition", record.partition());
map.put("offset", record.offset());
map.put("value", record.value());
System.out.println("Processed" + ": " + map);
}
}
但是,这仍然不能保证一次性交付,因为如果处理失败,它可能仍然提交其他消息,并且以前的消息永远不会被处理和提交,这里有什么选择?
0.10.2及更早版本的原始答案(0.11及更高版本见答案)
目前,Kafka无法提供开箱即用的一次性处理。如果在成功处理消息后提交消息,则可以进行至少一次处理,或者如果在开始处理之前直接在poll()
之后提交消息,则可以进行最多一次处理。
(另见http://docs.confluent.io/3.0.0/clients/consumer.html#synchronous-commits的“交付保证”一节)
但是,如果您的处理是幂等的,那么至少一次保证“足够好”,即即使您处理两次记录,最终结果也是相同的。幂等处理的示例是将消息添加到键值存储。即使您添加相同的记录两次,第二个插入将只替换第一个当前键值对,KV存储仍将包含正确的数据。
在上面的示例代码中,您更新了
HashMap
,这将是幂等操作。即使您在失败的情况下可能具有不一致的状态,例如在崩溃之前仅执行两次put
调用。但是,这种不一致状态将在再次处理同一记录时得到修复。对
println()
的调用并不是幂等的,因为这是一个具有“副作用”的操作。但我想打印仅用于调试目的。
作为替代方案,您需要在用户代码中实现事务语义,这需要在发生故障时“撤消”(部分执行)操作。一般来说,这是一个难题。
Apache Kafka 0.11+的更新(对于0.11之前的版本,请参阅上面的答案)
从0.11开始,Apache Kafka支持使用Kafka Streams的幂等生成器,事务生成器和完全一次处理。它还为消费者添加了"read_committed"
模式,以便只读取已提交的消息(以及删除/过滤中止的消息)。
- https://kafka.apache.org/documentation/#semantics
- https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
- https://www.confluent.io/blog/transactions-apache-kafka/
- https://www.confluent.io/blog/enabling-exactly-kafka-streams/
Apache Kafka 0.11.0.0刚刚发布,它现在只支持一次交付。
http://kafka.apache.org/documentation/#upgrade_11_exactly_once_semantics
我认为一旦使用kafka 0.10.x本身就可以实现处理。但有一些问题。我从this书中分享了高层次的想法。相关内容可以在第4章:Seek and Exactly Once Processing
中的Kafka Consumers - Reading Data from Kafka
部分找到。您可以使用(免费)safaribooksonline帐户查看该图书的内容,或者在帐单结束后购买,或者从其他来源获取,我们不会谈论这些内容。
理念:
想想这个常见场景:您的应用程序从Kafka读取事件,处理数据,然后将结果存储在数据库中。假设我们真的不想丢失任何数据,也不想将相同的结果存储在数据库中两次。
如果有一种方法可以在一个原子动作中存储记录和偏移量,那么它是可行的。记录和偏移都已提交,或者都没有提交。为此,我们需要在一个事务中将记录和偏移量写入数据库。然后我们就知道要么我们完成了记录而且提交了偏移量,要么我们没有,并且记录将被重新处理。
现在唯一的问题是:如果记录存储在数据库而不是Kafka中,我们的消费者在分配分区时如何知道从哪里开始阅读?这正是seek()
可以用来做的。当消费者启动或分配新分区时,它可以查找数据库中的偏移量和seek()
到该位置。
书中的示例代码:
public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
consumer.poll(0);
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
processRecord(record);
storeRecordInDB(record);
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
commitDBTransaction();
}
以上是关于Apache Kafka:正好在0.10版本中的主要内容,如果未能解决你的问题,请参考以下文章