工作中关于kafka的一个奇葩问题

Posted 掘源

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了工作中关于kafka的一个奇葩问题相关的知识,希望对你有一定的参考价值。

kafka的一个奇葩问题

相信大部分人都使用过kafka吧!

我主导的业务场景中,通常根据业务类型分为两大类:
1、 系统消息(不是很频繁的那种)
2、 业务消息(吞吐大)

至少为何这么分, 我相信有编程洁癖的程序不会问, 对, 我就是!

层次化, 我司是公用一个的kafka集群(100多台), 消息满天飞, 我想不用解释为什么了。

进入主题:

那么问题来了, 在共用消息主题的时候, 通常都会设置key来区别不同的消息体。

生产者指定key–>hash(key)%/ numPartitions=分区

附上product代码:

 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

但是! 但是~ 奇怪了~~
conusmer 却没有根据key获取分区的代码,反正我没找到,有找到的朋友告诉我····

至于作者为何这么设计, 个人觉得:
1、 为何用字符串key(为了业务中见名知意)
2、 还是跟业务场景有关, 那就是hash 冲突的问题,从作者代码的角度, 作者是希望我们指定具体的分区数
2.1, 因为一个好的hash函数是要花功夫的
2.2, hash函数执行的时间复杂度,mq要尽可能的保持高吞吐量,要排除一切干扰因素(个人在业务中使用了hashmap的hash函数),还不如直接上具体分区数来得痛快, 因为具体的分区我事先是知道的

好了, 除了以上的解决办法, 还有:

1、 自定义分区策略(重在hash函数的设计,跟product保持统一), 分布式场景中很头痛, 不同服务由不同的部分维护, 需要上下打通。。。。哈哈哈~ 烦烦, 人家不听你的

实现这个接口就可以了:
org.apache.kafka.clients.producer.Partitioner

2、使用它自己默认的分区策略, 但是那个序列化工具又是很头痛。
附上consumer代码:

----
public BaseKafkaConsumer(String name, String topic, String configPath, String userPrincipal, List<String> keys) {
        super(name, true);
        this.topic = topic;
        this.subscribePartitions = new ArrayList<>(5);
        SecCryptUtil256.securityPrepareForKafka(userPrincipal, configPath);
        this.consumer = new KafkaConsumer<>(initConsumerProps(configPath));

        if (!keys.isEmpty()) {
            List<PartitionInfo> topicPartitions = this.consumer.partitionsFor(topic);
            if (null == topicPartitions || topicPartitions.isEmpty()) {
                log.warn("topic【{}】 partitions is empty, keys ={}", topic, keys.toArray());
                return;
            }
            int numPartitions = topicPartitions.size();
            StringSerializer stringSerializer = new StringSerializer();
            for (String key : keys) {
                byte[] keyBytes = stringSerializer.serialize(topic, key);
                // 跟product保持统一
                int Partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
                this.subscribePartitions.add(new TopicPartition(topic, Partition));
            }
        }
    }

贴上自带的序列化工作, 性能垃圾的一批, 大家可以测试一下:

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);//看看这是什么鬼, 在大数据环境整洁给零分。
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

好! 今天的分享到这里, 下期再见~~

以上是关于工作中关于kafka的一个奇葩问题的主要内容,如果未能解决你的问题,请参考以下文章

FlinkFLink 写入kafka 中关于 Exactly-Once 的一些思考

如何从另一个活动或底部工作表对话框/片段中关闭一个活动

kafka中关于主题的命令

Unity中关于AnimationEvent.Time的问题

带你整理面试过程中关于消息队列(RabbitMQ/RocketMQ/Kafka)的相关知识点

应用程序在主要活动中关闭onBackPressed