Java并发基础
Posted forever_elf
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发基础相关的知识,希望对你有一定的参考价值。
现代操作系统在运行一个程序时,会为其创建一个进程。现代操作系统调度的最小单元是线程,也叫轻量级进程(Light Weight Process)。在一个进程里可以创建多个线程,这些线程都拥有各自的程序计数器,堆栈和局部变量等属性,并能够访问共享的内存变量。
public class MultiThread{ public static void main(String[] args){ ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false); for(ThreadInfo threadInfo : threadInfos){ System.out.println("[" + threadInfo.getThreadId() + "]" + threadInfo.getThreadName()); } } }
使用多线程原因:
1. 更多的处理器核心
线程是大多数操作系统调度的基本单元,一个程序作为一个进程来运行,程序运行过程中能够创建多个线程,而一个线程在一个时刻只能运行在一个处理器核心上。
2. 更快的响应时间
3. 更好的变成模型
Java为多线程编程提供了良好,考究并且一致的变成模型,使开发人员能够更加专注于问题的解决,即为所遇到的问题建立合适的模型,而不是绞尽脑汁地考虑如何将其多线程化。
现代操作系统基本上采用时分的形式调度运行的线程,操作系统会分出一个个时间片,线程会分配到若干时间片,当线程的事件片用完了就会发生线程调度,并等待着下次分配。线程分配到的时间片多少决定了线程使用处理器资源的多少,而线程优先级就是决定线程需要多或少分配一些处理器资源的线程属性。
在Java线程中,通过一个整型成员变量priority来控制优先级,优先级范围从1~10,在线程构建的时候可以通过setPriority(int)方法来修改优先级,默认优先级是5,优先级高的线程分配的时间片的数量要多余优先级低的线程。设置线程优先级时,针对频繁阻塞(休眠或IO操作)的线程需要设置较高的优先级,而偏重计算(需要较多的CPU事件或偏运算)的线程则设置较低的优先级,确保处理器不会被独占。在不同的JVM及操作系统上,线程该规划会存在差异。
线程的优先级不能作为程序正确性的依赖,操作系统可以完全不理会Java线程对于优先级的设定。
Java线程在运行的生命周期可能处于6种不同的状态,在给定的一个时刻,线程只能处于其中的一个状态。
状态名称 | 说明 |
NEW | 初始状态,线程被构建,但还没调用start()方法 |
RUNNABLE | 运行状态,Java线程将操作系统中的就绪和运行两种状态笼统的称作“运行中” |
BLOCKED | 阻塞状态,表示线程阻塞于锁 |
WAITING | 等待状态,表示线程进入等待状态,进入该状态表示当前线程需要其他线程做出一些特定动作,如通知或中断 |
TIME_WAITING | 超时等待状态,该状态不同于WAITING,它是可以在指定的时间自行返回的 |
TERMINATED | 终止状态,表示当前线程已经执行完毕 |
线程创建之后,调用start()方法开始运行。当线程执行wait()方法之后,线程进入等待状态。进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态,而超市等待状态相当于在等待状态的基础上增加了超时限制,也就是超时时间到达时将会返回到运行状态。当线程调用同步方法时,在没有获取到锁的情况下,线程将会进入到阻塞状态。线程在执行Runnabke的run()方法后会进行到终止状态。
Daemon线程是一种支持型线程,因为他主要被用作程序中后台调度以及支持性工作。当一个Java虚拟机中不存在非Daemon线程时,Java虚拟机将会退出。可以通过调用Thread.setDaemon(true)将线程设置为Daemon线程。Daemon属性需要在启动线程之前设置。但在JVM退出时Daemon线程中的finally块并不一定会执行。因此不能依靠finally块中的内容来确保执行关闭或清理资源的逻辑。
在运行线程之前首先要构造一个线程对象,线程对象在构造的时候需要提供线程所需要的属性,若线程所属的线程组,线程优先级,是否是Daemon线程等信息。
private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc){ if(name == null){ throw new NullPointerException("name cannot be null"); } Thread parent = currentThread(); this.group = g; this.daemon = parent.getPrority(); this.name = name.toCharArray(); this.target = target; setPrority(priority); if(parent.inheritableThreadLocals != null){ this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); tid = nextThreadID(); } }
一个新构造的线程对象是由其parent线程来进行空间分配的,而child线程继承了parent是否为Daemon,优先级和加载资源的contextClassLoader以及可继承的ThreadLocal,同时还会分配一个唯一的ID来标识这个child线程。
线程对象在初始化完成之后,调用start()方法即可启动线程。start()方法的含义是:当前线程同步告知JVM,若线程规划器空闲,应立即启动调用start()方法的线程。启动一个线程前,最好为这个线程设置线程的名称。
中断可以理解为线程的一个标志位属性,它表示一个运行中的线程是否被其他线程进行了中断操作。其他线程通过调用该线程的interrupe()方法对其进行中断操作。线程通过检查自身是否被中断来进行响应,线程通过方法isInterrupted()来进行判断是否被中断,也可以调用静态方法Thread.interrupted()对当前线程的中断标识位进行复位。若该线程已经处于终结状态,即使该线程被中断过,在调用该线程对象的isInterrupted()依旧会返回false。从JAVA API可以看到,许多声明抛出InterruptedException的方法在抛出InterruptedException之前,JVM会将该线程的中断标识位清楚,然后抛出InterruptedException,此时调用isInterrupted()方法将返回false。
suspend(),resume()和stop()方法完成了对线程的暂停,恢复和终止操作。但这些API是过期的。不建议使用的原因是在调用这些方法后不会释放已经占有的资源。
可以通过使用一个boolean变量来控制是否需要停止任务并终止该线程
public class Shutdown{ public static void main(String[] args){ Runner one = new Runner(); Thread countThread = new Thread(one, "CountThread"); countThread.start(); TimeUnit.SECONDS.sleep(1); countThread.interrupt(); Runner two = new Runner(); countThread = new Thread(two, "CountThread"); countThread.start(); TimeUnit.SECONDS.sleep(1); two.cancel(); } private static class Runner implements Runnable{ private long i; private volatile boolean on = true; public void run(){ while(on && !Thread.currentThread().isInterrupted()){ i++; } System.out.println("Count i = " + i); } public void cancel(){ on = false; } } }
Java支持多个线程同时访问一个对象或对象的成员变量,由于每个线程可以拥有这个变量的拷贝,所以在程序执行过程中,一个线程看到的变量不一定是最新的。关键字volatile可以用来修饰字段,告知任何程序对该变量的访问均需要从共享内存中获取,而对他的改变必须同步刷新回共享内存, 确保所有线程对变量访问的可见性。关键字synchronized可以修饰方法或以同步块的形式来进行使用,它主要确保多个线程在同一时刻只能有一个线程处于方法或同步块中,保证了线程对变量访问的可见性和排他性。对于同步块的实现使用了monitorenter和monitorexit指令,而同步方法则依靠方法修饰符上的ACC_SYNCHRONIZED来完成。无论哪种方式,本质是对一个对象的监视器获得获取,这个获取是排他的,即同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。任意一个对象都用于自己的监视器,当这个对象由同步块或这个对象的同步方法调用时,执行方法的线程必须获取到该对象的监视器才能进入同步块或同步方法,而没有获取到监视器的线程将会阻塞在同步块和同步方法的入口处,进入BLOCKED状态。
一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者是消费者。简单的实现方式是让消费者线程不断地循环检查变量是否符合预期。但这种方法却存在难以确保及时行以及难以降低开销两个问题。
方法名称 | 描述 |
notify() | 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是该线程获取到了对象的锁 |
notifyAll() | 通知所有等待在该对象上的线程 |
wait() | 调用该方法的线程进入WAITING状态,只有等待另外线程的通知或中断才会返回,需要注意,调用wait()方法后,会释放对象的锁 |
wait(long) | 超时等待一段时间,这里的时间单位是毫秒,若等待时间长达n毫秒,若没有通知就超时返回 |
wait(long, int) | 对于超时时间更细粒度的控制,可以达到纳秒 |
等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B调用了对象O的notify()或notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而执行后续操作。对象上的wait()和notify/notifyAll()用来完成等待方和通知方之间的交互工作。
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; public class WaitNotify { static boolean flag = true; static Object lock = new Object(); public static void main(String[] args) throws InterruptedException { Thread waitThread = new Thread(new Wait(), "WaitThread"); waitThread.start(); TimeUnit.SECONDS.sleep(1); Thread notifyThread = new Thread(new Notify(), "NotifyThread"); notifyThread.start(); } static class Wait implements Runnable{ @Override public void run() { synchronized (lock) { while (flag) { try{ System.out.println(Thread.currentThread() + " flag is true. wait @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.wait(); }catch (InterruptedException e) { // TODO: handle exception } } System.out.println(Thread.currentThread() + " flag is false. running @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); } } } static class Notify implements Runnable{ @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread() + " hold lock. notify @ " + new SimpleDateFormat("HH:mm:ss").format(new Date())); lock.notifyAll(); flag = false; try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } synchronized (lock) { System.out.println(Thread.currentThread() + " hold lock again. Sleep @ " + new SimpleDateFormat("HH:mm:ss").format(new Date()) ); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } }
使用wait(),notify()以及notifyAll()时需要注意一下细节:
使用wait(),notify()和notifyAll()时需要对调用对象加锁
调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列
notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notif()或notifyAll()的线程释放锁之后,等待线程才有机会从wait()返回
notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法则将等待队列中所有线程全部移除到同步队列,被移动的线程状态由WAITING变为BLOCKED
从wait()方法返回的前提是获得了调用对象的锁
WaitThread先获取了对象的锁,然后调用对象的wait()方法,从而放弃了锁并进入了对象的等待队列WaitQueue中,进入等待状态。由于WaitThread释放了对象的锁,NotifyThread随后获取了对象的锁,并调用对象的notify()方法,将WaitThread从WaitQueue移到SynchronizedQueue中,此时WaitThread的状态变为阻塞状态。NotifyThread释放了锁之后,WaitThread再次获取到锁并未从wait()方法返回继续执行,
等待/通知经典范式分为两部分,分别针对等待方和通知方
等待方遵循如下原则:
获取对象的锁
若条件不满足,调用对象的wait()方法,被通知后仍要检查条件
条件满足则执行对应的逻辑
遵循方如下原则:
获得对象的锁
改变条件
通知所有等待在对象上的线程
管道输入/输出流和普通的文件输入/输出流或网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。管道输入/输出流主要包括了以下4种具体实现:PipedOutputStream,PipedInputStream,PipedReader和PipedWriter,前两种面向字节,后两种面向字符。对Piped类型的流,必须先要进行绑定,即调用connect()方法, 若没有将输入/输出流绑定起来,对于该流的访问将抛异常。
public class Piped { public static void main(String[] args) throws IOException { PipedWriter out = new PipedWriter(); PipedReader in = new PipedReader(); out.connect(in); Thread printThread = new Thread(new Print(in), "PrintThread"); printThread.start(); int receive = 0; try{ while((receive = System.in.read()) != -1){ out.write(receive); } }finally { out.close(); } } static class Print implements Runnable{ private PipedReader in; public Print(PipedReader in) { this.in = in; } @Override public void run() { int receive = 0; try{ while ((receive = in.read()) != -1) { System.out.println((char)receive); } }catch (IOException e) { // TODO: handle exception } } } }
若一个线程A执行thread.join()语句,其含义是:当前线程A等待thread线程终止之后从thread.join()返回。线程还提供了join(long millis)和join(long millis, int nanos)。这两个超时方法表示,若线程thread在给定的超时时间没有终止,那么将会从该超时方法中返回。当线程终止时,会调用线程自身的notifyAll()方法,会通知所有等待在该线程对象上的线程。
public class Join { public static void main(String[] args) throws InterruptedException { Thread previous = Thread.currentThread(); for(int i = 0; i < 10; i ++){ Thread thread = new Thread(new Domino(previous), String.valueOf(i)); thread.start(); previous = thread; } TimeUnit.SECONDS.sleep(5); System.out.println(Thread.currentThread().getName() + " terminate."); } static class Domino implements Runnable{ private Thread thread; public Domino(Thread thread) { this.thread = thread; } @Override public void run() { try{ thread.join(); }catch (InterruptedException e) { } System.out.println(Thread.currentThread().getName() + " terminate."); } } }
ThreadLocal,即线程变量,是一个以ThreadLocal对象为键,任意对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。可以通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取到原先设置的值。
public class Profiler { private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>(){ protected Long initialValue(){ return System.currentTimeMillis(); } }; public static final void begin(){ TIME_THREADLOCAL.set(System.currentTimeMillis()); } public static final long end(){ return System.currentTimeMillis() - TIME_THREADLOCAL.get(); } public static void main(String[] args) throws InterruptedException { Profiler.begin(); TimeUnit.SECONDS.sleep(1); System.out.println("Cost: " + Profiler.end() + " mills"); } }
等待超时模式
假设超时时间段是T,那么可以推断出在当前时间now+T之后会超时。定义如下变量:
等待持续时间: REMAINING = T
超时时间: FUTURE = now + T
此时只需要wait(REMAINING)即可,在wait(REMAINING)返回之后将执行REMAINING = FUTURE - now。若REMAINING <=0,表示REMAINING小于等于0,表示已经超时,直接退出,否则将继续执行wait(REMAINING)。
数据库连接池的简单实现
连接池:通过构造函数初始化连接的最大上限,通过一个双向队列来维护连接,调用方需要先调用fetchConnection(long)方法来指定在多少毫秒内超时获取连接,当连接使用完成之后,需要调用releaseConnection(Connection)方法将连接放回线程池
public class ConnectionPool { private LinkedList<Connection> pool = new LinkedList<>(); public ConnectionPool(int initialSize){ if(initialSize > 0){ for(int i = 0; i < initialSize; i++){ pool.addLast(ConnectionDriver.createConnection()); } } } public void releaseConnection(Connection connection){ if(connection != null){ synchronized (pool) { pool.addLast(connection); pool.notifyAll(); } } } public Connection fetchConnection(long millis) throws InterruptedException{ synchronized (pool) { if(millis < 0){ while (pool.isEmpty()) { pool.wait(); } return pool.removeFirst(); }else{ long future = System.currentTimeMillis() + millis; long remaining = millis; while (pool.isEmpty() && remaining > 0) { pool.wait(remaining); remaining = future - System.currentTimeMillis(); } Connection result = null; if (!pool.isEmpty()) { result = pool.removeFirst(); } return result; } } } } public class ConnectionDriver { static class ConnectionHandler implements InvocationHandler{ public Object invoke(Object proxy, Method method, Object[] args) throws InterruptedException{ if(method.getName().equals("commit")){ TimeUnit.MICROSECONDS.sleep(100); } return null; } } public static final Connection createConnection(){ return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(), new Class<>[]{Connection.class}, new ConnectionHandler()); } } public class ConnectionPoolTest { static ConnectionPool pool = new ConnectionPool(10); static CountDownLatch start = new CountDownLatch(1); static CountDownLatch end; public static void main(String[] args) throws InterruptedException { int threadCount = 10; end = new CountDownLatch(threadCount); int count = 20; AtomicInteger got = new AtomicInteger(); AtomicInteger notGot = new AtomicInteger(); for(int i = 0; i < threadCount; i++){ Thread thread = new Thread(new ConnectionRunner(count, got, notGot), "ConnectionRunnerThread"); thread.start(); } start.countDown(); end.wait(); System.out.println("total invoke : " + (threadCount * count)); System.out.println("got connection : " + got); System.out.println("not got connection : " + notGot); } static class ConnectionRunner implements Runnable{ int count; AtomicInteger got; AtomicInteger notGot; public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) { this.count = count; this.got = got; this.notGot = notGot; } @Override public void run() { try{ start.wait(); }catch (Exception e) { } while(count > 0){ try { Connection connection = pool.fetchConnection(1000); if(connection != null){ try { connection.createStatement(); connection.commit(); } finally { pool.releaseConnection(connection); got.incrementAndGet(); } }else { notGot.incrementAndGet(); } } catch (Exception e) { e.printStackTrace(); }finally { count --; } } end.countDown(); } } }
线程池
public interface ThreadPool<Job extends Runnable>{ void execute(Job job); // 执行一个Job void shutdown(); // 关闭线程池 void addWorkers(int num); //增加工作者线程 void removeWorker(int num); //减少工作者线程 int getJobSize(); //得到正在等待执行任务的数量 }
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { private static final int MAX_WORKER_NUMBERS = 10; private static final int DEFAULT_WORKER_NUMBERS = 5; private static final int MIN_WORKER_NUMBRRS = 1; private final LinkedList<Job> jobs = new LinkedList<>(); private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>()); private int workerNum = DEFAULT_WORKER_NUMBERS; private AtomicLong threadNum = new AtomicLong(); public DefaultThreadPool() { initializeWorkers(DEFAULT_WORKER_NUMBERS); } public DefaultThreadPool(int num) { workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBRRS ? MIN_WORKER_NUMBRRS : num; initializeWorkers(DEFAULT_WORKER_NUMBERS); } public void execute(Job job) { if (job != null) { synchronized (jobs) { jobs.addLast(job); jobs.notify(); } } } public void shutdown() { for (Worker worker : workers) { worker.shutdown(); } } public void addWorkers(int num) { synchronized (jobs) { if (num + this.workerNum > MAX_WORKER_NUMBERS) { num = MAX_WORKER_NUMBERS - this.workerNum; } initializeWorkers(num); this.workerNum += num; } } public void removeWorker(int num) { synchronized (jobs) { if (num >= this.workerNum) { throw new IllegalArgumentException("beyond workNum"); } } int count = 0; while (count < num) { Worker worker = workers.get(count); if (workers.remove(worker)) { worker.shutdown(); count++; } } this.workerNum -= count; } public int getJobSize() { return jobs.size(); } private void initializeWorkers(int num) { for(int i = 0; i < num; i++){ Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet()); thread.start(); } } class Worker implements Runnable{ private volatile boolean running = true; public void run(){ while (running) { Job job = null; synchronized (jobs) { while(jobs.isEmpty()){ try { jobs.wait(); } catch (Exception e) { Thread.currentThread().interrupt(); return; } } job = jobs.removeFirst(); } if(job != null){ try { job.run(); } catch (Exception e) { // TODO: handle exception } } } } public void shutdown(){ running = false; } } }
当客户端调用execute(Job)方法时,会不断向任务列表jobs中添加Job,而每个工作者线程会不断地从jobs上取出一个Job进行执行,当jobs为空时,工作者线程进入等待状态。添加一个Job后,对工作队列调用了其notify()方法,而不是notifyAll()方法。这样可以确保有工作者线程被唤醒,用notify()会比notifyAll()方法减少开销。
public class SimpleHttpServer { static TheadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(1); static String basePath; static ServerSocket serverSocket; static int port = 8080; public static void setPort(int port){ if(port > 0){ SimpleHttpServer.port = port; } } public static void setBasePath(String basePath){ if(basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()){ SimpleHttpServer.basePath = basePath; } } public static void start() throws Exception{ serverSocket = new ServerSocket(port); Socket socket = null; while((socket = serverSocket.accept()) != null){ threadPool.execute(new HttpRequestHandler(socket)); } serverSocket.close(); } static class HttpRequestHandler implements Runnable{ private Socket socket; public public HttpRequestHandler(Socket socket) { this.socket = socket; } @Override public void run() { String line = null; BufferedReader br = null; BufferedReader reader = null; PrintWriter out = null; InputStream in = null; try { reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String header = reader.readLine(); String filePath = basePath + header.split(" ")[1]; out = new PrintWriter(socket.getOutputStream()); if(filePath.endsWith("jpg") || filePath.endsWith("ico")){ in = new FileInputStream(filePath); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int i = 0; while ((i = in.read()) != -1) { baos.write(i); } byte[] array = baos.toByteArray(); out.println("HTTP/1.1 200 OK"); out.println("Server : Molly"); out.println("Content-Type: image/jpeg"); out.println("Content-Length: " + array.length); out.println(""); socket.getOutputStream().write(array, 0, array.length); }else{ br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); out = new PrintWriter(socket.getOutputStream()); out.println("HTTP/1.1 200OK"); out.println("Server: Molly"); out.println("Content-Type: text/html; charset=UTF-8"); out.print(""); while ((line = br.readLine()) != null) { out.println(line); } } out.flush(); } catch (Exception e) { out.println("HTTP/1.1 500"); out.println(""); out.flush(); }finally { close(br, in ,reader, out, socket); } } } private static void以上是关于Java并发基础的主要内容,如果未能解决你的问题,请参考以下文章 [Go] 通过 17 个简短代码片段,切底弄懂 channel 基础