kafka基操
Posted ggblog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka基操相关的知识,希望对你有一定的参考价值。
本文略长,ctrl+f
kafka:0.10.1
confuent:3.1.2
7、通过时间戳查询指定分区的offset
使用的方法(offsetsForTimes()):
Properties props = new Properties(); KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); java.util.Map<TopicPartition,OffsetAndTimestamp> partitionTimestampOffsets = consumer.offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long>)
long offset = partitionTimestampOffsets.get(partiton0).offset()
通过时间戳查询指定分区的offsets,前后两个时间戳就是指定的时间段,所有分区相加就是指定的主题。所以可以通过时间戳查询指定分区的offsets方法来查询指定时间段内指定主题的偏移量。结果可以用来核对生产或者同步的消息数量。
完整代码如下:
public static void main(String[] args){ String bootstrapServers = args[0]; String topic = args[1]; String groupId = args[2]; String startTime = args[3]; String endTime = args[4]; getOffsetsForTimes(bootstrapServers,topic,groupId,startTime,endTime); }
public static void getOffsetsForTimes(String bootstrapServers,String topic,String groupId,String startTime,String endTime){ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); long start = 0; long end = 0; try{ start = sdf.parse(startTime).getTime(); end = sdf.parse(endTime).getTime(); } catch (ParseException e) { e.printStackTrace(); } Properties props = new Properties(); props.put("bootstrap.servers",bootstrapServers); props.put("group.id",groupId); props.put("enable.auto.commit","false"); props.put("auto.commit.interval.ms","1000"); props.put("session.timeout.ms","30000"); props.put("auto.offset.reset","earliest"); props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props); TopicPartition topicPartition0 = new TopicPartition(topic,0); TopicPartition topicPartition1 = new TopicPartition(tipic,1); Map<TopicPartition,Long> startMap = new HashMap<>(); startMap.put(topicPartition0,start); startMap.put(topicPartition1,start); Map<TopicPartition,OffsetAndTimestamp> startOffsetMap = consumer.offsetsForTimes(startMap); Map<TopicPartition,Long> endMap = new HashMap<>(); endMap.put(topicPartition0,end); endMap.put(topicPartition1,end); Map<TopicPartition,OffsetAndTimestamp> endOffsetMap = consumer.offsetsForTimes(endMap); long partition0StartOffset = 0; if(startOffsetMap.get(topicPartition0) != null){ partition0StartOffset = startOffsetMap.get(topicPartition0).offset(); } long partition0EndOffset = 0; if(endOffsetMap.get(topicPartition0) != null){ partition0EndOffset = endOffsetMap.get(topicPartition0).offset(); }else{ if(topicPartition0StartOffset > 0){ topicPartition0EndOffset = consumer.endOffsets(Arrays.asList(topicPartition0)).get(topicPartition0); } } long partition1StartOffset = 0; if(startOffsetMap.get(topicPartition1) != null){ partition1StartOffset = startOffsetMap.get(topicPartition1).offset(); } long partition1EndOffset = 0; if(endOffsetMap.get(topicPartition1) != null){ partition1EndOffset = endOffsetMap.get(topicPartition1).offset(); }else{ if(topicPartition1StartOffset > 0){ topicPartition1EndOffset = consumer.endOffsets(Arrays.asList(topicPartition1)).get(topicPartition1); } } long total = (topicPartition0EndOffset -topicPartition0StartOffset)+(topicPartition1EndOffset -topicPartition1StartOffset ); System.out.println(topic+" offsets:"+total); }
生成jar包在linux中执行(jar包名:offsetForTime.jar)
java -jar offsetForTime.jar kafka1:9094 testTopic group1 201801050000000 20180106000000
6、新建主题指定副本分布
A、kafka自己决定副本分布
bin/kafka-topics --create --topic test001 --zookeeper localhost:2181 --replication-factor 3 --partition 3 //新建分区3,副本3的主题。 //kafka决定副本分布的方式给第一个分区随机选择,然后向右一次递增 //例如: //partition:0 0 4 1 //partition:1 1 0 2 //partition:2 2 1 3
B、认为指定副本分布
bin/kafka-topics --create --topic test002 --zookeeper localhost:2181 --replica-assignment 0:1:3,2:3:4,4:0:1 //分布结果 //partition:0 0 1 3 //partition:1 2 3 4 //partition:2 4 0 1
5、mirror-maker
A、概述图
Mirroring data between clusters (Source Cluster ---> Destination Cluster)
就是通过使用kafka自带的mirror-maker工具把一个集群里的主题cp到另外一个kafka集群中,使用场景:1、数据需要迁移到新集群; 2、多个集群之间的数据需要同步…………
B、使用
nohup bin/kafka-mirror-maker --consumer.config mirror-maker-consumer.properties --offset.commit.interval.ms 5000 --producer.config mirror-maker-producer.properties --whitelist ‘.*mirrormaker‘ &两个自建的配置文件。
bootstrap.servers=hostname1:9094,hostname2:9094(Source Cluster kafka地址) group.id=mirror-maker(自己取) auto.offset.reset=earliestII、mirror-maker-producer.properties
bootstrap.servers=hostname1:9094,hostname2:9094(Destination Cluster kafka地址,就是本集群的地址) acks=all retries=2147483647 request.timeout.ms=3600000解释:
--offset.commit.interval.ms 5000 消费的offset提交时间间隔5秒
--whitelist ‘.*mirrormaker‘ 需要mirrormaker的白名单,可使用正则表达式,也可以直接是某一个具体的主题,对应的是黑名单,--blacklist
4、删除主题和只删除主题数据
A、删除主题
delete.topic.enable=true删除主题命令
/bin/kafka-topics --delete --topic test --zookeeper localhost:2181
bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=3000 //修改保留时间为三秒,但不是修改后三秒就马上删掉,kafka是采用轮训的方式,轮训到这个主题发现三秒前的数据都是删掉。时间由自己在server.properties里面设置,设置见下面。
数据删除后,继续使用主题,那主题数据的保留时间就不可能为三秒,所以把上面修改的配置删掉,采用server.properties里面统一的配置。
bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --delete-config retention.msserver.properties里面数据保留时间的配置
log.retention.hours=168 //保留时间,单位小时 log.retention.check.interval.ms=300000 //保留时间检查间隔,单位毫秒
3、mirror-maker权限问题和白名单
A、权限问题
export JAAS_CONFIG="-Djava.security.auth.login.config=/home/kafka/.version/kafka_client_jaas.conf"II、/home/kafka/.version/kafka_client_jaas.conf 配置文件
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test" };III、授予生产和消费权限
//消费权限 /home/kafka/confluent/bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --consumer --topic testTopic --group ‘*‘ //生产权限 /home/kafka/confluent/bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --producer --topic testTopic
备注:以上三个针对的是testTopic这个主题,test这个用户。当然还可以针对消费组,--group ‘*‘ 任意了,可以指定特定的消费组。因为auto.create.topics.enable=true。默认为true。所以可以对用户test授予所有主题的生产权限,那样就不用先在目标集群里面创建主题然后在收取生产权限。但是存在test用户权限范围过大的问题。
/bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --producer --topic ‘*‘
nohup bin/kafka-mirror-maker --consumer.config mirror-maker-consumer.properties --num.streams 10 --offset.commit.interval.ms 5000 --producer.config mirror-maker-producer.properties --whitelist .*mirrormaker >mirror.log 2>&1 &--whitelist ‘testTopic‘
I、同步以mirrormaker结尾的主题
--whitelist ‘.*mirrormaker‘II、同步两个选择项的,一个是mirrormaker结尾的,一个是以kafka结尾的并且当中包含mirror的
--whitelist ‘.*mirrormaker|.*mirror.*kafka‘III、同步两个选择项的,一个是mirrormaker结尾的,一个是以kafka结尾的但是中间不包含mirror的
--whitelist ‘.*mirrormaker|^(?!.*mirror).*kafka‘
2、kafka扩容和分区重新分配
扩容:增加机器,例如原来三台服务器的kafka集群增加两台机器成为有五台机器的kafka集群,跟搭建差不多
分区重新分配:在原来机器上的主题分区不会自动均衡到新的机器,需要使用分区重新分配工具来均衡均衡
重新分配官方文档地址:点击打开链接
翻译官方文档中文地址:点击打开链接
上面两个链接中的文档描述的很详细。这里记录一下关键步骤,更主要是总结实际操作过来之后的问题和新的知识理解
主要步骤
A、确定要重启分配分区的主题,新建topics-to-move.json json文件
{ "topics": [ {"topic": "foo1"}, {"topic": "foo2"} ], "version":1 } // foo1 foo2 为要重新分配的主题
B、使用 bin/kafka-reassign-partitions.sh重新分配工具生成分配规则的json语句分配到 5,6机器
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" –generate
C、有分配规则的json语句输出到控制台,复制到新建的json文件expand-cluster-reassignment.json中,例如:
{"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}] } //描述分配之后分区的分布情况
D、执行命令,开始分区重新分配
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json –execute
E、验证是否完成
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json –verify //当输出全部都是completed successfully表明移动已经完成.
操作中三个小技巧:
I、可以不需要第一步和第二步,自己手动新建分配的json文件。
II、主题量很多的是就不要一个一个复制粘贴了,用excel的拼接函数,还是很方便
III、最后一步验证中,主题很多的时候,会有很多在为未完成的输出语句夹杂其中。在语句后面加上 | grep -c "progress"就知 道有多少分区还没完成,输出为0的时候就是完成了。
总结和知识点理解
I、kafka新建主题时的分区分配策略:随机选取第一个分区节点,然后往后依次增加。例如第一个分区选取为1,第二个分区就 是2,第三个分区就是3. 1,2,3是brokerid。不会负载均衡,所以要手动重新分配分区操作,尽量均衡。
II、在生产的同时进行数据迁移会出现重复数据。所以迁移的时候避免重复生产数据,应该停止迁移主题的生产。同时消费不 会,同时消费之后出现短暂的leader报错,会自动恢复。
III、新增了broker节点,如果有主题的分区在新增加的节点上,生产和消费的客户端都应该在hosts配置文件中增加新增的 broker节点,否则无法生产消费,但是也不报错。
1、数据异常生产处理
问题现象:
bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name topicName --alter --add-config message.timestamp.type=LogAppendTime //我使用的是confluent 使用纯kafka对应更改意思:修改这个主题的消息写入是的时间戳采用系统的当前时间。时间戳不之后就正常删除了
以上是关于kafka基操的主要内容,如果未能解决你的问题,请参考以下文章