Java高级特性系列--Concurrent

Posted coder为

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java高级特性系列--Concurrent相关的知识,希望对你有一定的参考价值。

转载自 http://www.cnblogs.com/skywang12345/p/java_threads_category.html

JUC:java.util.concurrent

一,JUC原子类

根据修改的数据类型,可以将JUC包中的原子操作类可以分为4类。

1. 基本类型: AtomicInteger, AtomicLong, AtomicBoolean ;
2. 数组类型: AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray ;
3. 引用类型: AtomicReference, AtomicStampedRerence, AtomicMarkableReference ;
4. 对象的属性修改类型: AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater 。

这些类存在的目的是对相应的数据进行原子操作。所谓原子操作,是指操作过程不会被中断,保证数据操作是以原子方式进行的。基本思想都是使用unsafe类的CAS函数实现原子操作的。

比较并交换(compare and swap, CAS),是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。 该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。

 

AtomicLong:

AtomicLong是作用是对长整形进行原子操作。
在32位操作系统中,64位的long 和 double 变量由于会被JVM当作两个分离的32位来进行操作,所以不具有原子性。而使用AtomicLong能让long的操作保持原子型。

示例:

package com.util.concurrent.atomic;

import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongTest {

    public static void main(String[] args) {
        // TODO Auto-generated method stub

        AtomicLong mAtoLong = new AtomicLong();
        mAtoLong.set(0x0123456789ABCDEFL);
        System.out.println("toString()" + mAtoLong.toString());
        System.out.println("get()" + mAtoLong.get());
        System.out.println("intValue()" + mAtoLong.intValue());
        System.out.println("longValue()" + mAtoLong.longValue());
        System.out.println("incrementAndGet()" + mAtoLong.incrementAndGet());
        System.out.println("getAndIncrement()" + mAtoLong.getAndIncrement());
        System.out.println("decrementAndGet()" + mAtoLong.decrementAndGet());
        System.out.println("getAndDecrement()" + mAtoLong.getAndDecrement());
        System.out.println("get()" + mAtoLong.get());
        System.out.println("addAndGet()" + mAtoLong.addAndGet(0x10));
        System.out.println("getAndAdd()" + mAtoLong.getAndAdd(0x10));
        System.out.println("getAndSet()" + mAtoLong.getAndSet(0x0123456789ABCDEEL));
        System.out.println("compareAndSet()" + mAtoLong.compareAndSet(0x0123456789ABCDEEL, 0x0123456789ABCDEDL));
        System.out.println("get()" + mAtoLong.get());
    }

}

结果:

toString()81985529216486895
get()81985529216486895
intValue()-1985229329
longValue()81985529216486895
incrementAndGet()81985529216486896
getAndIncrement()81985529216486896
decrementAndGet()81985529216486896
getAndDecrement()81985529216486896
get()81985529216486895
addAndGet()81985529216486911
getAndAdd()81985529216486911
getAndSet()81985529216486927
compareAndSet()true
get()81985529216486893

 

AtomicLongArray:

AtomicLong的作用是对长整型进行原子操作,而AtomicLongArray的作用是对长整型数组进行原子操作。

示例:

package com.util.concurrent.atomic;

import java.util.concurrent.atomic.AtomicLongArray;

public class AtomicLongArrayTest {

    public static void main(String[] args) {
        // TODO Auto-generated method stub

        long[] longArray = new long[] {10, 20, 30, 40, 50};
        AtomicLongArray array = new AtomicLongArray(longArray);
        System.out.println("array:" + array.get(3));
        System.out.println("array.addAndGet:" + array.addAndGet(3, 8));
        System.out.println("array.incrementAndGet:" + array.incrementAndGet(3));
        System.out.println("array.decrementAndGet:" + array.decrementAndGet(4));
        System.out.println("array.getAndIncrement:" + array.getAndIncrement(2));
        System.out.println("array.getAndDecrement:" + array.getAndDecrement(1));
        System.out.println("array.getAndDecrement:" + array.getAndSet(3, 9));
        
    }

}

结果: 

array:40
array.addAndGet:48
array.incrementAndGet:49
array.decrementAndGet:49
array.getAndIncrement:30
array.getAndDecrement:20
array.getAndDecrement:49

 

AtomicReference:

AtomicReference是作用是对"对象"进行原子操作。

package com.util.concurrent.atomic;

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {

    public static void main(String[] args) {
        // TODO Auto-generated method stub

        Person p1 = new Person(101);
        Person p2 = new Person(102);
        
        AtomicReference<Person> ap = new AtomicReference<Person>(p1);
        ap.compareAndSet(p1, p2);
        
        Person p3 = ap.get();
        System.out.println("p3 is " + p3);
        System.out.println("p3.equals(p1)=" + p3.equals(p1));
        System.out.println("p3.equals(p2)=" + p3.equals(p2));
    }

}

class Person {
    volatile long id;
    public Person(long id) {
        this.id = id;
    }
    
    public String toString() {
        return "id:" + id;
    }
}

结果: 

p3 is id:102
p3.equals(p1)=false
p3.equals(p2)=true

 

AtomicLongFieldUpdater:

AtomicLongFieldUpdater可以对指定"类的 \'volatile long\'类型的成员"进行原子更新。它是基于反射原理实现的。

示例:

package com.util.concurrent.atomic;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public class AtomicLongFieldUpdaterTest {

    public static void main(String[] args) {
        // TODO Auto-generated method stub

        Class cls = Person2.class;
        AtomicLongFieldUpdater updater = AtomicLongFieldUpdater.newUpdater(cls, "id");
        Person2 person = new Person2(1234567l);
        
        updater.compareAndSet(person, 1234567l, 7654321l);
        System.out.println("id:" + person.getId());
    }

}

class Person2 {
    volatile long id;
    public Person2(long id) {
        this.id = id;
    }
    
    public void setId(long id) {
        this.id = id;
    }
    
    public long getId() {
        return id;
    }
    
    public String toString() {
        return "id:" + id;
    }
}

结果: 

id:7654321

 

二,JUC锁

同步锁:

即通过synchronized关键字来进行同步,实现对竞争资源的互斥访问的锁。Java 1.0版本中就已经支持同步锁了。

  同步锁的原理是,对于每一个对象,有且仅有一个同步锁;不同的线程能共同访问该同步锁。但是,在同一个时间点,该同步锁能且只能被一个线程获取到。这样,获取到同步锁的线程就能进行CPU调度,从而在CPU上执行;而没有获取到同步锁的线程,必须进行等待,直到获取到同步锁之后才能继续运行。这就是,多线程通过同步锁进行同步的原理!

 

JUC包中的锁:

相比同步锁,JUC包中的锁的功能更加强大,它为锁提供了一个框架,该框架允许更灵活地使用锁,只是它的用法更难罢了。

  JUC包中的锁,包括:Lock接口,ReadWriteLock接口,LockSupport阻塞原语,Condition条件,AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer三个抽象类,ReentrantLock独占锁,ReentrantReadWriteLock读写锁。由于CountDownLatch,CyclicBarrier和Semaphore也是通过AQS来实现的;因此,我也将它们归纳到锁的框架中进行介绍。

  先看看锁的框架图,如下所示。

 

ReentrantLock

ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。

ReentrantLock锁在同一个时间点只能被一个线程所持有;可重入的意思是,ReentrantLock可以被单个线程多次获取。ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平,ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。

示例:

package com.util.concurrent;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Depot mDepot = new Depot();
        Producer2 mPro = new Producer2(mDepot);
        Customer2 mCus = new Customer2(mDepot);

        mPro.produce(60);
        mPro.produce(120);
        mCus.consume(90);
        mCus.consume(150);
        mPro.produce(110);
    }

}

class Depot { 
    private int size;        // 仓库的实际数量
    private Lock lock;        // 独占锁

    public Depot() {
        this.size = 0;
        this.lock = new ReentrantLock();
    }

    public void produce(int val) {
        lock.lock();
        try {
            size += val;
            System.out.printf("%s produce(%d) --> size=%d\\n", 
                    Thread.currentThread().getName(), val, size);
        } finally {
            lock.unlock();
        }
    }

    public void consume(int val) {
        lock.lock();
        try {
            size -= val;
            System.out.printf("%s consume(%d) <-- size=%d\\n", 
                    Thread.currentThread().getName(), val, size);
        } finally {
            lock.unlock();
        }
    }
}; 

// 生产者
class Producer2 {
    private Depot depot;
    
    public Producer2(Depot depot) {
        this.depot = depot;
    }

    // 消费产品:新建一个线程向仓库中生产产品。
    public void produce(final int val) {
        new Thread() {
            public void run() {
                depot.produce(val);
            }
        }.start();
    }
}

// 消费者
class Customer2 {
    private Depot depot;
    
    public Customer2(Depot depot) {
        this.depot = depot;
    }

    // 消费产品:新建一个线程从仓库中消费产品。
    public void consume(final int val) {
        new Thread() {
            public void run() {
                depot.consume(val);
            }
        }.start();
    }
}

结果: 

Thread-0 produce(60) --> size=60
Thread-1 produce(120) --> size=180
Thread-2 consume(90) <-- size=90
Thread-3 consume(150) <-- size=-60
Thread-4 produce(110) --> size=50

因为不符合实际情况,出现了size = -60的情况,所以需要加condition来作限定。

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

// LockTest3.java
// 仓库
class Depot {
    private int capacity;    // 仓库的容量
    private int size;        // 仓库的实际数量
    private Lock lock;        // 独占锁
    private Condition fullCondtion;            // 生产条件
    private Condition emptyCondtion;        // 消费条件

    public Depot(int capacity) {
        this.capacity = capacity;
        this.size = 0;
        this.lock = new ReentrantLock();
        this.fullCondtion = lock.newCondition();
        this.emptyCondtion = lock.newCondition();
    }

    public void produce(int val) {
        lock.lock();
        try {
             // left 表示“想要生产的数量”(有可能生产量太多,需多此生产)
            int left = val;
            while (left > 0) {
                // 库存已满时,等待“消费者”消费产品。
                while (size >= capacity)
                    fullCondtion.await();
                // 获取“实际生产的数量”(即库存中新增的数量)
                // 如果“库存”+“想要生产的数量”>“总的容量”,则“实际增量”=“总的容量”-“当前容量”。(此时填满仓库)
                // 否则“实际增量”=“想要生产的数量”
                int inc = (size+left)>capacity ? (capacity-size) : left;
                size += inc;
                left -= inc;
                System.out.printf("%s produce(%3d) --> left=%3d, inc=%3d, size=%3d\\n", 
                        Thread.currentThread().getName(), val, left, inc, size);
                // 通知“消费者”可以消费了。
                emptyCondtion.signal();
            }
        } catch (InterruptedException e) {
        } finally {
            lock.unlock();
        }
    }

    public void consume(int val) {
        lock.lock();
        try {
            // left 表示“客户要消费数量”(有可能消费量太大,库存不够,需多此消费)
            int left = val;
            while (left > 0) {
                // 库存为0时,等待“生产者”生产产品。
                while (size <= 0)
                    emptyCondtion.await();
                // 获取“实际消费的数量”(即库存中实际减少的数量)
                // 如果“库存”<“客户要消费的数量”,则“实际消费量”=“库存”;
                // 否则,“实际消费量”=“客户要消费的数量”。
                int dec = (size<left) ? size : left;
                size -= dec;
                left -= dec;
                System.out.printf("%s consume(%3d) <-- left=%3d, dec=%3d, size=%3d\\n", 
                        Thread.currentThread().getName(), val, left, dec, size);
                fullCondtion.signal();
            }
        } catch (InterruptedException e) {
        } finally {
            lock.unlock();
        }
    }

    public String toString() {
        return "capacity:"+capacity+", actual size:"+size;
    }
}; 

// 生产者
class Producer {
    private Depot depot;
    
    public Producer(Depot depot) {
        this.depot = depot;
    }

    // 消费产品:新建一个线程向仓库中生产产品。
    public void produce(final int val) {
        new Thread() {
            public void run() {
                depot.produce(val);
            }
        }.start();
    }
}

// 消费者
class Customer {
    private Depot depot;
    
    public Customer(Depot depot) {
        this.depot = depot;
    }

    // 消费产品:新建一个线程从仓库中消费产品。
    public void consume(final int val) {
        new Thread() {
            public void run() {
                depot.consume(val);
            }
        }.start();
    }
}

public class LockTest3 {  
    public static void main(String[] args) {  
        Depot mDepot = new Depot(100);
        Producer mPro = new Producer(mDepot);
        Customer mCus = new Customer(mDepot);

        mPro.produce(60);
        mPro.produce(120);
        mCus.consume(90);
        mCus.consume(150);
        mPro.produce(110);
    }
}

结果:

Thread-0 produce( 60) --> left=  0, inc= 60, size= 60
Thread-1 produce(120) --> left= 80, inc= 40, size=100
Thread-2 consume( 90) <-- left=  0, dec= 90, size= 10
Thread-3 consume(150) <-- left=140, dec= 10, size=  0
Thread-4 produce(110) --> left= 10, inc=100, size=100
Thread-3 consume(150) <-- left= 40, dec=100, size=  0
Thread-4 produce(110) --> left=  0, inc= 10, size= 10
Thread-3 consume(150) <-- left= 30, dec= 10, size=  0
Thread-1 produce(120) --> left=  0, inc= 80, size= 80
Thread-3 consume(150) <-- left=  0, dec= 30, size= 50

 

以上是关于Java高级特性系列--Concurrent的主要内容,如果未能解决你的问题,请参考以下文章

Java高级特性系列--多线程

SpringCloud Alibaba系列Dubbo高级特性篇

Java高级特性 第7节 多线程

juc线程高级特性——CountDownLatch / Callable / Lock

廖雪峰Java11多线程编程-3高级concurrent包-1ReentrantLock

Java并发编程系列 concurrent包概览