并发编程进阶学习篇

Posted adventure.Li

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程进阶学习篇相关的知识,希望对你有一定的参考价值。

相关说明

该文章相关的学习笔记及练习代码将会放在个人 github, 博主也会尽量进行github的日更新,博客将会不定期进行更新,在更新的过程可能存在诸多不足的地方,愿大家指出,可博客私信/评论,也可QQ :674619459 联系我,一起讨论,还可以在GitHub上面提issue 。欢迎大家关注我的github,一起学习Java,一起进步!!。

一、学习目标

  • 学习并发编程思维(OS的进程、线程管理)及海量业务处理
  • 熟悉Java并发编程API的使用(可结合JDK源码进行分析)
  • 结合nacos等开源框架分析并发编程的设计,提升设计能力

二、相关书籍

  1. 操作系统内功
  • 见计算机基础部分
  1. Java并发书籍
  • java 并发编程从入门到精通:从基础概述到线程安全、API及线程池框架等使用
  • java 并发编程的艺术:比较贴近实操,API及JVM的内存模型等
  • java 并发编程实战:和并发编程的艺术类似
  • Java并发实现原理:JDK源码剖析 ,深入地进行剖析讲解。
  1. 相关官网推荐

本次目录

一、多线程之间的交互

阻塞队列

  1. 基本概念

对于队列的概念肯定是比较熟悉,即FIFO的一种受限线性表,在Java中有Queue的顶级接口。对应队列的主要操作有 插入、删除,在不考虑多线程的环境下;若队列为满继续添加或为空继续删除,一般的设计面向该失败的策略有抛异常、扩容、返回特殊值,必须严格按照顺序不停留的执行。而在多线程或者单线程考虑阻塞的情况下,则可进行放缓,可以进行等待满足要求再进行操作,这个等待的过程即为阻塞的来源,对应典型的消费者和生产者模型。

  1. 基本分类

在Java中提供的阻塞队列有以下:

  1. 如何设计
  • 阻塞队列 ,再普通队列的基础上 添加take(),poll()两种关键性操作,实现的机制主要通过ReetrantLock锁机制进行在相应的接口判断是否为空/满,在该情况下则进行阻塞。

  • 阻塞队列的设计代码

    public E take() throws InterruptedException 
        final E x;
        final int c;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try 
            while (count.get() == 0) 
                notEmpty.await();
            
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
         finally 
            takeLock.unlock();
        
        if (c == capacity)
            signalNotFull();
        return x;
    
    
  • 关于锁的设计

  • ArrayBlockingQueue 与LinkedBlockingQueue(采用独立锁,性能更优)的锁设计对比 代码

    
    ArrayBlockingQueue/** Main lock guarding all access */
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        @SuppressWarnings("serial")  // Classes implementing Condition may be serializable.
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        @SuppressWarnings("serial")  // Classes implementing Condition may be serializable.
        private final Condition notFull;
    
    LinkedBlockingQueue: 采用独立锁,性能更高,
    
    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();
    
    /** Wait queue for waiting takes */
    @SuppressWarnings("serial")// Classes implementing Condition may be serializable.
    private final Condition notEmpty = takeLock.newCondition();
    
    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();
    
    /** Wait queue for waiting puts */
    @SuppressWarnings("serial")// Classes implementing Condition may be serializable.
    private final Condition notFull = putLock.newCondition();
    

主要工具类

  • Semaphore
  • CountDownLatch
  • CyclicBarrier

抽象队列化同步器AQS

二、线程池

基本概念

  1. 目标
  • 合理管理资源、线程,防止资源耗尽宕机
  • 更加有效地使用线程
  • 设计相关类图

基本原理

分析源码:ThreadPoolExecutor

自定义线程池

在知道关于线程池的七大核心参数、四大拒绝策略和线程池的工作流程之后,进行自定义的线程池设计将会是顺手拈来。接下来,我们来简单尝试一下。

  • 创建线程工厂
public class DefaultThreadFactory implements ThreadFactory 
    
    private final String DOT = ".";
    
    private volatile AtomicInteger threadId = new AtomicInteger(0);
    
    private String name;
    
    public DefaultThreadFactory(String name) 
        if(!name.endsWith(DOT))
            name = name + DOT;
        
        this.name = name;
    
    
    @Override
    public Thread newThread(Runnable r) 
        int id  = threadId.getAndIncrement();
        String threadName = name + id;
        Thread thread = new Thread(r,threadName);
        thread.setDaemon(true);// 守护线程,--可结合docker容器运用交互式、守式
        return thread;
    

  • 配置参数的常量
public class ExecutorConfig 
    public static final StringDEFAULT_THREAD_FACTORY_NAME="demo.thread.factory.test";
    public static final IntegerTEST_EXECUTOR_CORE_SIZE= 2;
    public static final IntegerTEST_EXECUTOR_MAX_SIZE= 9;
    public static final IntegerTEST_EXECUTOR_QUEUE_SIZE= 5;

  • 定义线程池
public class TestExecutor 

//定义线程池
//阻塞队列若不设置,则无限大
private final static Executorexecutor= new ThreadPoolExecutor(ExecutorConfig.TEST_EXECUTOR_CORE_SIZE,ExecutorConfig.TEST_EXECUTOR_MAX_SIZE,1000L,
            TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(ExecutorConfig.TEST_EXECUTOR_QUEUE_SIZE),new DefaultThreadFactory(ExecutorConfig.DEFAULT_THREAD_FACTORY_NAME),
            new ThreadPoolExecutor.DiscardOldestPolicy());
//定义延时线程池
private final static ExecutorscheduleExecutor= new ScheduledThreadPoolExecutor(ExecutorConfig.TEST_EXECUTOR_CORE_SIZE,
            new DefaultThreadFactory(ExecutorConfig.DEFAULT_THREAD_FACTORY_NAME));

    public static Executor getExecutor()
           returnexecutor;
    

    public static Executor getScheduleExecutor()
           returnscheduleExecutor;
    

  • 定义任务
  • 测试
public class ExecutorTests 
    @Test
    public void test()
        final int taskNum = 100;
        Executor executor =  TestExecutor.getExecutor();
        for(int i=0;i<taskNum;i++)
            executor.execute(new TestTask());//"task-"+i
try 
            TimeUnit.SECONDS.sleep(1);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    

简单应用分析

  • Tomcat的请求线程池处理
  • nginx的并发请求处理
  • Nacos的线程池框架分析

在nacos中,由于是分布式部署,Server之间需要进行通信保持数据或服务的一致性。因此,在配置变更时需要进行通知其他的server,为了保证通信通知的高效与服务的高可用,此处可采用线程池进行异步化的处理。在nacos中的通知机制设计的关键类为AsyncNotifyService 。关于通知机制的大致图如下:

对于通知机制,此篇文章不去探讨,我们主要来看看通知机制的底层多线程的设计。

ConfigExecutor 入手,进行点击快捷键 ctrl+alt(进入实现),ctrl (点击方法,查看实现),进行查看分析。我们主要去分析一下线程池的执行任务类型(延时定时任务、立刻提交执行任务)、执行任务的工厂及命名、资源分配大小等管理。简化版的大致逻辑如下:

在ConfigExecutor进行统一定义了所有Config配置所需要的线程池,因为是常量,地址不变,线程池是共享。设计为常用类、线程池变量为静态共享常量:

final class ConfigExecutor

private static final ExecutorDUMP_EXECUTOR= ExecutorFactory.Managed
        .newSingleExecutorService(ClassUtils.getCanonicalName(Config.class),
                new NameThreadFactory("com.alibaba.nacos.config.embedded.dump"));

并暴露了调用任务、延时任务等两种主要类型的方法供外部类调用(对于上层通知机制或其他应用层即可直接调用):

public static void executeAsyncNotify(Runnable runnable) 
ASYNC_NOTIFY_EXECUTOR.execute(runnable);


public static void scheduleAsyncNotify(Runnable command, long delay, TimeUnit unit) 
ASYNC_NOTIFY_EXECUTOR.schedule(command, delay, unit);

在进行分配线程池时,通过 ExecutorFactory.Managed 进行统一分配管理,而线程工厂则通过new NameThreadFactory()进行创建;

  1. NameThreadFactory中进行线程的命名管理及线程设置(例如守护线程的设置)
  2. ExecutorFactory.Managed 进行资源分配、组命名等管理。此处重点关注private static final ThreadPoolManager *THREAD_POOL_MANAGER* = ThreadPoolManager.*getInstance*(); 中的ThreadPoolManager

该处采用单例设计模式,进行保证线程池管理类的单例,资源不被随意消耗。所有的线程池创建来源需要向此处进行注册。

  1. ThreadPoolManager 中主要关注resourcesManagerlockers 的设计,此处有锁机制,因为在创建线程池的同时存在并发写(向其中注册线程池资源)的问题,因此需要进行并发锁机制控制,设置如以下代码(lock设置为的map结构,对线程池进行命名空间分组管理,对应的锁也是锁对应的命名空间组,有点类似ConcurrentHashMap中的Segement分段锁的感觉)。

private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;

private Map<String, Object> lockers = new ConcurrentHashMap<>(8);

public void register(String namespace, String group, ExecutorService executor) 
    if (!resourcesManager.containsKey(namespace)) 
        lockers.putIfAbsent(namespace, new Object());
    
    final Object monitor = lockers.get(namespace);
    synchronized (monitor) 
        Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);
        if (map == null) 
            map = new HashMap<>(8);
            map.computeIfAbsent(group, key -> new HashSet<>()).add(executor);
            resourcesManager.put(namespace, map);
            return;
        
        map.computeIfAbsent(group, key -> new HashSet<>()).add(executor);
    

以上是关于并发编程进阶学习篇的主要内容,如果未能解决你的问题,请参考以下文章

Java 并发学习总结

Java高级工程师进阶学习:kafka应用场景

Python学习笔记——进阶篇第八周———Socket编程进阶&多线程多进程

go语言学习笔记 — 进阶 — 并发编程:为函数创建goroutine

Java并发编程系列之一并发理论基础

Python学习第25篇:concurrent.futures模块(进程池,线程池)