kafka系列 -- 多线程消费者实现

Posted stillcoolme

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka系列 -- 多线程消费者实现相关的知识,希望对你有一定的参考价值。

看了一个星期的kafka,然后写了消费Kafka数据的代码。
感觉自己还是很不合格。

  1. 不能随心所欲地操作数据,数据结构没学好,spark的RDD操作没学好。
  2. 不能很好地组织代码结构,设计模式没学好,面向对象思想理解不够。

消费程序特点:

  1. 用队列来存储要消费的数据。
  2. 用队列来存储要提交的offest,然后处理线程将其给回消费者提交。
  3. 每个分区开一个处理线程来处理数据,分区与处理器的映射放在map中。
  4. 当处理到一定的数量或者距离上一次处理一定的时间间隔后, 由poll线程进行提交offset。

不好的地方:

  1. 每次处理的数据太少,而且每个数据都进行判断其分区是否已经有处理线程在处理了。
  2. 获取topic不太优雅。

流程图

技术分享图片

下面是多线程消费者实现:

1. 管理程序

/**
 * 负责启动消费者线程MsgReceiver, 保存消费者线程MsgReceiver, 保存处理任务和线程RecordProcessor, 以及销毁这些线程
 * Created by stillcoolme on 2018/10/12.
 */
public class KafkaMultiProcessorMain {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMultiProcessorMain.class);
    // 消费者参数
    private Properties consumerProps = new Properties();
    // kafka消费者参数
    Map<String, Object> consumerConfig;
    //存放topic的配置
    Map<String, Object> topicConfig;

    //订阅的topic
    private String alarmTopic;
    //消费者线程数组
    private Thread[] threads;

    //保存处理任务和线程的map
    ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks = new ConcurrentHashMap<>();
    ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads = new ConcurrentHashMap<>();

    public void setAlarmTopic(String alarmTopic) {
        this.alarmTopic = alarmTopic;
    }

    public static void main(String[] args) {
        KafkaMultiProcessorMain kafkaMultiProcessor = new KafkaMultiProcessorMain();
        //这样设置topic不够优雅啊!!!
        kafkaMultiProcessor.setAlarmTopic("picrecord");

        kafkaMultiProcessor.init(null);
    }

    private void init(String consumerPropPath) {
        getConsumerProps(consumerPropPath);
        consumerConfig = getConsumerConfig();

        int threadsNum = 3;
        logger.info("create " + threadsNum + " threads to consume kafka warn msg");
        threads = new Thread[threadsNum];
        for (int i = 0; i < threadsNum; i++) {
            MsgReceiver msgReceiver = new MsgReceiver(consumerConfig, alarmTopic, recordProcessorTasks, recordProcessorThreads);
            Thread thread = new Thread(msgReceiver);
            threads[i] = thread;
        }
        for (int i = 0; i < threadsNum; i++) {
            threads[i].start();
        }
        logger.info("finish creating" + threadsNum + " threads to consume kafka warn msg");
    }

    //销毁启动的线程
    public void destroy() {
        closeRecordProcessThreads();
        closeKafkaConsumer();
    }

    private void closeRecordProcessThreads() {
        logger.debug("start to interrupt record process threads");
        for (Map.Entry<TopicPartition, Thread> entry : recordProcessorThreads.entrySet()) {
            Thread thread = entry.getValue();
            thread.interrupt();
        }
        logger.debug("finish interrupting record process threads");
    }

    private void closeKafkaConsumer() {
        logger.debug("start to interrupt kafka consumer threads");
        //使用interrupt中断线程, 在线程的执行方法中已经设置了响应中断信号
        for (int i = 0; i < threads.length; i++) {
            threads[i].interrupt();
        }
        logger.debug("finish interrupting consumer threads");
    }

    private Map<String,Object> getConsumerConfig() {
        return ImmutableMap.<String, Object>builder()
                .put("bootstrap.servers", consumerProps.getProperty("bootstrap.servers"))
                .put("group.id", "group.id")
                .put("enable.auto.commit", "false")
                .put("session.timeout.ms", "30000")
                .put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
                .put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
                .put("max.poll.records", 1000)
                .build();

    }

    /**
     * 获取消费者参数
     *
     * @param proPath
     */
    private void getConsumerProps(String proPath) {
        InputStream inStream = null;
        try {
            if (StringUtils.isNotEmpty(proPath)) {
                inStream = new FileInputStream(proPath);
            } else {
                inStream = this.getClass().getClassLoader().getResourceAsStream("consumer.properties");
            }
            consumerProps.load(inStream);
        } catch (IOException e) {
            logger.error("读取consumer配置文件失败:" + e.getMessage(), e);
        } finally {
            if (null != inStream) {
                try {
                    inStream.close();
                } catch (IOException e) {
                    logger.error("读取consumer配置文件失败:" + e.getMessage(), e);
                }
            }
        }
    }
}

2. 消费者任务 MsgReceiver

/**
 * 负责调用 RecordProcessor进行数据处理
 * Created by zhangjianhua on 2018/10/12.
 */
public class MsgReceiver implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);

    private BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue = new LinkedBlockingQueue<>();

    private ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads;
    private ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks;
    private String alarmTopic;
    private Map<String, Object> consumerConfig;


    public MsgReceiver(Map<String, Object> consumerConfig, String alarmTopic,
                       ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks,
                       ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads) {

        this.consumerConfig = consumerConfig;
        this.alarmTopic = alarmTopic;
        this.recordProcessorTasks = recordProcessorTasks;
        this.recordProcessorThreads = recordProcessorThreads;
    }

    @Override
    public void run() {
        //kafka Consumer是非线程安全的,所以需要每个线程建立一个consumer
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerConfig);
        kafkaConsumer.subscribe(Arrays.asList(alarmTopic));
        try{
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    //看commitQueue里面是非有需要提交的offest, 这样查看好频繁啊!!!
                    //查看该消费者是否有需要提交的偏移信息, 使用非阻塞读取
                    Map<TopicPartition, OffsetAndMetadata> offestToCommit = commitQueue.poll();
                    if (offestToCommit != null) {
                        logger.info(Thread.currentThread().getName() + "commit offset: " + offestToCommit);
                        kafkaConsumer.commitAsync();
                    }
                    //最多轮询1000ms
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                    if (records.count() > 0) {
                        logger.info("poll records size: " + records.count());
                    }
                    for (ConsumerRecord record : records) {
                        String topic = record.topic();
                        int partition = record.partition();
                        TopicPartition topicPartition = new TopicPartition(topic, partition);
                        RecordProcessor processTask = recordProcessorTasks.get(topicPartition);
                        //每条消息都去检查
                        //如果当前分区还没有开始消费, 则就没有消费任务在map中
                        if (processTask == null) {
                            //生成新的处理任务和线程, 然后将其放入对应的map中进行保存
                            processTask = new RecordProcessor(commitQueue);
                            recordProcessorTasks.put(topicPartition, processTask);

                            Thread processTaskThread = new Thread(processTask);
                            processTaskThread.setName("Thread-for " + topicPartition.toString());
                            logger.info("start processor Thread: " + processTaskThread.getName());
                            processTaskThread.start();
                            recordProcessorThreads.put(topicPartition, processTaskThread);
                        }
                        //有 processor 可以处理该分区的 record了
                        processTask.addRecordToQueue(record);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.warn("MsgReceiver exception " + e + " ignore it");
                }
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}

3. 消息处理任务 RecordProcessor

public class RecordProcessor implements Runnable{

    private static Logger logger = LoggerFactory.getLogger(RecordProcessor.class);

    //保存MsgReceiver线程发送过来的消息
    private BlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>();
    //用于向consumer线程提交消费偏移的队列
    private BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue;
    //上一次提交时间
    private LocalDateTime lastTime = LocalDateTime.now();
    //消费了20条数据, 就进行一次提交
    private long commitLength = 20L;
    //距离上一次提交多久, 就提交一次
    private Duration commitTime = Duration.standardSeconds(2);
    //当前该线程消费的数据条数
    private int completeTask = 0;
    //保存上一条消费的数据
    private ConsumerRecord<String, String> lastUncommittedRecord;

    public RecordProcessor(BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue) {
        this.commitQueue = commitQueue;
    }

    @Override
    public void run() {
        while(!Thread.interrupted()){
            ConsumerRecord<String, String> record = null;
            try {
                record = queue.poll(100, TimeUnit.MICROSECONDS);
                if (record != null) {
                    process(record);
                    //完成任务数加1
                    this.completeTask++;
                    //保存上一条处理记录
                    lastUncommittedRecord = record;
                }
                //提交偏移给queue中
                commitTOQueue();
            } catch (InterruptedException e) {
                //线程被interrupt,直接退出
                logger.info(Thread.currentThread() + "is interrupted");
            }

        }

    }

    //将当前的消费偏移量放到queue中, 由MsgReceiver进行提交
    private void commitTOQueue() {
        if(lastUncommittedRecord == null){
            return;
        }
        //如果消费了设定的条数, 比如又消费了commitLength消息
        boolean arrivedCommitLength = this.completeTask % commitLength == 0;
        //获取当前时间, 看是否已经到了需要提交的时间
        LocalDateTime currentTime = LocalDateTime.now();
        boolean arrivedTime = currentTime.isAfter(lastTime.plus(commitTime));

        if(arrivedCommitLength || arrivedTime){
            lastTime = currentTime;
            long offset = lastUncommittedRecord.offset();
            int partition = lastUncommittedRecord.partition();
            String topic = lastUncommittedRecord.topic();
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            logger.info("partition: " + topicPartition + " submit offset: " + (offset + 1L) + " to consumer task");
            Map<TopicPartition, OffsetAndMetadata> map = Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1L));
            commitQueue.add(map);
            //置空
            lastUncommittedRecord = null;
        }
    }

    //consumer线程向处理线程的队列中添加record
    public void addRecordToQueue(ConsumerRecord<String, String> record) {
        try {
            queue.put(record);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        //具体业务逻辑
        //System.out.println(record);
    }
}

改进

  1. 对处理程序RecordProcessor进行抽象,抽象出BasePropessor父类。以后业务需求需要不同的处理程序RecordProcessor就可以灵活改变了。
  2. 反射来构建RecordProcessor??在配置文件配置具体要new的RecordProcessor类路径,然后在创建MsgReceiver的时候传递进去。

以上是关于kafka系列 -- 多线程消费者实现的主要内容,如果未能解决你的问题,请参考以下文章

kafka多线程消费

如何在 kafka 0.9.0 中使用多线程消费者?

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

Kafka consumer在项目中的多线程处理方式

Apache Kafka系列 多线程Consumer方案

Kafka分区数与消费者个数