kafka connect到底会不会重写/丢失数据

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka connect到底会不会重写/丢失数据相关的知识,希望对你有一定的参考价值。

参考技术A 要注意些注意事项于partitionconsumer
1. consumer比partition浪费kafka设计partition允许并发所consumer数要于partition数
2. consumer比partition少consumer应于partitions主要合理配consumer数partition数否则导致partition面数据取均匀
partiton数目consumer数目整数倍所partition数目重要比取24容易设定consumer数目
3. consumerpartition读数据保证数据间顺序性kafka保证partition数据序partition根据读顺序同
4. 增减consumerbrokerpartition导致rebalance所rebalanceconsumer应partition发变化
5. High-level接口获取数据候block
简单版
简单坑测试流程先produce些数据再用consumer读记加第句设置
初始offset默认非设置意思offset非何修offset默认largest即新所加配置读前produce数据且候再加smallest配置没用offset合再修需要手工或用工具改重置offset

Properties props = new Properties();
props.put("auto.offset.reset", "smallest"); //必须要加要读旧数据
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "pv");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
String topic = "page_visits";
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream> streams = consumerMap.get(topic);

KafkaStream stream = streams.get(0);
ConsumerIterator it = stream.iterator();
while (it.hasNext())
System.out.println("message: " + new String(it.next().message()));


if (consumer != null) consumer.shutdown(); //其实执行面hasNextblock

用high-levelconsumer两给力工具
1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
看前group offset状况比看pv状况3partition
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
关键offsetlogSizeLag
前读完所offset=logSize并且Lag=0
2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
3参数
[earliest | latest]表示offset置哪
consumer.properties 配置文件路径
topictopic名page_visits
我面pv group执行完操作再check group offset状况结
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
看offset已经清0Lag=logSize

底给原文线程consumer完整代码

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;

public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic)
consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector注意面conf配置
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;


public void shutdown()
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();


public void run(int a_numThreads) // 创建并发consumers
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪topic需要几线程读
Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建Streams
List<KafkaStream> streams = consumerMap.get(topic); // 每线程应于KafkaStream

// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);

// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams)
executor.submit(new ConsumerTest(stream, threadNumber)); // 启consumer thread
threadNumber++;



private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId)
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);


public static void main(String[] args)
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);

ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);

try
Thread.sleep(10000);
catch (InterruptedException ie)


example.shutdown();



SimpleConsumer
另种SimpleConsumer名字起简单接口其实low-level consumer更复杂接口
参考
候用接口?
Read a message multiple times
Consume only a subset of the partitions in a topic in a process
Manage transactions to make sure a message is processed once and only once

用接口代价即partition,broker,offset再透明需要自管理些并且要handle broker leader切换麻烦
所定要用别用
You must keep track of the offsets in your application to know where you left off consuming.
You must figure out which Broker is the lead Broker for a topic and partition
You must handle Broker leader changes
使用SimpleConsumer步骤:
Find an active Broker and find out which Broker is the leader for your topic and partition
Determine who the replica Brokers are for your topic and partition
Build the request defining what data you are interested in
Fetch the data
Identify and recover from leader changes
首先必须知道读哪topic哪partition
找负责该partitionbroker leader找存该partition副本broker
再者自写request并fetch数据
终要注意需要识别处理broker leader改变

使 Kafka 主题日志保留永久化

【中文标题】使 Kafka 主题日志保留永久化【英文标题】:Make Kafka Topic Log Retention Permanent 【发布时间】:2017-02-05 16:44:51 【问题描述】:

我正在将日志消息写入 Kafka 主题,并且我希望永久保留该主题。我在 Kafka 和 Kafka Connect(_schemas、connect-configs、connect-status、connect-offsets 等)中看到了日志保留时间不会删除的特殊主题。我如何强制一个主题与这些其他特殊主题一样?是命名约定还是其他一些属性?

谢谢

【问题讨论】:

【参考方案1】:

如果您想永久保留所有主题,可以将log.retention.hourslog.retention.bytes 都设置为-1。

【讨论】:

似乎在 2015 年 4 月之前执行此操作的方法是将 log.retention.hours 设置为 2147483647。然后JIRA 添加了 -1 功能。然而,JIRA 没有明确说明什么值被解释为“永远保留”。我稍微挖掘了一下代码,确实是-1。你也可以为hours, minutes or ms设置-1 除了占用太多空间之外,还有什么理由不应该这样做?除此之外,是否存在可扩展性或速度问题? 值得注意的是,限制是每个分区的空间,每个分区被限制为一个磁盘的大小。间接地,这可能会影响可扩展性,但除此之外我认为没有任何影响。 您确定 log.retention.hours 有效吗?上下文:***.com/a/70535714/4106031【参考方案2】:

这些特殊主题是压缩主题。这意味着它们由键控消息组成,并且只保留列表最近的键。完整的写here。在大多数情况下,这可能是您想要的无限保留时间。

【讨论】:

如何只保留一个主题的数据(永远保留数据)? log.retention.bytes 和 log.retention.ms 适用于所有主题,但不仅仅适用于单个主题。【参考方案3】:

您可以在此处找到参数的默认值: https://jaceklaskowski.gitbooks.io/apache-kafka/content/kafka-properties.html

log.retention.byteslog.retention.ms 如果您想永久保留主题中的数据,可以将其设置为 -1。

【讨论】:

如何只保留一个主题的数据(永远保留数据)? log.retention.bytes 和 log.retention.ms 适用于所有主题,但不适用于任何单个主题。 log.retention.hours=-1 不起作用吗?

以上是关于kafka connect到底会不会重写/丢失数据的主要内容,如果未能解决你的问题,请参考以下文章

Kafka消息中间件到底会不会丢消息

刨根问底,Kafka消息中间件到底会不会丢消息

不看损失大了,刨根问底,Kafka消息中间件到底会不会丢消息

消息队列(kafka/nsq 等)与任务队列(celery)到底有啥不同

Kafka Connect:没有为连接器创建任务

Kafka Connect 不适用于主题策略