java并发 day05 ThreadPoolExecutorJUC

Posted halulu.me

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发 day05 ThreadPoolExecutorJUC相关的知识,希望对你有一定的参考价值。

目录

ThreadPoolExecutor

线程池的拒绝策略有4种

AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy 让调用者运行任务
DiscardPolicy 放弃本次任务
DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池

线程池状态

newFixedThreadPool

特点:

核心线程数 == 最大线程数(没有救急线程被创建),因此,也没有超时时间
阻塞队列是无界的,可以放任意是数量的任务
适用于任务量已知,相对耗时的任务。

newCachedThreadPool

特点:

核心线程是0,救急线程数Integer.MAX_VALUE,60s后会被回收
救急线程可以无限创建
队列采用了SynchronuxQueue,实现的特点是,他没有容量,没有线程来取是放不进去的(一手交钱,一手交货)
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况

newSingleThreadExecutor

使用场景:

希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

区别:

自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作

1、Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改。FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法

2、Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改。对外暴露的是ThreadPoolExecutor对象,可以强转后调用 setCorePoolSize 等方法进行修改

提交任务

关闭线程池

饥饿现象

饥饿

固定大小线程池会有饥饿现象。这是由于线程池的数量不足以及分工不明确导致导致的。
比如,t1和t2是线程池中的2个线程,他们都需要处理2个任务A,B任务,这是2个阶段任务,只有A执行完毕,B任务才能开始执行。
如果t1执行A,t2接着执行B,这是没有问题的。
但是如果同时来了2个用户,t1执行了第一个用户的A任务,t2执行了第2个用户的A任务,这就会造成B任务没有人执行,也就是所谓的饥饿现象。

解决方法:

不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率

创建多少线程池合适

过小会导致程序不能充分地利用系统资源、容易导致饥饿
过大会导致更多的线程上下文切换,占用更多内存

CPU 密集型运算

CPU 密集型运算主要指应用程序需要大量CPU支持,比如数据分析。

通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间

例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式 4 * 100% * 100% / 50% = 8
例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式4 * 100% * 100% /10% = 40

任务调度线程池

newScheduledThreadPool

@Slf4j
public class Test41 
    public static void main(String[] args) 
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        pool.scheduleAtFixedRate(()->
            log.info("runing...");
        ,1,1, TimeUnit.SECONDS);
    

正确处理线程池中的异常

1、主动抓异常。
2、使用Future,获取返回值的时候也会抛出异常。

定时任务

public class TestSchedule 

    // 如何让每周四 18:00:00 定时执行任务?
    public static void main(String[] args) 
        //  获取当前时间
        LocalDateTime now = LocalDateTime.now();
        System.out.println(now);
        // 获取周四时间
        LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
        // 如果 当前时间 > 本周周四,必须找到下周周四
        if(now.compareTo(time) > 0) 
            time = time.plusWeeks(1);
        
        System.out.println(time);
        // initailDelay 代表当前时间和周四的时间差
        // period 一周的间隔时间
        long initailDelay = Duration.between(now, time).toMillis();
        long period = 1000 * 60 * 60 * 24 * 7;
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        pool.scheduleAtFixedRate(() -> 
            System.out.println("running...");
        , initailDelay, period, TimeUnit.MILLISECONDS);
    


fork/join

1、Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
2、Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
3、Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

使用

提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值)

public class TestForkJoin2 

    public static void main(String[] args) 
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.invoke(new MyTask(5)));
        // new MyTask(5)  5+ new MyTask(4)  4 + new MyTask(3)  3 + new MyTask(2)  2 + new MyTask(1)
    


// 1~n 之间整数的和
class MyTask extends RecursiveTask<Integer> 

    private int n;

    public MyTask(int n) 
        this.n = n;
    

    @Override
    protected Integer compute() 
        // 如果 n 已经为 1,可以求得结果了
        if (n == 1) 
            return n;
        
        // 将任务进行拆分(fork)
        MyTask t1 = new MyTask(n - 1);
        t1.fork();
        // 合并(join)结果
        int result = n + t1.join();
        return result;
    

JUC

AQS

全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
子类需要继承该父类

特点:

1、 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
getState - 获取 state 状态
setState - 设置 state 状态
compareAndSetState - cas 机制设置 state状态
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
初始值是0,使用compareAndSerState(0,1)修改state为1,如果成功则占有锁成功,反之亦然。
使用setState(0)释放锁

2、提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList

3、条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet

4、子类主要实现这样一些方法(默认抛出UnsupportedOperationException)
tryAcquire 独占
tryRelease 独占
tryAcquireShared 共享
tryReleaseShared 共享
isHeldExclusively 是否持有独占锁

自定义锁

// 自定义锁(不可重入锁)
class MyLock implements Lock 

    // 独占锁  同步器类
    class MySync extends AbstractQueuedSynchronizer 
        @Override
        protected boolean tryAcquire(int arg) 
            if(compareAndSetState(0, 1)) 
                // 加上了锁,并设置 owner 为当前线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            
            return false;
        

        @Override
        protected boolean tryRelease(int arg) 
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        

        @Override // 是否持有独占锁
        protected boolean isHeldExclusively() 
            return getState() == 1;
        

        public Condition newCondition() 
            return new ConditionObject();
        
    

    private MySync sync = new MySync();

    @Override // 加锁(不成功会进入等待队列)
    public void lock() 
        sync.acquire(1);
    

    @Override // 加锁,可打断
    public void lockInterruptibly() throws InterruptedException 
        sync.acquireInterruptibly(1);
    

    @Override // 尝试加锁(一次)
    public boolean tryLock() 
        return sync.tryAcquire(1);
    

    @Override // 尝试加锁,带超时
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException 
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    

    @Override // 解锁
    public void unlock() 
        sync.release(1);
    

    @Override // 创建条件变量
    public Condition newCondition() 
        return sync.newCondition();
    

ReentrantLock

可重入原理

第1次加锁会将AQS的state状态改为1,同时将当前线程设置为ExclusiveOwnerThread,加锁成功。
第2次加锁如果该线程还是当前线程,就会将state自增变为2并返回true,这就表示发生了锁重入。

第1次释放锁,会将State自减变为1,这时释放锁失败。
第2次释放锁,会再次将State自减变为0,同时设置ExclusiveOwnerThread为null,这时释放锁成功。

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。
其中,读-读是并发的,读-写是互斥的,写-写是互斥的

注意事项

读锁不支持条件变量
重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
重入时降级支持:即持有写锁的情况下去获取读锁

StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
StampedLock 不支持条件变量
StampedLock 不支持可重入

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
先乐观读,如果数据没有改变,就不加锁。如果数据改变,就加锁

class DataContainerStamped 
    private int data;
    private final StampedLock lock = new StampedLock();

    public DataContainerStamped(int data) 
        this.data = data;
    

    public int read(int readTime) 
        long stamp = lock.tryOptimisticRead();
        sleep(readTime);
        if (lock.validate(stamp)) 
            return data;
        
        // 锁升级 - 读锁
        try 
            stamp = lock.readLock();
            sleep(readTime);
            return data;
         finally 
            lock.unlockRead(stamp);
        
    

    public void write(int newData) 
        long stamp = lock.writeLock();
        try 
            sleep(2);
            this.data = newData;
         finally 
            lock.unlockWrite(stamp);
        
    

Semaphore

信号量,用来限制能同时访问共享资源的线程上限。

CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown()用来让计数减一

应用:(游戏加载)

public class TestCountDownLatch 
    public static void main(String[] args) throws InterruptedException 
        ExecutorService pool = Executors.newFixedThreadPool(5);
        CountDownLatch countDownLatch = new CountDownLatch(5);
        Random random = new Random();
        String[] all = new String[5];
        for (int j = 0; j < 5; j++) 
            int k = j;
            pool.submit(()->
                for (int i = 0; i <= 100; i++) 
                    try 
                        TimeUnit.MILLISECONDS.sleep(random.nextInt(00));
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                    all[k] = i +"%";
                    System.out.print("\\r" + Arrays.toString(all));
                
                countDownLatch.countDown();
            );
        
        countDownLatch.await();
        System.out.println("\\n" + "游戏开始");
        pool.shutdown();

    

CyclicBarrier

循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行.
CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比喻为『人满发车』

ConcurrentHashMap

内部很多操作使用 cas 优化,一般可以提供较高吞吐量

注意:

每个方法是原子的,但不代表2个方法联合起来就是原子的。
比如concurrentHashMap的put()和get()是线程安全的,但是联合起来用就不是线程安全的。

如何解决:(正确使用ConcurrentHashMap)

方法1,加锁(不推荐使用)

方法2:使用内部方法

HashMap并发死锁(多线程下使用HashMap)

1、究其原因,是因为在多线程环境下使用了非线程安全的 map 集合(JDK7)
2、JDK7的HashMap会将同一个桶中后加入的元素放入链表头,由此就导致了扩容前和扩容后的顺序发生了颠倒。在多线程下会形成循环链表,因此造成死锁(四循环)。
3、JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)
4、多线程下使用concurentHashMap

扩容前

扩容后

以上是关于java并发 day05 ThreadPoolExecutorJUC的主要内容,如果未能解决你的问题,请参考以下文章

java并发 day05 ThreadPoolExecutorJUC

java并发 day05 ThreadPoolExecutorJUC

Java并发编程——线程池

Java线程——线程池概念

Java找工作必备知识——day05事务详解(小王,小刘情景在线解释事务的属性)

Day839.并发容器-Java 并发编程实战