高并发学习 —— 集合线程安全线程池
Posted Johnny*
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发学习 —— 集合线程安全线程池相关的知识,希望对你有一定的参考价值。
高并发学习(三)
线程安全的集合
ArrayList
List list = new ArrayList();线程不安全 会发生ConcurrentModificationException异常
解决方案:
1、vector线程安全。List list = new Vector<>();
2、使用工具类,将ArrayList转为线程安全的List list = Collections.synchronizedList(new ArrayList<>());
3、使用JUC 下的CopyOnWriteArrayList。该List采用写时加锁然后复制,实现线程安全
package UnSafeDataStructure;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ArrayListDemo {
public static void main(String[] args) {
List<Integer> list = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < 50; i++) {
new Thread(()->{
list.add((int) (Math.random() * 1000 ));
System.out.println(list);
}).start();
}
}
}
HashSet
Set set = new HashSet<>();线程不安全 会发生ConcurrentModificationException异常
解决方案:
1、 使用JUC包下的CopyOnWriteArraySet 。 Set set = new CopyOnWriteArraySet<>();
2、 使用Collections工具类 ,syn方法返回一个线程安全的集合。Set set = Collections.synchronizedSet(new HashSet<>);
package UnSafeDataStructure;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
public class HashSetDemo {
public static void main(String[] args) {
Set<Integer> set = Collections.synchronizedSet(new HashSet<>());
for (int i = 0; i < 50; i++) {
new Thread(()->{
set.add( (int) (Math.random() * 1000 ) );
try {
System.out.println(set);
// System.out.println(Thread.currentThread().getName()+" "+set);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, i +" ").start();
}
}
}
HashMap
Map<Integer,Integer> map = new HashMap<>();线程不安全,
解决方案:
1、 使用JUC包下的ConcurrentHashMap,Map<Integer,Integer> map = new ConcurrentHashMap<>();
2、 使用工具类转换。Map<Integer,Integer> map = Collections.synchronizedMap(new HashMap<>());
package UnSafeDataStructure;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class HashMapDemo {
static int time []= {10,20,30,40,50,60,70,80,90,100};
public static void main(String[] args) {
Map<Integer,Integer> map = Collections.synchronizedMap(new HashMap<>());
for (int i = 0; i < 50; i++) {
new Thread(()->{
map.put((int) (Math.random() * 1000 ) , (int) (Math.random() * 1000 ));
/*
若增加打印语句,或者使用线程休眠。则可能由于延时使得线程得以先后执行
*/
// try {
// Thread.sleep((int) (Math.random() * 100 ));
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//System.out.println( Thread.currentThread().getName());
System.out.println(map);
}, "" + i).start();
}
}
}
常用辅助类
CountDownLatch
CountDownLatch
官网例子:
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
package SynAid;
import java.util.concurrent.CountDownLatch;
public class LatchDemo {
public static void main(String[] args) throws InterruptedException {
//1、创建计数器
CountDownLatch downLatch = new CountDownLatch(6);
for (int i = 0; i < 5; i++) {
new Thread(()->{
System.out.println("【线程"+Thread.currentThread().getName()+"】 让计数器减1");
downLatch.countDown();
}, ""+i).start();
}
//2、若计数器不为0 则会在这里阻塞
downLatch.await();
System.out.println("计数器已减至0");
}
}
CyclicBarrier
官网例子
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction = () -> mergeRows(...);
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// wait until done
for (Thread thread : threads)
thread.join();
}
}
package SynAid;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
// 若没有达到设置的parties,Runnable方法不执行
CyclicBarrier barrier = new CyclicBarrier(6, () -> {
System.out.println("计数器到达6");
});
for (int i = 0; i < 5; i++) {
new Thread(()->{
try {
System.out.println("【线程"+Thread.currentThread().getName()+"】 让计数器加1");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, ""+i).start();
}
}
}
Semaphore
acquire请求获取permit
release释放掉获取的permit。
未得到permit的线程只能等待
package SynAid;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
//创建2个permit
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 4; i++) {
final int tag = i;
new Thread(()->{
try {
//请求获取permit
semaphore.acquire();
System.out.println("【线程"+Thread.currentThread().getName()+"】获取到信号量");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("【线程"+Thread.currentThread().getName()+"】释放信号量");
//释放掉持有的permit
semaphore.release();
}
}, "" +tag ).start();
}
}
}
读写锁
读写锁的特点是:
(1) 读的时候允许多个线程同时读。由读锁(也叫共享锁)控制。
(2)写的时候只允许一个线程写。由写锁(也叫独享锁)控制。
package RWLock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class RWLockDemo {
public static void main(String[] args) {
Data data = new Data();
for(int i = 0; i < 4; i ++){
new Thread(()->{
data.write();
data.read();
}, i + "").start();
}
}
}
class Data{
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void write(){
Lock lock = readWriteLock.writeLock();
lock.lock();
try {
System.out.println("【线程"+Thread.currentThread().getName()+"】 正在写");
}finally {
System.out.println("【线程"+Thread.currentThread().getName()+"】 写完毕");
lock.unlock();
}
}
public void read(){
Lock lock = readWriteLock.readLock();
lock.lock();
try {
System.out.println("【线程"+Thread.currentThread().getName()+"】 正在读");
}finally {
lock.unlock();
}
}
}
执行结果
【线程0】 正在写
【线程0】 写完毕
【线程3】 正在写
【线程3】 写完毕
【线程1】 正在写
【线程1】 写完毕
【线程2】 正在写
【线程2】 写完毕
【线程1】 正在读
【线程2】 正在读
【线程0】 正在读
【线程3】 正在读
阻塞队列
package BQueueDemo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockQueueDemo {
/*
抛出异常的增加元素和获取元素
增加超出容量:以上是关于高并发学习 —— 集合线程安全线程池的主要内容,如果未能解决你的问题,请参考以下文章
Java核心技术读书笔记10-2 阻塞队列线程安全集合类Callable与Future线程池与任务组同步框架
java并发编程:管程内存模型无锁并发线程池AQS原理与锁线程安全集合类并发设计模式