Java实现阻塞队列简易线程池计时器

Posted 保护眼睛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java实现阻塞队列简易线程池计时器相关的知识,希望对你有一定的参考价值。

Java实现阻塞队列、线程池、计时器

Java实现阻塞队列、简易线程池、计时器

package demo0821;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * user:ypc;
 * date:2021-08-21;
 * time: 9:12;
 */
public class Main 
    static class BlockingQueue 
        private Integer[] items = new Integer[100];
        private int head;
        private int tail;
        private int size;
        Object lock = new Object();

        public void put(int val) throws InterruptedException 
            synchronized (lock) 
                while (size == this.items.length) 
                    lock.wait();
                

                items[tail++] = val;
                if (tail == items.length) 
                    tail = 0;
                
                size++;
                lock.notifyAll();

            
        

        public Integer get() throws InterruptedException 
            synchronized (lock) 

                while (size == 0) 
                    lock.wait();
                
                Integer res = null;
                res = items[head];
                head++;
                if (head >= items.length) 
                    head = 0;
                
                size--;

                lock.notifyAll();

                return res;
            
        
    



class Test 
    public static void main(String[] args) 
        Main.BlockingQueue blockingQueue = new Main.BlockingQueue();

        Thread threadPut = new Thread(new Runnable() 
            @Override
            public void run() 
                for (int i = 0; i < 190; i++) 
                    try 
                        System.out.println("put " + i);
                        blockingQueue.put(i);
//                        Thread.sleep(1000);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    

                
            
        );
        Thread threadTake = new Thread(new Runnable() 
            @Override
            public void run() 
                for (int i = 0; i < 190; i++) 

                    try 
                        System.out.println("take " + blockingQueue.get());
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    

                
            
        );

        threadPut.start();
        ;
        threadTake.start();
    


class Task implements Comparable<Task> 
    public Runnable command;
    public long time;

    Task(Runnable command, long time) 
        this.command = command;
        this.time = System.currentTimeMillis() + time;
    

    public void run() 
        command.run();
    


    @Override
    public int compareTo(Task o) 
        return (int) (this.time - o.time);
    


class Timer 
    PriorityBlockingQueue<Task> priorityQueue = new PriorityBlockingQueue<>();


    private Object lock = new Object();

    public void schedule(Runnable runnable, long time) 
        Task task = new Task(runnable, time);

        priorityQueue.put(task);

        //在插入元素的时候唤醒等待的线程,重新比较时间
        synchronized (lock) 
            lock.notifyAll();
        
    

    Timer() 
        Thread thread = new Thread(new Runnable() 
            @Override
            public void run() 
                while (true) 
                    Task task = null;
                    try 
                        task = priorityQueue.take();
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                    long time = System.currentTimeMillis();
                    if (time >= task.time) 
                        task.run();
                     else 
                        priorityQueue.put(task);
                        synchronized (lock) 
                            try 
                                lock.wait(task.time - time);
                             catch (InterruptedException e) 
                                e.printStackTrace();
                            
                        

                    
                
            
        );
        thread.start();
    



class TimerTest 
    public static void main(String[] args) 
        Timer timer = new Timer();


        Thread thread = new Thread(new Runnable() 
            @Override
            public void run() 
                System.out.println("hello");
                timer.schedule(this, 3000);
            
        );

        timer.schedule(thread, 2000);
    



class ExecutorsTest 
    static class ThreadPool 
        // 1. 先描述一个任务. 直接使用 Runnable, 不需要额外的类
        // 2. 组织若干个任务. 使用 阻塞队列 来组织
        private BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

        // 3. 描述一个线程, 用来进行工作
        static class Worker extends Thread 
            private BlockingQueue<Runnable> queue = null;

            public Worker(BlockingQueue<Runnable> queue) 
                this.queue = queue;
            

            @Override
            public void run() 
                while (true) 
                    try 
                        Runnable runnable = queue.take();
                        runnable.run();
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                
            
        

        // 4. 把线程给组织起来
        private List<Worker> workers = new ArrayList<>();

        // 设定线程池中的线程最大数目
        private static final int MAX_WORKERS_COUNT = 10;

        // 核心接口 execute
        public void execute(Runnable command) 
            try 
                if (workers.size() < MAX_WORKERS_COUNT) 
                    // 当前池子里没有足够的线程, 就创建个新的线程.
                    Worker worker = new Worker(queue);
                    worker.start();
                    workers.add(worker);
                
                queue.put(command);
             catch (InterruptedException e) 
                e.printStackTrace();
            
        
    

    public static void main(String[] args) 
        ThreadPool pool = new ThreadPool();
        for (int i = 0; i < 100; i++) 
            pool.execute(new Runnable() 
                @Override
                public void run() 
                    System.out.println("hello" + Thread.currentThread().getName());
                
            );
        
    


以上是关于Java实现阻塞队列简易线程池计时器的主要内容,如果未能解决你的问题,请参考以下文章

Java实现阻塞队列简易线程池计时器

python 之 并发编程(进程池与线程池同步异步阻塞非阻塞线程queue)

多线程四大经典案例及java多线程的实现

多线程四大经典案例

线程Queue定时器进程池和线程池同步异步

java阻塞队列 线程同步合作