并发编程进阶学习篇
Posted adventure.Li
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程进阶学习篇相关的知识,希望对你有一定的参考价值。
相关说明
该文章相关的学习笔记及练习代码将会放在个人 github, 博主也会尽量进行github的日更新,博客将会不定期进行更新,在更新的过程可能存在诸多不足的地方,愿大家指出,可博客私信/评论,也可QQ :674619459 联系我,一起讨论,还可以在GitHub上面提issue 。欢迎大家关注我的github,一起学习Java,一起进步!!。
一、学习目标
- 学习并发编程思维(OS的进程、线程管理)及海量业务处理
- 熟悉Java并发编程API的使用(可结合JDK源码进行分析)
- 结合nacos等开源框架分析并发编程的设计,提升设计能力
二、相关书籍
- 操作系统内功
- 见计算机基础部分
- Java并发书籍
- java 并发编程从入门到精通:从基础概述到线程安全、API及线程池框架等使用
- java 并发编程的艺术:比较贴近实操,API及JVM的内存模型等
- java 并发编程实战:和并发编程的艺术类似
- Java并发实现原理:JDK源码剖析 ,深入地进行剖析讲解。
- 相关官网推荐
- ifeve.com :并发编程网
- https://docs.oracle.com/javase/8/docs/technotes/guides/concurrency/index.html:java 并发官方文档
本次目录
一、多线程之间的交互
阻塞队列
- 基本概念
对于队列的概念肯定是比较熟悉,即FIFO的一种受限线性表,在Java中有Queue的顶级接口。对应队列的主要操作有 插入、删除,在不考虑多线程的环境下;若队列为满继续添加或为空继续删除,一般的设计面向该失败的策略有抛异常、扩容、返回特殊值,必须严格按照顺序不停留的执行。而在多线程或者单线程考虑阻塞的情况下,则可进行放缓,可以进行等待满足要求再进行操作,这个等待的过程即为阻塞的来源,对应典型的消费者和生产者模型。
- 基本分类
在Java中提供的阻塞队列有以下:
- 如何设计
-
阻塞队列 ,再普通队列的基础上 添加take(),poll()两种关键性操作,实现的机制主要通过ReetrantLock锁机制进行在相应的接口判断是否为空/满,在该情况下则进行阻塞。
-
阻塞队列的设计代码
public E take() throws InterruptedException final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try while (count.get() == 0) notEmpty.await(); x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); finally takeLock.unlock(); if (c == capacity) signalNotFull(); return x;
-
关于锁的设计
-
ArrayBlockingQueue 与LinkedBlockingQueue(采用独立锁,性能更优)的锁设计对比 代码
ArrayBlockingQueue: /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ @SuppressWarnings("serial") // Classes implementing Condition may be serializable. private final Condition notEmpty; /** Condition for waiting puts */ @SuppressWarnings("serial") // Classes implementing Condition may be serializable. private final Condition notFull; LinkedBlockingQueue: 采用独立锁,性能更高, /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ @SuppressWarnings("serial")// Classes implementing Condition may be serializable. private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ @SuppressWarnings("serial")// Classes implementing Condition may be serializable. private final Condition notFull = putLock.newCondition();
主要工具类
- Semaphore
- CountDownLatch
- CyclicBarrier
抽象队列化同步器AQS
二、线程池
基本概念
- 目标
- 合理管理资源、线程,防止资源耗尽宕机
- 更加有效地使用线程
- 设计相关类图
基本原理
分析源码:ThreadPoolExecutor
自定义线程池
在知道关于线程池的七大核心参数、四大拒绝策略和线程池的工作流程之后,进行自定义的线程池设计将会是顺手拈来。接下来,我们来简单尝试一下。
- 创建线程工厂
public class DefaultThreadFactory implements ThreadFactory
private final String DOT = ".";
private volatile AtomicInteger threadId = new AtomicInteger(0);
private String name;
public DefaultThreadFactory(String name)
if(!name.endsWith(DOT))
name = name + DOT;
this.name = name;
@Override
public Thread newThread(Runnable r)
int id = threadId.getAndIncrement();
String threadName = name + id;
Thread thread = new Thread(r,threadName);
thread.setDaemon(true);// 守护线程,--可结合docker容器运用交互式、守式
return thread;
- 配置参数的常量
public class ExecutorConfig
public static final StringDEFAULT_THREAD_FACTORY_NAME="demo.thread.factory.test";
public static final IntegerTEST_EXECUTOR_CORE_SIZE= 2;
public static final IntegerTEST_EXECUTOR_MAX_SIZE= 9;
public static final IntegerTEST_EXECUTOR_QUEUE_SIZE= 5;
- 定义线程池
public class TestExecutor
//定义线程池
//阻塞队列若不设置,则无限大
private final static Executorexecutor= new ThreadPoolExecutor(ExecutorConfig.TEST_EXECUTOR_CORE_SIZE,ExecutorConfig.TEST_EXECUTOR_MAX_SIZE,1000L,
TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(ExecutorConfig.TEST_EXECUTOR_QUEUE_SIZE),new DefaultThreadFactory(ExecutorConfig.DEFAULT_THREAD_FACTORY_NAME),
new ThreadPoolExecutor.DiscardOldestPolicy());
//定义延时线程池
private final static ExecutorscheduleExecutor= new ScheduledThreadPoolExecutor(ExecutorConfig.TEST_EXECUTOR_CORE_SIZE,
new DefaultThreadFactory(ExecutorConfig.DEFAULT_THREAD_FACTORY_NAME));
public static Executor getExecutor()
returnexecutor;
public static Executor getScheduleExecutor()
returnscheduleExecutor;
- 定义任务
- 测试
public class ExecutorTests
@Test
public void test()
final int taskNum = 100;
Executor executor = TestExecutor.getExecutor();
for(int i=0;i<taskNum;i++)
executor.execute(new TestTask());//"task-"+i
try
TimeUnit.SECONDS.sleep(1);
catch (InterruptedException e)
e.printStackTrace();
简单应用分析
- Tomcat的请求线程池处理
- nginx的并发请求处理
- Nacos的线程池框架分析
在nacos中,由于是分布式部署,Server之间需要进行通信保持数据或服务的一致性。因此,在配置变更时需要进行通知其他的server,为了保证通信通知的高效与服务的高可用,此处可采用线程池进行异步化的处理。在nacos中的通知机制设计的关键类为AsyncNotifyService
。关于通知机制的大致图如下:
对于通知机制,此篇文章不去探讨,我们主要来看看通知机制的底层多线程的设计。
从ConfigExecutor
入手,进行点击快捷键 ctrl+alt(进入实现),ctrl (点击方法,查看实现),进行查看分析。我们主要去分析一下线程池的执行任务类型(延时定时任务、立刻提交执行任务)、执行任务的工厂及命名、资源分配大小等管理。简化版的大致逻辑如下:
在ConfigExecutor进行统一定义了所有Config配置所需要的线程池,因为是常量,地址不变,线程池是共享。设计为常用类、线程池变量为静态共享常量:
final class ConfigExecutor
private static final ExecutorDUMP_EXECUTOR= ExecutorFactory.Managed
.newSingleExecutorService(ClassUtils.getCanonicalName(Config.class),
new NameThreadFactory("com.alibaba.nacos.config.embedded.dump"));
并暴露了调用任务、延时任务等两种主要类型的方法供外部类调用(对于上层通知机制或其他应用层即可直接调用):
public static void executeAsyncNotify(Runnable runnable)
ASYNC_NOTIFY_EXECUTOR.execute(runnable);
public static void scheduleAsyncNotify(Runnable command, long delay, TimeUnit unit)
ASYNC_NOTIFY_EXECUTOR.schedule(command, delay, unit);
在进行分配线程池时,通过 ExecutorFactory.Managed
进行统一分配管理,而线程工厂则通过new NameThreadFactory()
进行创建;
NameThreadFactory
中进行线程的命名管理及线程设置(例如守护线程的设置)ExecutorFactory.Managed
进行资源分配、组命名等管理。此处重点关注private static final ThreadPoolManager *THREAD_POOL_MANAGER* = ThreadPoolManager.*getInstance*();
中的ThreadPoolManager
该处采用单例设计模式,进行保证线程池管理类的单例,资源不被随意消耗。所有的线程池创建来源需要向此处进行注册。
- 在
ThreadPoolManager
中主要关注resourcesManager
和lockers
的设计,此处有锁机制,因为在创建线程池的同时存在并发写(向其中注册线程池资源)的问题,因此需要进行并发锁机制控制,设置如以下代码(lock设置为的map结构,对线程池进行命名空间分组管理,对应的锁也是锁对应的命名空间组,有点类似ConcurrentHashMap中的Segement分段锁的感觉)。
private Map<String, Map<String, Set<ExecutorService>>> resourcesManager;
private Map<String, Object> lockers = new ConcurrentHashMap<>(8);
public void register(String namespace, String group, ExecutorService executor)
if (!resourcesManager.containsKey(namespace))
lockers.putIfAbsent(namespace, new Object());
final Object monitor = lockers.get(namespace);
synchronized (monitor)
Map<String, Set<ExecutorService>> map = resourcesManager.get(namespace);
if (map == null)
map = new HashMap<>(8);
map.computeIfAbsent(group, key -> new HashSet<>()).add(executor);
resourcesManager.put(namespace, map);
return;
map.computeIfAbsent(group, key -> new HashSet<>()).add(executor);
以上是关于并发编程进阶学习篇的主要内容,如果未能解决你的问题,请参考以下文章
Python学习笔记——进阶篇第八周———Socket编程进阶&多线程多进程