优先级线程池实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了优先级线程池实现相关的知识,希望对你有一定的参考价值。

运维在升级,无聊写博客

  最近在实现消息通知平台时,需要为不同的通知设置优先级,使通知队列堵塞时可以优先推送高优先级的消息。为了保证通知队列的高效并发,通知队列的消费端采用多线程并发处理,因此需要实现一个支持优先级的多线程处理逻辑:

对于ThreadPoolExecutor来说,其构造函数形式如下:

public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

实现优先级线程池需要注意以下三点:

1.线程池中新提交的任务(在核心线程都繁忙时)会放到workQueue中,如果要按优先级出队,那么该参数必须是PriorityBlockingQueue。

2.通过submit提交任务时,PriorityBlockingQueue容器中最终存储的是FutureTask对象,该对象是由newTaskFor实例化的,因此需要提供一个实现了Comparable接口的FutureTask子类【例如:ComparableFutureTask】,并重写newTaskFor返回它。

3.ComparableFutureTask中比较的是任务的优先级,因此提交的任务本身需要是带有优先级、可比较的对象【例如:PriorityTask】

 

根据以上几点,可参考的代码如下:

【PriorityTask】

/**
 * Base class for a unit of work that carries an integer priority and can therefore be
 * ordered against other {@code PriorityTask}s.
 *
 * <p>Ordering follows the natural order of the priority value. When instances are kept in
 * a {@link java.util.concurrent.PriorityBlockingQueue}, whose head is the least element,
 * the task with the <em>smallest</em> priority number is taken first.
 *
 * <p>Subclasses supply the actual work by implementing {@link #run()}.
 */
public abstract class PriorityTask implements Runnable, Comparable<PriorityTask> {

    /** Priority of this task; never {@code null} and fixed at construction time. */
    private final Integer priority;

    /**
     * Creates a task with the given priority.
     *
     * @param priority the priority used for ordering; must not be {@code null}
     * @throws NullPointerException if {@code priority} is {@code null}. Failing here,
     *         rather than later inside {@link #compareTo(PriorityTask)}, points at the
     *         caller that built the task instead of at the queue that tried to order it.
     */
    public PriorityTask(Integer priority) {
        if (priority == null) {
            throw new NullPointerException("priority must not be null");
        }
        this.priority = priority;
    }

    /** Performs the work of this task. */
    @Override
    public abstract void run();

    /**
     * Compares two tasks by their priority value.
     *
     * @param o the task to compare against
     * @return a negative number, zero, or a positive number when this task's priority is
     *         less than, equal to, or greater than that of {@code o}
     * @throws NullPointerException if {@code o} is {@code null}
     */
    @Override
    public int compareTo(PriorityTask o) {
        return priority.compareTo(o.priority);
    }
}
View Code

 

【带有ComparableFutureTask的PriorityThreadPoolExecutor】

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;


/**
 * A {@link ThreadPoolExecutor} that wraps every submitted task in a comparable
 * {@link FutureTask}, so that a priority-ordered work queue (for example a
 * {@code PriorityBlockingQueue<Runnable>}) can order waiting tasks by the priority of
 * the original {@link Runnable} or {@link Callable}.
 *
 * <p>Ordering rules for two queued tasks:
 * <ol>
 *   <li>If both original tasks are of exactly the same class and that class implements
 *       {@link Comparable}, their {@code compareTo} decides.</li>
 *   <li>Otherwise, or when that comparison is a tie, the task submitted earlier comes
 *       first (FIFO), using a per-executor submission sequence number.</li>
 * </ol>
 *
 * <p>Caveats:
 * <ul>
 *   <li>Only the {@code submit(...)}/{@code invoke*(...)} family goes through
 *       {@code newTaskFor}. A task handed to {@code execute(Runnable)} directly is queued
 *       unwrapped and must itself be mutually comparable with the queue's contents.</li>
 *   <li>Tasks of different concrete classes are never compared by priority, even when
 *       they share a comparable super class. Use one task class where priorities must be
 *       honoured.</li>
 * </ul>
 */
public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {

    /** Source of submission sequence numbers used to break priority ties in FIFO order. */
    private final AtomicLong submissionCounter = new AtomicLong();

    /**
     * Creates an executor with the default thread factory and rejection handler.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   how long excess idle threads wait for work before terminating
     * @param unit            time unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before execution; supply a
     *                        priority-ordered queue to get priority scheduling
     */
    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /**
     * Creates an executor with the default thread factory and a custom rejection handler.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   how long excess idle threads wait for work before terminating
     * @param unit            time unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before execution
     * @param handler         handler invoked when a task cannot be accepted
     */
    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    /**
     * Creates an executor with a custom thread factory and the default rejection handler.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   how long excess idle threads wait for work before terminating
     * @param unit            time unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before execution
     * @param threadFactory   factory used to create worker threads
     */
    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    /**
     * Creates an executor with a custom thread factory and rejection handler.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   how long excess idle threads wait for work before terminating
     * @param unit            time unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before execution
     * @param threadFactory   factory used to create worker threads
     * @param handler         handler invoked when a task cannot be accepted
     */
    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /** Wraps a submitted {@link Runnable} so that it can be ordered inside the work queue. */
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ComparableFutureTask<T>(runnable, value);
    }

    /** Wraps a submitted {@link Callable} so that it can be ordered inside the work queue. */
    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new ComparableFutureTask<T>(callable);
    }

    /**
     * A {@link FutureTask} that remembers the task it wraps and the order in which it was
     * created, and compares itself to other wrappers using both.
     *
     * @param <V> result type of the wrapped task
     */
    protected class ComparableFutureTask<V>
            extends FutureTask<V> implements Comparable<ComparableFutureTask<V>> {

        /** The original task (a {@link Runnable} or a {@link Callable}) being wrapped. */
        private final Object object;

        /** Creation order of this wrapper within the enclosing executor. */
        private final long sequence;

        /**
         * Wraps a {@link Callable}.
         *
         * @param callable the task to run
         */
        public ComparableFutureTask(Callable<V> callable) {
            super(callable);
            this.object = callable;
            this.sequence = submissionCounter.getAndIncrement();
        }

        /**
         * Wraps a {@link Runnable} together with the value its future should report.
         *
         * @param runnable the task to run
         * @param result   the value returned by {@code get()} on successful completion
         */
        public ComparableFutureTask(Runnable runnable, V result) {
            super(runnable, result);
            this.object = runnable;
            this.sequence = submissionCounter.getAndIncrement();
        }

        /**
         * Orders this wrapper relative to another one: first by the wrapped tasks' own
         * ordering where applicable, then by submission order.
         *
         * @param o the wrapper to compare against; {@code null} is tolerated and sorts
         *          after this task
         * @return negative if this task should run before {@code o}, positive if after,
         *         zero only when {@code o} is this very object
         */
        @Override
        public int compareTo(ComparableFutureTask<V> o) {
            if (this == o) {
                return 0;
            }
            if (o == null) {
                return -1; // a real task always outranks "no task"
            }
            int byPriority = compareWrapped(o);
            if (byPriority != 0) {
                return byPriority;
            }
            // Tie (or tasks that cannot be compared): earlier submission runs first.
            // Without this the queue would release equal-priority tasks in heap order.
            return sequence < o.sequence ? -1 : (sequence > o.sequence ? 1 : 0);
        }

        /**
         * Compares the wrapped tasks themselves.
         *
         * @return the wrapped tasks' {@code compareTo} result when both are of exactly the
         *         same {@link Comparable} class, otherwise zero
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        private int compareWrapped(ComparableFutureTask<?> other) {
            if (object == null || other.object == null) {
                return 0;
            }
            // Identical classes guarantee that compareTo receives the type it expects.
            if (!object.getClass().equals(other.object.getClass())) {
                return 0;
            }
            if (!(object instanceof Comparable)) {
                return 0;
            }
            return ((Comparable) object).compareTo(other.object);
        }
    }
}
View Code

【使用代码如下】

import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.wjs.common.config.ConfigUtil;
import com.wjs.message.bo.NotifyMessage;
import com.wjs.message.service.notify.NotifyService;
import com.wjs.message.service.queue.QueueService;

/**
 * Message send queue backed by a priority queue.
 *
 * <p>Messages handed to {@link #push(NotifyMessage)} are buffered in a shared
 * {@link PriorityBlockingQueue}. A single consumer thread, started in {@link #init()},
 * takes them out and submits each one to a {@link PriorityThreadPoolExecutor} as a
 * {@link PriorityTask} carrying the message's priority.
 *
 * <p>NOTE(review): the buffer has no comparator, so {@code NotifyMessage} is presumably
 * {@code Comparable}; otherwise {@code push} fails with a {@code ClassCastException}.
 * Confirm against the {@code NotifyMessage} declaration.
 *
 * @author Silver
 * @date 2016-12-20 20:20:45
 */
@Service("queueService")
public class QueueServicePriorityImpl implements QueueService {

    private static final Logger LOGGER = LoggerFactory.getLogger(QueueServicePriorityImpl.class);

    /** Initial capacity only: a {@link PriorityBlockingQueue} grows on demand. */
    private static final int INITIAL_QUEUE_CAPACITY = 100000;

    /** Default worker threads per processor when no pool size is configured. */
    private static final int THREADS_PER_PROCESSOR = 3;

    /**
     * Buffer between producers and the consumer thread. Declared with its concrete type so
     * that the consumer can use the blocking {@code take()} operation.
     */
    private static final PriorityBlockingQueue<NotifyMessage> queue =
            new PriorityBlockingQueue<NotifyMessage>(INITIAL_QUEUE_CAPACITY);

    // NOTE(review): not used by the code below; presumably the real delivery goes through it.
    @Autowired
    NotifyService notifyService;

    /**
     * Service initialisation: starts the thread that consumes the queue.
     *
     * <p>The consumer blocks while the queue is empty instead of polling in a tight loop.
     * It keeps running until its thread is interrupted, at which point the worker pool is
     * shut down in an orderly fashion (already submitted tasks still run).
     *
     * @author Silver
     * @date 2016-12-21 09:07:20
     */
    @PostConstruct
    public void init() {

        new Thread(new Runnable() {

            @Override
            public void run() {

                int execSize = resolvePoolSize();
                LOGGER.info("Queue_Consume_thread_size:{}", execSize);
                ExecutorService es = new PriorityThreadPoolExecutor(execSize, execSize,
                        0L, TimeUnit.MILLISECONDS,
                        new PriorityBlockingQueue<Runnable>());
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        // take() removes and returns the head, waiting while the queue is empty
                        dispatch(es, queue.take());
                    }
                } catch (InterruptedException e) {
                    // Restore the flag for anyone further up who inspects it, then stop.
                    Thread.currentThread().interrupt();
                } finally {
                    es.shutdown();
                }
            }
        }, "notify-queue-consumer").start();
    }

    /**
     * Determines the number of worker threads.
     *
     * @return the configured {@code message.queue.poll.execsize} when it is a positive
     *         number, otherwise three threads per available processor
     */
    private static int resolvePoolSize() {
        Integer configured = ConfigUtil.getInteger("message.queue.poll.execsize");
        if (configured != null && configured > 0) {
            return configured;
        }
        // The follow-up work (mail / SMS / app push / dubbo calls) is not CPU-bound,
        // so the pool is sized at processors * 3.
        return Runtime.getRuntime().availableProcessors() * THREADS_PER_PROCESSOR;
    }

    /**
     * Submits one message to the worker pool as a prioritised task.
     *
     * <p>A failure to submit (for example a message without a priority) is logged and the
     * message is dropped, so that one bad message cannot stop the consumer thread.
     *
     * @param es      the worker pool
     * @param message the message to process
     */
    private static void dispatch(ExecutorService es, final NotifyMessage message) {
        try {
            es.submit(new PriorityTask(message.getPriority()) {

                @Override
                public void run() {

                    try {
                        // Stand-in for the actual delivery of the message.
                        System.out.println(message);
                    } catch (Exception e) {
                        LOGGER.error("MessageQueue-ERROR->:{},", message, e);
                    }
                }
            });
        } catch (RuntimeException e) {
            LOGGER.error("MessageQueue-SUBMIT-ERROR->:{},", message, e);
        }
    }

    /**
     * Adds a message to the queue.
     *
     * @param message the message to enqueue
     * @return the result of {@code offer}; for the unbounded {@link PriorityBlockingQueue}
     *         used here that is always {@code true}, the queue is never "full"
     */
    @Override
    public boolean push(NotifyMessage message) {
        return queue.offer(message);
    }

    /**
     * Reports how many messages are waiting in the buffer.
     *
     * @return the number of messages not yet taken by the consumer thread; messages already
     *         handed to the worker pool are not counted
     */
    @Override
    public Integer size() {
        return queue.size();
    }

}
View Code

【单测代码如下】

import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;

import com.wjs.message.bo.NotifyMessage;
import com.wjs.message.bo.NotifyMessageEmail;
import com.wjs.message.bo.NotifyMessagePush;
import com.wjs.message.bo.NotifyMessageSms;
import com.wjs.message.bo.NotifyMessageSys;
import com.wjs.message.service.BaseServiceTest;


/**
 * Manual smoke test for the priority queue service: pushes a batch of messages of mixed
 * types with descending priority values and then keeps the test alive so that the
 * consumer's output can be inspected.
 *
 * <p>The test makes no assertions; the processing order has to be checked by reading the
 * output produced by the queue service.
 */
public class QueueTestService extends BaseServiceTest {

    /** Number of messages pushed; priorities run from this value down to 1. */
    private static final int MESSAGE_COUNT = 10000;

    /** How long the test stays alive so the asynchronous consumer can work, in milliseconds. */
    private static final long OBSERVATION_WINDOW_MILLIS = 1000000L;

    @Autowired
    QueueService queueService;

    /**
     * Pushes {@link #MESSAGE_COUNT} messages, each with a priority equal to its index, in
     * descending index order, then waits for the observation window to elapse.
     */
    @Test
    public void testPull() {
        for (int i = MESSAGE_COUNT; i >= 1; i--) {
            NotifyMessage message = newMessage(i);
            message.setPriority(i);
            queueService.push(message);
        }

        try {
            Thread.sleep(OBSERVATION_WINDOW_MILLIS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the test runner instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds a message whose concrete type rotates over the four message kinds and whose
     * content names that kind followed by the index, e.g. {@code "Sms42"}.
     *
     * @param i index of the message
     * @return a new message with its content set
     */
    private static NotifyMessage newMessage(int i) {
        NotifyMessage message;
        String label;
        // Four kinds need modulo 4; with modulo 3 the push kind could never be selected.
        switch (i % 4) {
        case 0:
            message = new NotifyMessageEmail();
            label = "Email";
            break;
        case 1:
            message = new NotifyMessageSys();
            label = "Sys";
            break;
        case 2:
            message = new NotifyMessageSms();
            label = "Sms";
            break;
        default:
            message = new NotifyMessagePush();
            label = "Push";
            break;
        }
        message.setContent(label + i);
        return message;
    }

}
View Code

 

以上是关于优先级线程池实现的主要内容,如果未能解决你的问题,请参考以下文章

线程池-实现一个取消选项

优先级线程池实现

java高性能编程基础 - 线程池的应用及实现原理

Motan在服务provider端用于处理request的线程池

线程池:业务代码常见的问题

4种线程池和7种并发队列