kafka基操

Posted ggblog

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka基操相关的知识,希望对你有一定的参考价值。

本文略长,ctrl+f

kafka:0.10.1

confuent:3.1.2

 

7、通过时间戳查询指定分区的offset

使用的方法(offsetsForTimes()):

 

Properties props = new Properties();
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
java.util.Map<TopicPartition,OffsetAndTimestamp> partitionTimestampOffsets = consumer.offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long>)
long offset = partitionTimestampOffsets.get(partiton0).offset()

 

通过时间戳查询指定分区的offsets,前后两个时间戳就是指定的时间段,所有分区相加就是指定的主题。所以可以通过时间戳查询指定分区的offsets方法来查询指定时间段内指定主题的偏移量。结果可以用来核对生产或者同步的消息数量。

 

 完整代码如下:

 

public static void main(String[] args){
  String bootstrapServers = args[0];
  String topic = args[1];
  String groupId = args[2];
  String startTime = args[3];
  String endTime = args[4];
  getOffsetsForTimes(bootstrapServers,topic,groupId,startTime,endTime);
}

 

public static void getOffsetsForTimes(String bootstrapServers,String topic,String groupId,String startTime,String endTime){
  SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
  long start = 0;
  long end = 0;
  try{
    start = sdf.parse(startTime).getTime();
    end = sdf.parse(endTime).getTime();
  } catch (ParseException e) {
    e.printStackTrace();
  }
  
  Properties props = new Properties();
  props.put("bootstrap.servers",bootstrapServers);
  props.put("group.id",groupId);
  props.put("enable.auto.commit","false");
  props.put("auto.commit.interval.ms","1000");
  props.put("session.timeout.ms","30000");
  props.put("auto.offset.reset","earliest");
  props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
  KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
  TopicPartition topicPartition0 = new TopicPartition(topic,0);
  TopicPartition topicPartition1 = new TopicPartition(tipic,1);
  Map<TopicPartition,Long> startMap = new HashMap<>();
  startMap.put(topicPartition0,start);
  startMap.put(topicPartition1,start);
  Map<TopicPartition,OffsetAndTimestamp> startOffsetMap = consumer.offsetsForTimes(startMap);
  Map<TopicPartition,Long> endMap = new HashMap<>();
  endMap.put(topicPartition0,end);
  endMap.put(topicPartition1,end);
  Map<TopicPartition,OffsetAndTimestamp> endOffsetMap = consumer.offsetsForTimes(endMap);
  
  long partition0StartOffset = 0;
  if(startOffsetMap.get(topicPartition0) != null){
    partition0StartOffset = startOffsetMap.get(topicPartition0).offset();
  }
  long partition0EndOffset = 0;
  if(endOffsetMap.get(topicPartition0) != null){
    partition0EndOffset = endOffsetMap.get(topicPartition0).offset();
  }else{
    if(topicPartition0StartOffset > 0){
      topicPartition0EndOffset = consumer.endOffsets(Arrays.asList(topicPartition0)).get(topicPartition0);
    }
  }
  long partition1StartOffset = 0;
  if(startOffsetMap.get(topicPartition1) != null){
    partition1StartOffset = startOffsetMap.get(topicPartition1).offset();
  }
  long partition1EndOffset = 0;
  if(endOffsetMap.get(topicPartition1) != null){
    partition1EndOffset = endOffsetMap.get(topicPartition1).offset();
  }else{
    if(topicPartition1StartOffset > 0){
      topicPartition1EndOffset = consumer.endOffsets(Arrays.asList(topicPartition1)).get(topicPartition1);
    }
  }
  long total = (topicPartition0EndOffset -topicPartition0StartOffset)+(topicPartition1EndOffset -topicPartition1StartOffset );
  System.out.println(topic+" offsets:"+total);

}

 

 

生成jar包在linux中执行(jar包名:offsetForTime.jar)

java -jar offsetForTime.jar kafka1:9094 testTopic group1 201801050000000 20180106000000

6、新建主题指定副本分布

A、kafka自己决定副本分布

bin/kafka-topics --create --topic test001 --zookeeper localhost:2181 --replication-factor 3 --partition 3
//新建分区3,副本3的主题。
//kafka决定副本分布的方式给第一个分区随机选择,然后向右一次递增
//例如:
//partition:0     0 4 1
//partition:1     1 0 2
//partition:2     2 1 3

B、认为指定副本分布

bin/kafka-topics --create --topic test002 --zookeeper localhost:2181 --replica-assignment 0:1:3,2:3:4,4:0:1
//分布结果
//partition:0     0 1 3
//partition:1     2 3 4
//partition:2     4 0 1

5、mirror-maker

A、概述图

技术分享图片

Mirroring data between clusters (Source Cluster  --->  Destination Cluster)

就是通过使用kafka自带的mirror-maker工具把一个集群里的主题cp到另外一个kafka集群中,使用场景:1、数据需要迁移到新集群; 2、多个集群之间的数据需要同步…………

 

B、使用

nohup bin/kafka-mirror-maker --consumer.config  mirror-maker-consumer.properties  --offset.commit.interval.ms 5000 --producer.config  mirror-maker-producer.properties   --whitelist  ‘.*mirrormaker‘ &
两个自建的配置文件。
I、mirror-maker-consumer.properties
bootstrap.servers=hostname1:9094,hostname2:9094(Source Cluster kafka地址)
group.id=mirror-maker(自己取)
auto.offset.reset=earliest
II、mirror-maker-producer.properties
bootstrap.servers=hostname1:9094,hostname2:9094(Destination Cluster kafka地址,就是本集群的地址)
acks=all
retries=2147483647
request.timeout.ms=3600000
解释:

--offset.commit.interval.ms 5000    消费的offset提交时间间隔5秒
--whitelist  ‘.*mirrormaker‘    需要mirrormaker的白名单,可使用正则表达式,也可以直接是某一个具体的主题,对应的是黑名单,--blacklist
C、检测
粗略的检测:查看mirror主题的log文件更新时间
精确的检测:查看mirror主题的offsets,可以参考另一篇:http://blog.csdn.net/forrest_ou/article/details/78978575
 
D、理解
I、对于mirrormaker的过程:mirror-maker会先去Source Cluster消费,把消费得到的消息生产到Destination Cluster。所有会有命令中的两个配置文件,一个消费配置文件,一个生产配置文件。
II、关于是否需要在Destination  Cluster中创建与Source Cluster一样的主题。答案是可创可不创,创建了就用创建了的,没有会自动创建,自动创建结合配置auto.create.topics.enable=true,默认为true
III、当出现具体主题的mirror成功,使用正则的停滞。首先测试Source Cluster 有没有问题,从Source Cluster消费单个主题是否可行。如果两个都行的通,原因是正则包含的主题里面有主题是不能消费的,一直retry,mirror-maker停滞。
 

4、删除主题和只删除主题数据

A、删除主题

在server.properties中增加设置,默认未开启
delete.topic.enable=true
删除主题命令
/bin/kafka-topics --delete --topic test --zookeeper localhost:2181
 
B、只删除主题数据
如果想保留主题,只删除主题现有数据(log)。可以通过修改数据保留时间实现
bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=3000
//修改保留时间为三秒,但不是修改后三秒就马上删掉,kafka是采用轮训的方式,轮训到这个主题发现三秒前的数据都是删掉。时间由自己在server.properties里面设置,设置见下面。

数据删除后,继续使用主题,那主题数据的保留时间就不可能为三秒,所以把上面修改的配置删掉,采用server.properties里面统一的配置。
 
bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --delete-config retention.ms
server.properties里面数据保留时间的配置
log.retention.hours=168  //保留时间,单位小时
log.retention.check.interval.ms=300000 //保留时间检查间隔,单位毫秒


 3、mirror-maker权限问题和白名单

A、权限问题

如果集群配置了权限,mirrormaker的成功运行需要有两个权限。
 
I、被mirror的源集群的消费权限
II、mirror目的地的目标集群的生产权限
 
这里正好对应了mirrormaker工作的整个过程,先消费过来,再生产进去。
 
以SASL为例:
I、修改kafka-mirror-maker脚本,增加如下:
export JAAS_CONFIG="-Djava.security.auth.login.config=/home/kafka/.version/kafka_client_jaas.conf" 
II、/home/kafka/.version/kafka_client_jaas.conf  配置文件
KafkaClient {
	org.apache.kafka.common.security.plain.PlainLoginModule required
	username="test"
	password="test"
};
III、授予生产和消费权限
//消费权限
/home/kafka/confluent/bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --consumer --topic testTopic --group ‘*‘
//生产权限
/home/kafka/confluent/bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --producer --topic testTopic


备注:以上三个针对的是testTopic这个主题,test这个用户。当然还可以针对消费组,--group ‘*‘ 任意了,可以指定特定的消费组。因为auto.create.topics.enable=true。默认为true。所以可以对用户test授予所有主题的生产权限,那样就不用先在目标集群里面创建主题然后在收取生产权限。但是存在test用户权限范围过大的问题。
授予所有主题的生产权限
/bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:test --producer --topic ‘*‘

B、白名单
mirrormaker启动命令
nohup bin/kafka-mirror-maker --consumer.config mirror-maker-consumer.properties --num.streams 10 --offset.commit.interval.ms 5000 --producer.config mirror-maker-producer.properties --whitelist .*mirrormaker   >mirror.log 2>&1 &	
--whitelist ‘testTopic‘
--whitelist ‘.*mirrormaker‘
白名单定义要mirror的是哪些主题。可以是单个的主题名,也可以是匹配多个主题的正则表达式。
这里记录一下正则表达式。因为与官网说的有出入
 

I、同步以mirrormaker结尾的主题
--whitelist ‘.*mirrormaker‘
II、同步两个选择项的,一个是mirrormaker结尾的,一个是以kafka结尾的并且当中包含mirror的
--whitelist ‘.*mirrormaker|.*mirror.*kafka‘
III、同步两个选择项的,一个是mirrormaker结尾的,一个是以kafka结尾的但是中间不包含mirror的
--whitelist ‘.*mirrormaker|^(?!.*mirror).*kafka‘
 
这里说与官网有出入的在有两个选择项的正则表达式中,官网中提倡用,代替 |  但是使用 | 可以成功,使用逗号只有第一个选择项生效。
可能不同的具体的生产环境不同,会有不同的要求。
但是应该是实践是第一原则。



2、kafka扩容和分区重新分配

扩容:增加机器,例如原来三台服务器的kafka集群增加两台机器成为有五台机器的kafka集群,跟搭建差不多

分区重新分配:在原来机器上的主题分区不会自动均衡到新的机器,需要使用分区重新分配工具来均衡均衡

 

重新分配官方文档地址:点击打开链接

翻译官方文档中文地址:点击打开链接

上面两个链接中的文档描述的很详细。这里记录一下关键步骤,更主要是总结实际操作过来之后的问题和新的知识理解

主要步骤

A、确定要重启分配分区的主题,新建topics-to-move.json   json文件

 

{
  "topics": [
    {"topic": "foo1"},
    {"topic": "foo2"}
  ],
  "version":1
}
// foo1 foo2 为要重新分配的主题

B、使用 bin/kafka-reassign-partitions.sh重新分配工具生成分配规则的json语句分配到         5,6机器

 

 

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6"  –generate

C、有分配规则的json语句输出到控制台,复制到新建的json文件expand-cluster-reassignment.json中,例如:

 

 

{"version":1,
  "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
 		{"topic":"foo1","partition":1,"replicas":[5,6]},
                {"topic":"foo1","partition":2,"replicas":[5,6]},
                {"topic":"foo2","partition":0,"replicas":[5,6]},
                {"topic":"foo2","partition":1,"replicas":[5,6]},
                {"topic":"foo2","partition":2,"replicas":[5,6]}]
}

//描述分配之后分区的分布情况

D、执行命令,开始分区重新分配

 

 

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json –execute

E、验证是否完成

 

 

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json –verify
//当输出全部都是completed successfully表明移动已经完成.

 

 


操作中三个小技巧:

I、可以不需要第一步和第二步,自己手动新建分配的json文件。

II、主题量很多的是就不要一个一个复制粘贴了,用excel的拼接函数,还是很方便

III、最后一步验证中,主题很多的时候,会有很多在为未完成的输出语句夹杂其中。在语句后面加上  | grep -c "progress"就知         道有多少分区还没完成,输出为0的时候就是完成了。

 

总结和知识点理解

I、kafka新建主题时的分区分配策略:随机选取第一个分区节点,然后往后依次增加。例如第一个分区选取为1,第二个分区就       是2,第三个分区就是3.  1,2,3是brokerid。不会负载均衡,所以要手动重新分配分区操作,尽量均衡。

II、在生产的同时进行数据迁移会出现重复数据。所以迁移的时候避免重复生产数据,应该停止迁移主题的生产。同时消费不           会,同时消费之后出现短暂的leader报错,会自动恢复。

III、新增了broker节点,如果有主题的分区在新增加的节点上,生产和消费的客户端都应该在hosts配置文件中增加新增的               broker节点,否则无法生产消费,但是也不报错。




1、数据异常生产处理

问题现象:

A、监控观察一个主题数据量的变化,数据量会持续增加然后删除减少,然后再持续增加然后再删除减少,一直循环
 
B、观察日志:rolled......  scheduling......  deleting.......  也是这样对一个主题一直循环
 
原因解释:首先成功写入kafka了,只是写入后很快就被删除了。这里有几个原因。
 
A、kafka配置了log的保留时间很短或者保留大小很小,所以就会很快就被删除,如果这样,属于正常范围。
 
B、如果保留时间较长,大小较大,还是这样的情况,就是不正常的情况了,也不是我们希望的情况了。这种情况是因为消息的时间戳在log配置时间之外,即消息的最后写入时间错误,所以检查到了就会马上删除
 
下面主要是解决第二种不正常的情况
 
问题解决:
修改主题的配置
bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name topicName --alter --add-config message.timestamp.type=LogAppendTime
//我使用的是confluent 使用纯kafka对应更改
意思:修改这个主题的消息写入是的时间戳采用系统的当前时间。时间戳不之后就正常删除了

























以上是关于kafka基操的主要内容,如果未能解决你的问题,请参考以下文章

MySQL系列:kafka停止命令

Mongodb3.6 基操命令——help有大用

配置 kafka 同步刷盘

基操勿 6 | Node.js 的异步I/O到底有多秀?

mysql基操

pandas-python入门基操