工作中关于kafka的一个奇葩问题
Posted 掘源
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了工作中关于kafka的一个奇葩问题相关的知识,希望对你有一定的参考价值。
kafka的一个奇葩问题
相信大部分人都使用过kafka吧!
我主导的业务场景中,通常根据业务类型分为两大类:
1、 系统消息(不是很频繁的那种)
2、 业务消息(吞吐大)
至少为何这么分, 我相信有编程洁癖的程序不会问, 对, 我就是!
层次化, 我司是公用一个的kafka集群(100多台), 消息满天飞, 我想不用解释为什么了。
进入主题:
那么问题来了, 在共用消息主题的时候, 通常都会设置key来区别不同的消息体。
生产者指定key–>hash(key)%/ numPartitions=分区
附上product代码:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
但是! 但是~ 奇怪了~~
conusmer 却没有根据key获取分区的代码,反正我没找到,有找到的朋友告诉我····
至于作者为何这么设计, 个人觉得:
1、 为何用字符串key(为了业务中见名知意)
2、 还是跟业务场景有关, 那就是hash 冲突的问题,从作者代码的角度, 作者是希望我们指定具体的分区数
2.1, 因为一个好的hash函数是要花功夫的
2.2, hash函数执行的时间复杂度,mq要尽可能的保持高吞吐量,要排除一切干扰因素(个人在业务中使用了hashmap的hash函数),还不如直接上具体分区数来得痛快, 因为具体的分区我事先是知道的
好了, 除了以上的解决办法, 还有:
1、 自定义分区策略(重在hash函数的设计,跟product保持统一), 分布式场景中很头痛, 不同服务由不同的部分维护, 需要上下打通。。。。哈哈哈~ 烦烦, 人家不听你的
实现这个接口就可以了:
org.apache.kafka.clients.producer.Partitioner
2、使用它自己默认的分区策略, 但是那个序列化工具又是很头痛。
附上consumer代码:
----
public BaseKafkaConsumer(String name, String topic, String configPath, String userPrincipal, List<String> keys) {
super(name, true);
this.topic = topic;
this.subscribePartitions = new ArrayList<>(5);
SecCryptUtil256.securityPrepareForKafka(userPrincipal, configPath);
this.consumer = new KafkaConsumer<>(initConsumerProps(configPath));
if (!keys.isEmpty()) {
List<PartitionInfo> topicPartitions = this.consumer.partitionsFor(topic);
if (null == topicPartitions || topicPartitions.isEmpty()) {
log.warn("topic【{}】 partitions is empty, keys ={}", topic, keys.toArray());
return;
}
int numPartitions = topicPartitions.size();
StringSerializer stringSerializer = new StringSerializer();
for (String key : keys) {
byte[] keyBytes = stringSerializer.serialize(topic, key);
// 跟product保持统一
int Partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
this.subscribePartitions.add(new TopicPartition(topic, Partition));
}
}
}
贴上自带的序列化工作, 性能垃圾的一批, 大家可以测试一下:
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);//看看这是什么鬼, 在大数据环境整洁给零分。
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
@Override
public void close() {
// nothing to do
}
}
好! 今天的分享到这里, 下期再见~~
以上是关于工作中关于kafka的一个奇葩问题的主要内容,如果未能解决你的问题,请参考以下文章
FlinkFLink 写入kafka 中关于 Exactly-Once 的一些思考
Unity中关于AnimationEvent.Time的问题