Java编程思想-并发

Posted vanpersie_9987

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java编程思想-并发相关的知识,希望对你有一定的参考价值。

性能调优

通过前几篇的介绍,我们知道JDK1.5中的java.util.concurrent包中存在大量的辅助并用于提升性能的类。本文将着重讨论并比较这个包中的部分类之间的性能。

比较各类互斥技术

首先看看老式的同步机制synchronized关键字和JDK1.5中新的Lock和Atomic类之间的性能差异和适用范围:

abstract class Incrementable 
    protected long counter = 0;
    public abstract void increment();


//使用synchronized同步
class SynchronizingTest extends Incrementable 
    public synchronized void increment() 
        ++counter;
    


//使用Lock同步
class LockKingTest extends Incrementable 
    private Lock lock = new ReentrantLock();
    public void increment() 
        lock.lock();
        try 
            ++counter;
         finally 
            lock.unlock();
        

    


public class SimpleMicroBenchmark 
    public long test(Incrementable incr) 
        long start = System.nanoTime();
        //循环一千万次,以比较性能
        for(long i = 0; i < 10000000L; ++i) 
            incr.increment();
        
        return System.nanoTime() - start;
    
    public static void main(String[] args) 
        long synchTime = test(new SynchronizingTest());
        long lockTime = test(new LockingTest());
        System.out.print("synchronized: %1$20d\\n", synchTime);
        System.out.print("Lock:         %1$10d\\n", lockTime);
        System.out,print("Lock/synchronized = %1$.3f", (double)lockTime / (double)synchTime);
    

//输出:
synchronized:  244919117
Lock:          939098964
Lock/synchronized = 3.834

从输出可以看出,使用synchronized同步比Lock快得多,前者的速度是后者的将近4倍。

但是,上述测试并不全面,它存在大量问题,首先,真实环境中应该包含多个任务同时竞争一把锁的情况;其次,当编译器看到synchronized关键字时,可能会进行优化,因为编译器意识到这个程序是单线程的,而且counter的递增数量的固定的。所以会预先计算出结果。当然还有其他问题,比如不同编译器可能有不同的结果等等。

为了创建有效的测试,应该创建更加复杂的测试程序。首先需要多个任务,以模拟多个线程并发。其次这些任务应该有读有写。另外计算必须足够复杂,使得编译器没机会执行优化。这可以通过预加载一个大型的随机int数组,并在计算总和时使用它们来实现:

abstract class Accumulator 
    public static long cycles = 50000L;
    private static final int N = 4;
    public static ExecutorService exec = Executors.newFixedThreadPool(N * 2);
    private static CyclicBarrier barrier = new CyclicBarrier(N * 2 + 1);
    protected volatile int index = 0;
    protected volatile long value = 0;
    protected long duration = 0;
    protected String id = "error";
    protected final static int SIZE = 100000;
    protected static int[] preLoaded = new int[SIZE];
    static 
        Random random = new Random(47);
        for(int i = 0;i < SIZE; ++i) 
            preLoaded[i] = random.nextInt();

        

    
    public abstract void accumulate();
    public abstract long read();
    private class Modifier implements Runnable 
        public void run() 
            for(int i = 0; i < cycles; ++i) 
                accumulate();
            
            try 
                barrier.await();
             catch(Exception e) 
                throw new RuntimeException(e);
            
        
    
    private class Reader implements Runnable 
        private volatile long value;
        public void run() 
            for(int i = 0; i < cycles; ++i) 
                value = read();
            
            try 
                barrier.await();
             catch(Exception e) 
                throw new RuntimeException(e);
            
        
     
    public void timedTest 
        long start = System.nanoTime();
        for(int i = 0; i < N; ++i) 
            exec.execute(new Modifier());
            exec.execute(nre Reader());
        
        try 
            barrier.await();

         catch(Exception e) 
            throw new RuntimeException(e);
        
        duration = System.nanoTime() - start;
        System.out.print("%-13s: %13d\\n", id, duration);
     
    public static void report(Accumulator acc1, Accumulator acc2) 
        System.out.print("%-22s: %.2f\\n", acc.id + "/" + acc2.id, (double)acc1.duration / (double)acc2.duration);
    

class BaseLine extends Accumulator 
    
        id = "BaseLine";
    
    public void accumulate() 
        value += preLoaded[index++];
        if(index >= SIZE) 
            index = 0;
        
    
    public long read() 
        return value;
    

class SynchronizedTest extends Accumulator 
    
        id = "synchronized";
    
    public synchronized void accumulaie() 
        value += preLoaded[index++];
        if(index >= SIZE) 
            index = 0;
        
    
    public synchronized long read() 
        return value;
    


class LockTest extends Accumulator 
    
        id = "Lock";

    
    private Lock lock = new ReetrantLock();
    public void accumulate() 
        lock.lock();
        try 
            value += preLoaded[index++];
            if(index >= SIZE) 
                index = 0;
            
         finally 
            lock.unlock();
        
    
    public long read() 
        lock.lock();
        try 
            return value;
         finally 
            lock.unlock();
        
    



class AtomicTest extends Acuumulator 
    
        id = "Atomic";
    
    private AtomicInteger index = new AtomicInteger(0);
    private AtomicLong value = new AtomicLong(0);
    public void accumulate() 
        int i = index.getAndIncrement();
        value.getAndAdd(preLoaded[i]);
        if(++i >= SIZE) 
            index.set(0);
        
    
    public long read() 
        return value.get();
    


public class SynchronizationComparisons 
    static BaseLine baseLine = new BaseLine();
    static SynchronizedTest synch = new SynchronizedTest();
    static LockTest lock = new LockTest();
    static AtomicTest atomic = new AtomicTest();
    static void test() 
        System.out.print("===================");
        System.out.printf("%-12s : %13d\\n", "Cycles", Accumulator.cycles);
        baseLine.timedTest();
        synch.timedTest();
        lock.timedTest();
        atomic.timedTest();
        Accumulator.report(synch, baseLine);
        Accumulator.report(lock, baseLine);
        Accumulator.report(atomic, baseLine);
        Accumulator.report(synch, atomic);
        Accumulator.report(lock, atomic);
    
    public static void main(String[] args) 
        int iterations = 5;
        if(args.length > 0) 
            iterations = new Integer(args[0]);

        
        System.out.print("Warmup");
        baseLine.timedTest();
        for(int i = 0; i < iterations; ++i) 
            test();
            Accumulator.cycles *= 2;
        
        Accumulator.exec.shutdown();
    
//输出

程序使用accumulate()和read()方法表达了实现互斥的不同方式。

为了保证测试效果,初始测试执行了两次,第一次的结果被丢弃,因为它包含了初始线程的创建。

程序中的CyclicBarrier保证了所有的任务在声明每个测试完成之前都已经完成。

每次调用accumulate()时,它都会移动到preLoad数组的下一个位置,到达末尾时再回到开始位置,并将这个位置的随机生成的数字加到value上多个Modifier和Reader任务提供了在Accumulator对象上的竞争。

从输出来看,开始的四次synchronized看起来比使用Lock和Atomic高效,但是之后synchronized就变得低效了。

结论是:Lock比synchronized效率高很多,而且后者变化范围大。

最后需要说一点,在并发程序中使用Atomic类时,只有在非常简单的情况下才有用,这些情况通常包括只有一个要被修改的Atomic对象,并且这个对象独立于其他所有的对象。更安全的做法是:以更加传统的互斥方式入手,只有在性能方面的需求能够明确指示时,再替换为Atomic。

免锁容器

JDK1.5中添加了新的容器,通过使用更加灵巧的技术来消除加锁(Collections中提供了很多加同步的静态方法,但它们底层实现仍是基于synchronized的,而更早期的线程安全类Vector或者HashTable,同样是基于synchronized,这使得它们在多线程环境下的工作效率非常低,但是新的容器类避免了这些缺点),这些免锁容器背后的通用策略是:对容器的修改可以与读取操作同时发生,而这是在保证读取者能看到正确的修改结果的前提下进行的。修改是在容器数据结构的某个部分的一个单独副本(有时是整个数据结构的副本)上执行的,并且这个副本在修改过程中是不可视的。只有当修改完成时,被修改的结构才回自动地与主数据进行交换,之后读取者就可以看到这个修改了。

这些免锁容器包括ConcurrentHashMap和ConcurrentLinkedQueue,允许并发的读取和写入,但是容器中只有部分内容而不是整个容器可以被复制和修改。然而,任何修改在完成之前,读取者仍旧不能看它们。

乐观锁

只要从免锁容器中读取,那么它就会比synchronized方式实现的同步效率高很多。因为获取和释放锁的开销被省掉了。如果向免锁容器中执行少量写入,情况也是如此(依然省去了获取和释放锁的步骤)。但是这个所谓的“少量”该如何界定?本节将介绍在不同条件下,这些容器在性能方面的大致概念。

public abstract class Tester<C> 
    static int testReps = 10;
    static int testCycles = 1000;
    static int containersSize = 1000;
    abstract C containerInitializer();
    abstract void startReadersAndWriters();
    C testContainer;
    String testId;
    int nReaders;
    int nWriters;
    volatile long readResult = 0;
    volatile long readTime = 0;
    volatile long writerTime = 0;
    CountDownLatch endLatch;
    static ExecutorService exec = Executors.newCachedThreadPool();
    Integer[] writeData;
    Tester(String testId, int nReaders, int nWriters) 
        this.testId = testId + " " + nReaders + "r " + nWriters + "w";
        this.nReaders = nReaders;
        this.nWriters = nWriters;
        writeData = Generated.array(Interger.class, new RandomGenerator.Integer(), containerSize);
        for(int i = 0;i < testReps; ++i) 
            runTest();
            readTime = 0;
            writeTime = 0;
        
    
    void runTest() 
        endLatch  = new CountDownLatch(nReaders + nWriters);
        testContainer = ContainerInitializer();
        startReadersAndWriters();
        try 
            endLatch.await();

         catch(InterruptedException e) 
            System.out.println("endLatch interrupted");
        
        System.out.printf("%-27s %14d %14d\\n", testId, readTime, writeTime);
        if(readTime != 0 && writeTime != 0) 
            System.out.printf("%-27s %14d\\n", "readTime + writeTime =", readTime + writeTime);
        
     
    abstract class TestTask implements Runnable 
        abstract void test();
        abstract void putResults();
        long duration;
        public void run() 
            long startTime = System.nanoTime();
            test();
            duration = System.nanoTime() - startTime;
            synchronized(Tester.this) 
                putResults();
            
            endLatch.countDown();
        
    
    public static void initMain(String[] args) 
        if(args.length > 0) 
            testReps = new Integer(args[0]);

        
        if(args.length > 1) 
            testCycles = new Integer(args[1]);

        
        if(args.length > 2) 
            containersSize = new Integer(args[2]);

        
        System.out.printf("%-27s %14s %14s\\n", "Type", "Read time", "write time");
    

abstract方法containerInitializer返回将被测试的初始化后的容器,它被存储在testContainer域中,另一个abstract方法startReadersAndWriters()启动读取者和写入者,它们将读取和修改测容器。不同的测试在运行时将具有数量变化的读取者和写入者,这样就可以观察到锁的竞争(针对synchronize容器而言)和写入(针对免锁容器)的效果。

为了使用这个框架,我们必须让想要测试的特定类型和容器继承Tester,并提供合适的Reader和Writer类:

abstract class ListTest extends Tester<List<Integer>> 
    ListTest(String testId, int nReaders, int nWriters) 
        super(testId, nReaders, nWriters);
    
    class Reader extends TestTask 
        long result = 0;
        void test() 
            for(long i = 0; i < testCycles; ++i) 
                for(int index = 0; index < containerSize; ++index) 
                    testContainer.set(index, writData[index]);
                
            
        
        void putResults() 
            writeTime += duration;
        
    
    void startReadersAndWriters() 
        for(int i = 0; i < nReaders; ++i) 
            exec.execute(new Reader());
        
        for(int i = 0; i < nWriters; ++i) 
            exec.execute(new Writer());
        
    
    class SynchronizedArrayListTest extends ListTest 
        List<Integer> containerInitalizer() 
            return Collections.synchronizedList(containerSize);
        
        SynchronizedArrayListTest(int nReaders, int nWriters) 
            super("Synched ArrayList", nReaders, nWriters);
        
    
    class CopyOnWriteArrayListTest extends ListTest 
        List<Integer> containerInitalizer() 
            return new CopyOnWriteArrayList<Integer>(new CountingIntegerList(containerSize));
        
        CopyOnWriteArrayListTest(int nReaders, int nWriters) 
            super("CopyOnWriteArrayList", nReaders, nWriters);
        
    


public class ListComparisons 
    public static void main(String[] args) 
        Tester.initMain(args);
        new SynchronizedArrayListTest(10, 0);
        new SynchronizedArrayListTest(9, 1);
        new SynchronizedArrayListTest(5, 5);
        new CopyOnWriteArrayListTest(10, 0);
        new CopyOnWriteArrayListTest(9, 1);
        new CopyOnWriteArrayListTest(5, 5);
        Tester.exec.shutdown();
    
//输出:
Type                            Read time       Write time
Synched ArrayList 10r 0w     232158294700                0
Synched ArrayList 9r 1w      198947618203      24918613399
readTime + writeTime =       223866231602                
Synched ArrayList 5r 5w      117367305062     132176613508           
readTime + writeTime =       249543918570
CopyOnWriteArrayList 10r 0w     758386889                0
CopyOnWriteArrayList 9r 1w      741305671        136145237
readTime + writeTime =          877450908
CopyOnWriteArrayList 5r 5w      212763075      67967464300
readTime + writeTime =        68180227375

从输出可以看到,synchronized ArrayList 无论读取者和写入者的数量是多少,都具有大致相同的性能——读取者与其他读取者竞争锁的方式与写入者相同,但是,CopyOnWriteArrayList在没有写入者时,速度会快许多,并且在由5个写入者时,速度仍旧明显地块。看起来应该尽量使用CopyOnWriteArrayList,队列表写入的影响并没有超过短期同步整个列表的影响。当然,你必须在你的具体应用中尝试这两种不同的方式,以了解到底哪一个更好一些。

比较各种Map实现

我们可以使用相同的框架得到synchronizedHashMap和ConcurrentHashMap在性能方面的比较结果:

abstract class MapTest extends Tester<Map<Integer,Integer>> 
    MapTest(String testId, int nReaders, int nWriters) 
        super(testId, nReaders, nWriters);
    
    class Reader extends TestTask 
        long reault = 0;
        void test() 
            for(long i = 0; i < testCycles; ++i) 
                for(int index = 0; index < containerSize; ++index) 
                    result += testContainer.get(index);
                
            
        
        void putResults() 
            readReaults += result;
            readTime += duration;
        
    
    class Writer extends TestTask 
        void test() 
            for(long i = 0; i < testCycles; ++i) 
                for(int index = 0; index < containerSize; index++) 
                    testContainer.put(index, writeData[index]);
                
            
        
        void putResult() 
            writeTime += duration;
        
    
    void  startReadersAndWriters() 
        for(int i = 0; i < nReaders; ++i) 
            exec.execute(new Reader());
        
        for(int i = 0; i < nWriters; ++i) 
            exec.execute(new Writer());
        
    
    class SynchronizedHashMapTest extends MapTest 
        Map<Integer,Integer> containerInitialier() 
            return Collections.synchronizedMap(new HashMap<Integer,Integer>(new CountingGenerator.Integer(), new CountingGenerator.Integer(), containerSize)); 
        
        synchronizedHashMapTest(int nReaders, int nWriters) 
            super("Synched HashMap", nReader, nWriters);
        
    
    class ConcurrentHashMapTest extends MapTest 
        Map<Integer,Integer> containerInitalizer() 
            return new ConcurrentHashMap<Integer,Integer>(MapData.map(new CountingGenerator.Integer(),new CountingGenerator.Integer(),containerSize));
        
        ConcurrentHashMapTest(int nReaders, int nWriters) 
            super("ConcurrentHashMap", nReaders, nWriters);

        
    




public class MapConparisons 
    public static void main(String[] args) 
        Tester.initMain(args);
        new SynchronizedHashMapTest(10,0);
        new SynchronizedHashMapTest(9,1);
        new SynchronizedHashMapTest(5,5);
        new ConcurrentHashMapTest(10,0);
        new ConcurrentHashMapTest(9,1);
        new ConcurrentHashMapTest(5,5);

    
//输出:
Type                            Read time       Write time
Synched ArrayList 10r 0w     306052025049                0
Synched ArrayList 9r 1w      428319156207      47697347568
readTime + writeTime =       476016503775      
Synched ArrayList 5r 5w      243956877760     244012003202           
readTime + writeTime =        48796880962
CopyOnWriteArrayList 10r 0w   23352654318                0
CopyOnWriteArrayList 9r 1w    18833089400       1541853224
readTime + writeTime =        20374942624
CopyOnWriteArrayList 5r 5w    12037625732      11850489099
readTime + writeTime =        23888114831

向ConcurrentHashMap添加写入者的影响甚至还不如CopyOnWriterArrayList明显,这是因为ConcurrentHashMap使用了一种不同的写入技术,他可以明显地最小化写入所造成的影响。

乐观加锁

乐观加锁针对的是某些Atomic类,它指的是当你执行某项计算时,实际上没有使用互斥,但是在这项计算完成,并且准备更新这个Atomic对象时,你需要使用一个称为compareAndSet()的方法。你将旧值和新值一起提交给这个方法,如果旧值与它在Atomic对象中发现的值不一样,那么这个操作就会失败——这意味着某个其他的任务已经于此操作执行期间修改了一个对象。我们通常使用互斥(synchronized和Lock)来防止多个任务同时修改同一个对象,但这里我们是“乐观的”,因为我们保持数据为未锁定状态,并希望没有任何其他任务插入修改它。所有这些又都是以性能的名义执行——通过使用Atomic来代替synchronized或Lock,可以获得性能上的好处。

乐观加锁最棘手的问题是compareAndSet()方法失败,如果失败,就必须使用传统的同步。

ReadWriteLock-读写锁

读写锁对向数据结构相对不频繁地写入,但是有多个任务要经常读取这个数据结构的这类情况进行了优化。读写锁使得你可以同时有多个读取者,但写入的频率不高。如果写锁已经被其他任务持有,那么任何读取者都不能访问,直至这个写锁被释放位置。

public class ReaderWriterList<T> 
    private ArrayList<T> lockedList;
    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
    public ReaderWriterList(int size, T initalValue) 
        lockedList = new ArrayList<T>(Collections.nCopies(size, initalValue));
    
    public T set(int index, T element) 
        lock wlock = lock.writeLock();
        wlock.lock();
        try 
            return lockedList.set(index,element);
         finally 
            wlock.unlock();
        
    
    public T get(int index) 
        Lock lock = lock.readLock();
        rlock.lock();
        try 
            if(lock.getReadLockCount() > 1) 
                System.out.print(lock.getReadLockCount());
            
            return lockedList.get(index);
         finally 
            rlock.unlock();
        
    
    public static void main(String[] args) 
        new ReaderWriterListTest(30,1);
    


class ReaderWriterListTest 
    ExecutorService exec = Executors.newCachedThreadPool();
    private final static int SIZE = 100;
    private static Random random = new Random(47);
    private ReaderWriterList<Integer> list = new ReaderWriterList<Integer>(SIZE, 0);
    private class Writer implemants Runnable 
        public void run() 
            try 
                for(int i = 0;i < 20; ++i) 
                    list.set(i, random.nextInt());
                    TimeUnit.MILLISECONDS.sleep(100);
                
             catch(InterruptedException e) 

            
            System.out.print("Writer finished, shutting down");
            exec.shutdown();
        
    
    private class Reader implements Runnable 
        public void run() 
            try 
                while(!Thread.interrupted()) 
                    for(int i = 0; i < SIZE; ++i) 
                        list.get();
                        TimeUnit.MILLISECONDS.sleep(1);
                    
                
             catch(InterruptedException e) 

            
        
    
    public ReaderWriterListTest(int readers, int writers) 
        for(int i = 0; i < readers; ++i) 
            exec.execute(new Reader());
        
        for(int i = 0; i < writers; ++i) 
            exec.execute(new Writer());
        
    

ReadWriterList可以持有固定数量的任何对象。你必须向构造器提供所希望的列表尺寸和组装这个列表时所用的初始对象。set()方法获取一个写锁,get()获取一个读锁。注意,本例中写入的任务要远少于读取者。

以上是关于Java编程思想-并发的主要内容,如果未能解决你的问题,请参考以下文章

Java编程思想之二十 并发

Java编程思想-并发

Java编程思想读书笔记--第21章并发

Java编程思想-并发

Java编程思想-并发

Java并发编程- FutureTask详解与池化思想的设计和实战二