Java: Concurrency

Posted by 荒废的养鸡场


Executor Framework

e.g.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class YourTask implements Runnable {

    private final String taskName;
    private final long sleepTime = 1000; // milliseconds; arbitrary value for demonstration

    public YourTask(String taskName) {
        /* constructor */
        this.taskName = taskName;
    }

    @Override
    public void run() {
        try {
            /* perform task */
            // sleep() is called only for demonstration, because normally it is
            // unpredictable when and for how long each thread will perform its task
            Thread.sleep(sleepTime); // put the thread to sleep
        } catch (InterruptedException exception) {
            exception.printStackTrace();
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
    }

    public static void main(String[] args) throws InterruptedException {
        YourTask task1 = new YourTask("task1");
        YourTask task2 = new YourTask("task2");
        YourTask task3 = new YourTask("task3");
        System.out.println("Starting Executor");

        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.execute(task1);
        executorService.execute(task2);
        executorService.execute(task3);

        executorService.shutdown(); // stop accepting new tasks; submitted tasks still run
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }
}

Runnable interface

specifies a task to execute concurrently with other tasks.

run() contains the task to perform.

Executor Interface

Executes Runnable objects; implementations typically manage a thread pool.

execute() accepts argument Runnable, which will be passed to one of the available threads (to perform its run()) in the thread pool.

ExecutorService interface

Extends Executor.

Can be obtained by calling a static factory method of class Executors, such as newCachedThreadPool().

shutdown() notifies the ExecutorService to stop accepting new tasks but continue executing tasks that have already been submitted.

awaitTermination() returns control to its caller either when all tasks executing in the ExecutorService complete (returning true) or when the specified timeout elapses (returning false). It can be used to make the main thread wait for scheduled tasks to complete, because in practice the main thread can terminate early while other threads of the program continue executing.
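The shutdown sequence described above can be sketched as follows. This is only an illustration under stated assumptions: the class name ShutdownDemo, the helper runAndWait(), the task bodies and the 10-second timeout are all made up for the example and are not part of the original notes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and timings are illustrative assumptions.
class ShutdownDemo {

    // Runs taskCount short tasks and returns how many of them had finished
    // by the time awaitTermination() returned.
    static int runAndWait(int taskCount) {
        AtomicInteger finished = new AtomicInteger();
        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 0; i < taskCount; i++) {
            executorService.execute(() -> {
                try {
                    Thread.sleep(50); // stand-in for real work
                    finished.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executorService.shutdown(); // no new tasks accepted; submitted ones keep running
        try {
            // true if all tasks completed, false if the timeout elapsed first
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                executorService.shutdownNow(); // give up and interrupt running tasks
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks finished: " + runAndWait(3));
    }
}
```

Checking the boolean returned by awaitTermination() lets the caller decide what to do on a timeout; calling shutdownNow() is one possible choice, not the only one.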

 

 

Producer-Consumer

// home-made interface Buffer shared by Producer and Consumer threads
public interface Buffer {
    // called by Producer
    public void blockingPut(int value) throws InterruptedException;
    // called by Consumer
    public int blockingGet() throws InterruptedException;
}

java.util.concurrent: using classes that encapsulate synchronization

Classes from java.util.concurrent encapsulate the synchronization, so client code does not have to implement it.

ArrayBlockingQueue: a fully implemented, thread-safe buffer class that implements interface BlockingQueue.

put() places an element at the end of the BlockingQueue, waiting if the queue is full.

take() removes an element from the head of the BlockingQueue, waiting if the queue is empty.

import java.util.concurrent.ArrayBlockingQueue;

public class BlockingBuffer implements Buffer {

    private final ArrayBlockingQueue<Integer> buffer;

    public BlockingBuffer() {
        buffer = new ArrayBlockingQueue<>(1); // capacity of one element
    }

    @Override
    public void blockingPut(int value) throws InterruptedException {
        buffer.put(value); // waits while the queue is full
        /* other operations
        ...
        */
    }

    @Override
    public int blockingGet() throws InterruptedException {
        int readValue = buffer.take(); // waits while the queue is empty
        /* other operations
        ...
        */
        return readValue;
    }
}
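To show how a producer thread and a consumer thread might share such a one-element buffer, here is a minimal sketch. To stay self-contained it uses ArrayBlockingQueue directly instead of the Buffer interface; the class name ProducerConsumerDemo and the helper transfer() are assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original notes.
class ProducerConsumerDemo {

    // Produces the values 1..count on one thread, consumes them on another,
    // and returns the sum seen by the consumer.
    static int transfer(int count) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(1); // capacity 1
        int[] sum = new int[1]; // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int value = 1; value <= count; value++) {
                    buffer.put(value); // blocks while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += buffer.take(); // blocks while the buffer is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join(), the consumer's writes are visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println("Sum of consumed values: " + transfer(10));
    }
}
```

Neither thread contains any explicit synchronization: put() and take() do all the waiting and signalling internally.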

Monitors: synchronized, wait, notify & notifyAll

Every object has a monitor and a monitor lock. The monitor ensures that the object’s monitor lock is held by a maximum of 1 thread at any time.

synchronized

synchronized (object) { //object is normally ‘this’
    /* statements */
}

Code placed in a synchronized statement is guarded by the monitor lock; a thread must acquire the lock to execute the guarded statements.

synchronized methods

Before executing, a synchronized instance method must acquire the lock on the object used to call the method.

A static synchronized method must acquire the lock on the class used to call the method.

Place all accesses to mutable data that may be shared by multiple threads inside synchronized statements or synchronized methods.

Keep the duration of synchronized statements as short as possible to minimize the wait time for blocked threads. E.g. avoid performing I/O or lengthy calculations while holding the lock.
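As a small illustration of these rules, the following sketch guards a shared counter with both a synchronized method and a synchronized statement. The class name SynchronizedCounterDemo and the helper run() are hypothetical names chosen for the example.

```java
// Hypothetical demo class, not part of the original notes.
class SynchronizedCounterDemo {

    private int count = 0; // shared mutable state, guarded by this object's monitor lock

    // synchronized instance method: acquires the lock on 'this' before executing
    synchronized void increment() {
        count++;
    }

    // the same guard written as a synchronized statement
    int get() {
        synchronized (this) {
            return count;
        }
    }

    // Starts 'threads' threads that each increment one shared counter 'perThread' times.
    static int run(int threads, int perThread) {
        SynchronizedCounterDemo counter = new SynchronizedCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.increment();
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join(); // wait for every worker to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Final count: " + run(4, 10_000));
    }
}
```

Without the synchronized keyword, count++ (a read, an add and a write) could interleave between threads and lose updates.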

wait(): When a thread holding the monitor lock on an object determines that it cannot continue its task until some condition is satisfied, it calls wait() on the synchronized object. This releases the monitor lock and puts the thread in the waiting state, so other threads can enter the object’s synchronized statements/methods.

notify(): When a thread executing a synchronized statement/method completes its task or satisfies the condition on which another thread may be waiting, it can call notify() on the synchronized object to allow one waiting thread to transition to the runnable state again.

notifyAll(): If a thread calls notifyAll() on the synchronized object, all threads waiting for that monitor lock become eligible to reacquire the lock.

e.g.

public class SynchronizedBuffer implements Buffer {

    private int buffer = -1; // shared by producer and consumer threads
    private boolean occupied = false;

    @Override
    public synchronized void blockingPut(int value) throws InterruptedException {
        // while there is no empty location, place the thread in the waiting state
        while (occupied) {
            System.out.println("Producer tries to write.");
            displayState("Buffer full. Producer waits.");
            wait();
        }
        // put value
        buffer = value;
        occupied = true;
        displayState("Producer writes " + buffer);
        // tell waiting threads to enter the runnable state
        notifyAll();
    } // release lock on SynchronizedBuffer

    @Override
    public synchronized int blockingGet() throws InterruptedException {
        // while there is no data to read, place the thread in the waiting state
        while (!occupied) {
            System.out.println("Consumer tries to read.");
            displayState("Buffer empty. Consumer waits.");
            wait();
        }
        // retrieve value
        occupied = false;
        displayState("Consumer reads " + buffer);
        // tell waiting threads to enter the runnable state
        notifyAll();
        return buffer;
    } // release lock on SynchronizedBuffer

    private synchronized void displayState(String operation) {
        System.out.printf("%-40s%d\t\t%b%n%n", operation, buffer, occupied);
    }
}

 

Lock & Condition

Lock interface

lock() & unlock(): A thread calls a Lock’s lock() method to acquire the lock. Once a Lock has been obtained by one thread, the Lock object will not allow another thread to obtain it until the first thread releases it (by calling the Lock’s unlock() method).

Place unlock() in a finally block so that the lock is released even if an exception is thrown.

ReentrantLock: a basic implementation of Lock. Its constructor optionally takes a boolean argument that specifies the fairness policy. true: “the longest-waiting thread will acquire the lock when it’s available” (avoids indefinite postponement); false: no guarantee as to which waiting thread will acquire the lock when it’s available.

newCondition(): returns an object that implements the Condition interface and is associated with that specific Lock.
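The lock()/unlock() pattern with a finally block can be sketched as below. The class LockCounterDemo and its helper run() are hypothetical names for this illustration; the fair policy (true) is chosen only to show the constructor argument.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of the original notes.
class LockCounterDemo {

    private final Lock lock = new ReentrantLock(true); // true: fair ordering policy
    private int count = 0; // guarded by lock

    void increment() {
        lock.lock(); // acquire before entering the try block
        try {
            count++;
        } finally {
            lock.unlock(); // always released, even if the guarded code throws
        }
    }

    int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // Starts 'threads' threads that each increment one shared counter 'perThread' times.
    static int run(int threads, int perThread) {
        LockCounterDemo counter = new LockCounterDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.increment();
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Final count: " + run(4, 1_000));
    }
}
```

Calling lock() before the try block, rather than inside it, means unlock() in the finally block only runs when the lock was actually acquired.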

Condition interface

If a thread that owns a Lock determines that it cannot continue with its task until some condition is satisfied, the thread can wait on a condition object.

await(): A thread can call the Condition’s await() method. This immediately releases the associated Lock and places the thread in the waiting state for that Condition object.

signal(): When a runnable thread completes its task, it can call the Condition method signal() to allow a thread in that Condition’s waiting state to return to the runnable state.

signalAll(): If a thread calls the Condition method signalAll(), all threads waiting for that condition transition to the runnable state. Only one of them can obtain the Lock at a time; the others must wait until the Lock becomes available again.

e.g. (Author’s note: in my tests this did not work properly on Windows 10, apparently because the lock was not released when the thread awaited a condition; I don’t know why. It worked fine on Mac OS X.)

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SynchronizedBuffer implements Buffer {

    private final Lock accessLock = new ReentrantLock();
    private final Condition canWrite = accessLock.newCondition();
    private final Condition canRead = accessLock.newCondition();
    private int buffer = -1;
    private boolean occupied = false;

    @Override
    public void blockingPut(int value) throws InterruptedException {
        accessLock.lock(); // lock this object
        try {
            while (occupied) {
                System.out.println("Producer tries to write.");
                displayState("Buffer full. Producer waits.");
                canWrite.await(); // wait until the buffer is empty
            }
            buffer = value;
            occupied = true;
            displayState("Producer writes " + buffer);
            // signal any threads waiting for Condition canRead (full buffer)
            canRead.signalAll();
        } finally {
            accessLock.unlock();
        }
    }

    @Override
    public int blockingGet() throws InterruptedException {
        int readValue = 0;
        accessLock.lock();
        try {
            // while there is no data to read, place the thread in the waiting state
            while (!occupied) {
                System.out.println("Consumer tries to read.");
                displayState("Buffer empty. Consumer waits.");
                canRead.await(); // wait on Condition canRead (full buffer); also releases the lock implicitly
            }
            occupied = false;
            readValue = buffer;
            displayState("Consumer reads " + readValue);
            // signal any threads waiting for Condition canWrite (empty buffer)
            canWrite.signalAll();
        } finally {
            accessLock.unlock();
        }
        return readValue;
    }

    private void displayState(String operation) {
        accessLock.lock(); // acquire outside the try block, so unlock() only runs if lock() succeeded
        try {
            System.out.printf("%-40s%d\t\t%b%n%n", operation, buffer, occupied);
        } finally {
            accessLock.unlock();
        }
    }

}

 

SwingWorker – Multithreading with GUI

event dispatch thread: All Swing applications have this single thread to handle interaction with the application’s GUI components. All tasks that require interaction with an application’s GUI are placed in an event queue and are executed sequentially by the event dispatch thread.

Swing GUI components are not thread safe. Thread safety in GUI applications is achieved not by synchronizing thread actions, but by thread confinement.

Thread confinement: Allowing just one thread (the event dispatch thread) to access Swing components.

Problem: If an application must perform a lengthy computation in response to a user interaction, the event dispatch thread cannot attend to other tasks in the event queue during the computation. This causes the GUI components to become unresponsive.

Solution: Handle a long-running computation in a separate thread, freeing the event dispatch thread to continue managing other GUI interactions.

SwingWorker<T,V> class: implements Runnable. When execute() is called, the object is scheduled to perform an asynchronous task in a worker thread and can then update Swing components from the event dispatch thread based on the task’s results.

T – the type returned by doInBackground().

V – the type passed between publish() and process() to handle intermediate results.

The GUI components that will be manipulated by SwingWorker methods (e.g. process() or done()) should be passed to the SwingWorker subclass’s constructor and stored in the subclass object.

doInBackground() defines a long computation and is called in a worker thread.

done() executes on the event dispatch thread when doInBackground() returns.

execute() schedules the SwingWorker object to be executed in a worker thread.

get() waits for the result to be ready if necessary, then returns it.

publish() sends intermediate results from doInBackground() to process().

process() receives intermediate results from publish() and processes them on the event dispatch thread.

setProgress() sets the progress property to notify any property change listener on the event dispatch thread of progress bar updates.

e.g. use SwingWorker subclass to perform background calculation

import java.util.concurrent.ExecutionException;
import javax.swing.JLabel;
import javax.swing.SwingWorker;

public class BackgroundCalculator extends SwingWorker<Long, Object> {

    private final int n; // input of the calculation
    private final JLabel resultJLabel; // UI component that displays the result

    public BackgroundCalculator(int n, JLabel resultJLabel) {
        this.n = n;
        this.resultJLabel = resultJLabel;
    }

    // long-running task performed in a worker thread
    @Override
    public Long doInBackground() {
        return fibonacci(n);
    }

    // recursive Fibonacci, assumed here as the long-running calculation
    // (suggested by the FibonacciNumbers class that uses this worker)
    private long fibonacci(long number) {
        return (number <= 1) ? number : fibonacci(number - 1) + fibonacci(number - 2);
    }

    // runs on the event dispatch thread when doInBackground() returns
    @Override
    protected void done() {
        try {
            // get the result of doInBackground() and display it
            resultJLabel.setText(get().toString());
        } catch (InterruptedException ex) {
            /* Thrown if the current thread is interrupted while waiting for get() to return.
               Because get() is called from done(), the computation is already complete,
               so this exception will not occur in this example. */
        } catch (ExecutionException ex) {
            /* thrown if an exception occurs during the computation */
        }
    }

}

import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JTextField;

public class FibonacciNumbers extends JFrame {
    // text field for the user's input (hypothetical name, not in the original notes)
    private final JTextField numberJTextField = new JTextField(5);
    // button to activate the SwingWorker subclass's task
    private final JButton goJButton = new JButton("Go");
    // UI component connected to the SwingWorker subclass
    private final JLabel fibonacciJLabel = new JLabel();

    // constructor
    public FibonacciNumbers() {
        /* super, setLayout, ... etc. */
        goJButton.addActionListener(
            new ActionListener() {
                @Override
                public void actionPerformed(ActionEvent event) {
                    // retrieve the user's input as the SwingWorker subclass's input
                    int n = Integer.parseInt(numberJTextField.getText().trim());
                    // create a task to perform the calculation in the background
                    BackgroundCalculator task = new BackgroundCalculator(n, fibonacciJLabel);
                    task.execute(); // execute the task
                }
            });
        /* add GUI components, etc. */
    }

}

 

e.g. update the GUI with intermediate results before the long calculation completes.

-- using publish(), process() and setProgress().

publish() -> sends prime numbers to process() as they’re found;

process() -> displays found primes in a GUI component;

setProgress() -> updates the progress property.

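import java.util.Arrays;
import java.util.List;
import javax.swing.JButton;
import javax.swing.JLabel;
import javax.swing.JTextArea;
import javax.swing.SwingWorker;

public class PrimeCalculator extends SwingWorker<Integer, Integer> {

    /* GUI components & array for finding primes */
    private final JTextArea intermediateJTextArea;
    private final JLabel statusJLabel;
    private final JButton getPrimesJButton;
    private final JButton cancelJButton;
    private final boolean[] primes; // primes[i] stays true while i is still a candidate

    // constructor
    public PrimeCalculator(int max, JTextArea intermediateJTextArea,
        JLabel statusJLabel, JButton getPrimesJButton,
        JButton cancelJButton) {
        // store the GUI components that process() and done() will update
        this.intermediateJTextArea = intermediateJTextArea;
        this.statusJLabel = statusJLabel;
        this.getPrimesJButton = getPrimesJButton;
        this.cancelJButton = cancelJButton;
        primes = new boolean[max + 1];
        Arrays.fill(primes, true);
    }

    // calculation: find all primes up to max
    // (filled in here as a Sieve of Eratosthenes; the original elides the details)
    @Override
    public Integer doInBackground() {
        int count = 0; // number of primes found so far
        for (int i = 2; i < primes.length; i++) {
            if (isCancelled()) {
                return count;
            }
            try {
                /* Put the worker thread to sleep for a few milliseconds between calls to
                   publish(). This slows the calculation down so that the event dispatch
                   thread can keep up with the update requests; otherwise requests may pile
                   up in the event queue and exhaust memory. */
                Thread.sleep(5); // arbitrary delay chosen for illustration
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return count;
            }
            // during the calculation, set progress according to how far the search has advanced
            setProgress(100 * (i + 1) / primes.length);
            if (primes[i]) {
                // make the found prime i available for display as an intermediate result
                publish(i);
                count++;
                for (int j = i + i; j < primes.length; j += i) {
                    primes[j] = false; // multiples of i are not prime
                }
            }
        }
        return count;
    }

    // display found primes in a GUI component (runs on the event dispatch thread)
    @Override
    protected void process(List<Integer> publishedVals) {
        for (Integer value : publishedVals) {
            intermediateJTextArea.append(value + "\n");
        }
    }

    // code to execute when doInBackground() completes
    @Override
    protected void done() {
        /* ... */
    }

}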

 

-- In JFrame subclass, add PropertyChangeListener for the SwingWorker subclass to listen for progress changes.

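import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JProgressBar;
import javax.swing.JTextArea;
import javax.swing.JTextField;

public class FindPrimes extends JFrame {

    // GUI components & the SwingWorker subclass
    // (the input field's name and the button labels are assumptions; the original elides them)
    private final JTextField highestPrimeJTextField = new JTextField(10);
    private final JButton getPrimesJButton = new JButton("Get Primes");
    private final JButton cancelJButton = new JButton("Cancel");
    private final JTextArea displayPrimesJTextArea = new JTextArea();
    private final JLabel statusJLabel = new JLabel();
    private final JProgressBar progressJProgressBar = new JProgressBar();
    private PrimeCalculator calculator;

    // constructor
    public FindPrimes() {
        /* ... initialization, layout, adding GUI components ... */
        getPrimesJButton.addActionListener(event -> {
            // reset GUI components & get user input
            progressJProgressBar.setValue(0);
            displayPrimesJTextArea.setText("");
            int number = Integer.parseInt(highestPrimeJTextField.getText().trim());

            // construct a new SwingWorker subclass object
            calculator = new PrimeCalculator(number, displayPrimesJTextArea,
                statusJLabel, getPrimesJButton, cancelJButton);

            // listen for changes to the worker's progress property
            calculator.addPropertyChangeListener(
                new PropertyChangeListener() {
                    @Override
                    public void propertyChange(PropertyChangeEvent e) {
                        // if the changed property is progress, update the progress bar
                        if ("progress".equals(e.getPropertyName())) {
                            int newValue = (Integer) e.getNewValue();
                            progressJProgressBar.setValue(newValue);
                        }
                    }
                });
            calculator.execute();
        });
    }

}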

 

 

 

Parallel Sort & Parallel Stream

parallelSort()

For sorting large arrays on multi-core systems, Arrays.parallelSort() is typically faster than Arrays.sort().

The two can be compared using the Date/Time API:

e.g.

Instant sortStart = Instant.now();
Arrays.sort(array1);
Instant sortEnd = Instant.now();
long sortTime = Duration.between(sortStart, sortEnd).toMillis();

Instant parallelSortStart = Instant.now();
Arrays.parallelSort(array2); // array2 is assumed to be an unsorted copy of array1
Instant parallelSortEnd = Instant.now();
long parallelSortTime = Duration.between(parallelSortStart, parallelSortEnd).toMillis();

String percentage = NumberFormat.getPercentInstance().format((double) sortTime / parallelSortTime);
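A self-contained version of this comparison might look like the sketch below. The class name SortComparisonDemo, the helper sortsAgree(), the array size and the random seed are assumptions for illustration, and a single timed run gives only a rough indication, not a proper benchmark.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Random;

// Hypothetical demo class, not part of the original notes.
class SortComparisonDemo {

    // Sorts two copies of the same random data, prints both timings,
    // and reports whether the two sorted arrays are identical.
    static boolean sortsAgree(int size) {
        int[] array1 = new Random(42).ints(size).toArray(); // fixed seed: repeatable data
        int[] array2 = array1.clone();

        Instant sortStart = Instant.now();
        Arrays.sort(array1);
        long sortTime = Duration.between(sortStart, Instant.now()).toMillis();

        Instant parallelSortStart = Instant.now();
        Arrays.parallelSort(array2);
        long parallelSortTime = Duration.between(parallelSortStart, Instant.now()).toMillis();

        System.out.printf("sort: %d ms, parallelSort: %d ms%n", sortTime, parallelSortTime);
        return Arrays.equals(array1, array2);
    }

    public static void main(String[] args) {
        System.out.println("Same result: " + sortsAgree(5_000_000));
    }
}
```

Both methods must produce the same ordering; only the elapsed time should differ, and the difference depends on the array size and the number of available cores.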

Parallel Stream

Invoke parallel() on an existing stream to obtain a parallel stream, which may improve performance on multi-core systems.

LongStream stream2 = Arrays.stream(values).parallel(); // values is assumed to be a long[]
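A minimal runnable sketch of a parallel stream follows, assuming values is a long[] as the statement above suggests. ParallelStreamDemo and parallelSum() are hypothetical names.

```java
import java.util.Arrays;
import java.util.stream.LongStream;

// Hypothetical demo class, not part of the original notes.
class ParallelStreamDemo {

    // Sums the array's elements using a parallel stream.
    static long parallelSum(long[] values) {
        LongStream stream = Arrays.stream(values).parallel();
        return stream.sum(); // terminal operation; the work may be split across cores
    }

    public static void main(String[] args) {
        long[] values = LongStream.rangeClosed(1, 1_000_000).toArray();
        System.out.println("Sum: " + parallelSum(values));
    }
}
```

Because addition is associative, the parallel sum gives the same result as a sequential one; operations that depend on encounter order or shared state need more care.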

 

 

 

Interface Callable

Problem: Runnable’s run() cannot return a value, so passing a result back to the calling thread would require shared mutable data.

Solution: Callable.

Callable interface

call(): returns a value representing the result of its task.

ExecutorService interface

submit(): executes its Callable argument and returns an object of type Future.

Future interface

Represents the Callable’s future result.

get(): blocks the calling thread and waits for the Callable to complete and return its result.

Also provides methods that cancel a Callable’s execution/determine whether Callable was cancelled/determine whether the Callable completed its task.
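The interplay of Callable, submit() and Future.get() can be sketched as follows. The class CallableDemo, the helper sumTo() and the choice of a single-thread executor are assumptions made for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original notes.
class CallableDemo {

    // Submits a Callable that sums 1..n and waits for its result.
    static long sumTo(int n) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Callable<Long> task = () -> {
                long sum = 0;
                for (int i = 1; i <= n; i++) {
                    sum += i;
                }
                return sum; // call() returns the result; no shared mutable data needed
            };
            Future<Long> future = executorService.submit(task);
            return future.get(); // blocks until the Callable completes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + sumTo(100));
    }
}
```

get() throws ExecutionException when the Callable itself threw; the sketch simply wraps it, which is one of several reasonable ways to handle it.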

CompletableFuture class

Implements the Future interface. Can asynchronously execute Runnables or Suppliers.

e.g.

// perform synchronous calculation tasks fibonacci()
// TimeData is the specific return type of startFibonacci()
TimeData synchronousResult1 = startFibonacci(45);
TimeData synchronousResult2 = startFibonacci(44);

// perform asynchronous calculation tasks fibonacci()
CompletableFuture<TimeData> futureResult1 =
    CompletableFuture.supplyAsync(() -> startFibonacci(45));
CompletableFuture<TimeData> futureResult2 =
    CompletableFuture.supplyAsync(() -> startFibonacci(44));

// wait for results from the asynchronous operations
TimeData asynchronousResult1 = futureResult1.get();
TimeData asynchronousResult2 = futureResult2.get();
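Since startFibonacci() and TimeData are not shown, here is a self-contained sketch of the same idea. It returns plain long values instead of TimeData, and the names CompletableFutureDemo, fibonacci() and sumOfTwo() are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class, not part of the original notes.
class CompletableFutureDemo {

    // Deliberately slow recursive Fibonacci, standing in for startFibonacci().
    static long fibonacci(int n) {
        return n <= 1 ? n : fibonacci(n - 1) + fibonacci(n - 2);
    }

    // Starts both calculations asynchronously and adds their results.
    static long sumOfTwo(int a, int b) {
        CompletableFuture<Long> first = CompletableFuture.supplyAsync(() -> fibonacci(a));
        CompletableFuture<Long> second = CompletableFuture.supplyAsync(() -> fibonacci(b));
        // join() waits like get(), but reports failures as an unchecked CompletionException
        return first.join() + second.join();
    }

    public static void main(String[] args) {
        System.out.println("fibonacci(30) + fibonacci(29) = " + sumOfTwo(30, 29));
    }
}
```

Both suppliers are started before either result is requested, so on a multi-core system the two calculations can overlap.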

 

 
