kafka consumer重新连接后如何获取当前最新数据
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka consumer重新连接后如何获取当前最新数据相关的知识,希望对你有一定的参考价值。
参考技术A 不过要注意一些注意事项,对于多个partition和多个consumer1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
5. High-level接口中获取不到数据的时候是会block的
简单版,
简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置
因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset
Properties props = new Properties();
props.put("auto.offset.reset", "smallest"); //必须要加,如果要读旧数据
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "pv");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
String topic = "page_visits";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
KafkaStream<byte[], byte[]> stream = streams.get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext())
System.out.println("message: " + new String(it.next().message()));
if (consumer != null) consumer.shutdown(); //其实执行不到,因为上面的hasNext会block
在用high-level的consumer时,两个给力的工具,
1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
可以看到当前group offset的状况,比如这里看pv的状况,3个partition
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
关键就是offset,logSize和Lag
这里以前读完了,所以offset=logSize,并且Lag=0
2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
3个参数,
[earliest | latest],表示将offset置到哪里
consumer.properties ,这里是配置文件的路径
topic,topic名,这里是page_visits
我们对上面的pv group执行完这个操作后,再去check group offset状况,结果如下,
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
可以看到offset已经被清0,Lag=logSize
底下给出原文中多线程consumer的完整代码
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConsumerGroupExample
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic)
consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector,注意下面对conf的配置
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
public void shutdown()
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
public void run(int a_numThreads) // 创建并发的consumers
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪个topic,需要几个线程读
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建Streams
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // 每个线程对应于一个KafkaStream
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams)
executor.submit(new ConsumerTest(stream, threadNumber)); // 启动consumer thread
threadNumber++;
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId)
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
public static void main(String[] args)
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);
try
Thread.sleep(10000);
catch (InterruptedException ie)
example.shutdown();
SimpleConsumer
另一种是SimpleConsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口
参考,
什么时候用这个接口?
Read a message multiple times
Consume only a subset of the partitions in a topic in a process
Manage transactions to make sure a message is processed once and only once
当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦
所以不是一定要用,最好别用
You must keep track of the offsets in your application to know where you left off consuming.
You must figure out which Broker is the lead Broker for a topic and partition
You must handle Broker leader changes
使用SimpleConsumer的步骤:
Find an active Broker and find out which Broker is the leader for your topic and partition
Determine who the replica Brokers are for your topic and partition
Build the request defining what data you are interested in
Fetch the data
Identify and recover from leader changes
首先,你必须知道读哪个topic的哪个partition
然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker
再者,自己去写request并fetch数据
最终,还要注意需要识别和处理broker leader的改变本回答被提问者采纳
Kafka Consumer 获取键值对
【中文标题】Kafka Consumer 获取键值对【英文标题】:Kafka Consumer get key value pair 【发布时间】:2017-05-14 23:45:50 【问题描述】:我目前正在使用 Kafka 和 Flink,我在本地 PC 上运行了 kafka,并且我创建了一个正在使用的主题。
桌面\kafka\bin\windows>kafka-console-consumer.bat --zookeeper localhost:2181 -topic test
但它只是检索消息,
有没有办法获得有关消息的更多详细信息?让我们说时间?钥匙?我检查了 kafka 文档,但没有找到有关此主题的内容
【问题讨论】:
【参考方案1】:使用开箱即用的控制台消费者(我使用的是 Kafka 0.9.0.1),您只能使用不同格式打印消息的键和值。要打印密钥,请设置属性print.key=true
。
还有另一个属性key.separator
,默认情况下是“\t”(一个选项卡),您也可以将其更改为您想要的任何内容。
要设置这些属性,您可以创建一个配置文件并使用--consumer.config <config file>
或使用--property key=value
传递属性。
您也可以实现自己的格式化程序并将其与--formatter
选项一起使用,但您仍然只有键和值,因为这是 MessageFormatter 特征提供的(参见下面的 writeTo)。
trait MessageFormatter
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
def init(props: Properties)
def close()
例如:
./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server kafka-1:9092 --topic topic1 --property print.key=true --property key.separator="-" --from-beginning
key-p1
key-p2
key-p3
null-4
null-8
null-0
【讨论】:
我不想提出一个新问题,话虽如此,我认为 Kafka 应该提供一种方法来为每条消息或至少为每个主题指定生产者的密钥。你知道我可以用什么吗? $ ./bin/kafka-console-producer.sh --broker-list kafka-1:9092 --topic topic1 --property parse.key=true --property key.separator=" -" 关键-价值 为我解决了问题..【参考方案2】:使用以下命令:
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_name \
--from-beginning --formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true --property print.value=true \
--property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
【讨论】:
Windows 用户注意 => 对于 cmd 将 \ 替换为 ^以上是关于kafka consumer重新连接后如何获取当前最新数据的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 如何从 __consumer_offsets 主题中读取