Java并发编程实战之基于生产者消费者模式的日志服务读书笔记
Posted 郭梧悠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程实战之基于生产者消费者模式的日志服务读书笔记相关的知识,希望对你有一定的参考价值。
日常开发中,我们会经常跟日志系统打交道,日志系统算是典型的生产者消费者模型的系统。(多个不同的)生产者线程负责生产日志消息,这些系统扮演者生产者的角色,日志系统负责将消息队列里的日志上传到服务器,属于消费者角色。是一种多生产者单消费者模式的设计方式。如果生产者的生产速度大于LoggerThread
的消费速度,则BlockingDeque
线程会阻塞生产者,直到LoggerThread
有能力处理新的消息。
不支持关闭的生产者-消费者日志服务(bug版本)
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
public LogService() {
//注意队列的容量为10
queue = new LinkedBlockingQueue<>(10);
loggerThread = new LoggerThread();
}
public void start() {
loggerThread.start();
}
/**
* 生产者生产消息,每个log的调用者都行相当于一个生产者
*
* @param msg
* @throws InterruptedException
*/
public void log(String msg) throws InterruptedException {
queue.put(msg);
}
/**
* 消费者线程消费消息,该线程扮演的是消费者这一角色。
*/
private class LoggerThread extends Thread {
public LoggerThread(){
}
public void run() {
try {
while (true) {//无限循环
System.out.println(queue.take());
}//end while
} catch (InterruptedException e) {//在循环外响应中断
} finally {
System.out.println("log service is close");
}
}
}
}
上面代码看起来很正常,但是有如下缺点:
1、LoggerThread
没有提供关闭的方法,当没有生产者生产消息的时候,那么LoggerThread
在调用queue.take()
的时候会一直处于阻塞状态,导致JVM没法正常关闭。比如下面的调用代码,就会发生LoggerThread
无法关闭的情况。
public static void main(String[] args) {
LogService logService = new LogService();
for(int i=0;i<100;i++) {
final int index =i;
new Thread(new Runnable() {
@Override
public void run() {
try {
logService.log("生产者生产消息===="+index);
} catch (InterruptedException e) {
System.out.println("response interception");
}
}
}).start();
}
logService.start();
}
关闭LoggerThread
很容易啊,只需要把LoggerThread
的方法改成如下就可以了吧?
public void run() {
try {
while (!queue.isEmpty()) {//判断队列是否为空,如果为空则关闭
System.out.println(queue.take());
}
} catch (InterruptedException e) {
System.out.println("response interception");
} finally {
System.out.println("log service is close="+queue.size());
}
}
上面的改动中,我们将while(true)
改成了while(!queue.isEmpty()).
可以吗?答案是完全不可以!在我们这个demo中,队列的大小是有限制的,如果消费者的消费速度大于生产者的速度,那么队列在某一瞬间就变成空了,也就是queue.isEmpty()为true,此时消费者关闭,但是生产者可以继续生产,则队列满的情况下生产者会阻塞。取消一个生产者-消费者模型的最可靠的方法是需要同时取消生产者和消费者,因为当单独的关闭消费者的时候,会阻塞生产者;当单独的关闭生产者的时候,会阻塞消费者。其关闭原则应该是:我们应该提供一个Stop方法,调用stop方法的时候表示日志服务已经关闭,此时生产者不可以再生产消息,当消费者消费完队列里的消息后才正式关闭日志服务,避免造成日志的丢失,
且看下面代码,我们为LogService提供了stop方法,并且修改了log方法,当isShutDown为true的时候,就不能在生产消息。然而这看似完美的操作却有个巨大漏洞,stop方法不是原子的,if(!isShutdown)也不是原子的,使得关闭方法并不可靠
private boolean isShutdown=false;
public void stop(){
isShutdown=true;
}
public void log(String msg) throws InterruptedException {
if(!isShutdown){
queue.put(msg);
}
}
所以我们需要改造。使之成为原子的即可。我们改造如下
支持关闭的生产者-消费者日志服务(不安全版本)
使用synchronized关键字,将stop改造成原子的,需要注意的是,因为put方法本身就可以阻塞,所以我们不需要在消息加入队列的时候再去持有一个锁,所以我们将put方法放在了synchronized语句块的外面。
private boolean isShutDown = false;
public void stop(){
synchronized(this) { isShutDown=true;}
}
/**
* 生产者生产消息,每个log的调用者都行相当于一个生产者
*
* @param msg
* @throws InterruptedException
*/
public void log(String msg) throws InterruptedException {
synchronized(this) {
if(isShutDown) {
return;
}
}
queue.put(msg);
}
经过这么以改进,当我们调用stop的时候,则生产者线程不会继续生产消息。我们可以将LoggerThread
改造如下:
public void run() {
try {
while (true) {
if(isShutDown) {//如果关闭则退出
break;
}
System.out.println(queue.take());
}
} catch (InterruptedException e) {//注意我们这里实在while循环外响应中断的。
System.out.println("response interception");
} finally {
System.out.println("log service is close="+queue.size());
}
}
那么这样就没问题了吗?答案是问题大了去了。比如我们调用了stop方法,此时LoggerThread
消费者正好在queue.take()
阻塞着,那么if(isShutDown) {break}
则永远不会执行,LoggerThread
是无法退出的。正确的做法是我们需要再次对stop
方法进行改造,改造如下:
public void stop(){
synchronized(this) { isShutDown=true;}
loggerThread.interrupt();
}
我们添加了loggerThread.interrupt();
,此时take方法发生阻塞时会响应中断,因为我们在while循环之外响应中断的,则跳出while循环,执行打印如下:
public static void main(String[] args) {
LogService logService = new LogService();
for(int i=0;i<90;i++) {
final int index =i;
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
try {
logService.log("生产者生产消息===="+index);
} catch (InterruptedException e) {
System.out.println("response interception");
}
}
});
thread.start();
}
logService.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
//两秒之后关闭日志系统
logService.stop();
}
执行打印如下:
response interception
log service is close=0
那么到此为止是不是就没问题了呢?当然仍然有问题,如果发生中断的时候,我们在while循环之外响应中断,消息队列中仍然有消息,我们就没法继续处理剩下的日志,那么日志服务的关闭势必造成日志的丢失,如果是比较关键的日志信息丢失了,我们都没地方哭去。
所以我们的日志服务系统仍然有优化的空间。
支持关闭的生产者-消费者日志服务(安全版本)
我们的目标是生产者生产多少消息,我们的日志系统就处理多少消息。当日志服务关闭的时候,生产者不能生产消息。消费者在处理完所有消息后,自行关闭。
为此我们继续改造,使用一个变量logCount,每调用一次log则logCount+1;注意logCount递增操作需要是原子的。
为此我们对log方法进行改造,改造如下:
private int logCount=0;
public void log(String msg) throws InterruptedException {
synchronized(this) {
if(isShutDown) {
return;
}
//消息递增
logCount++;
}
queue.put(msg);
}
处理完了成产者,我们需要对消费者进行改造,注意LoggerThread
之前的消费者捕获InterruptedException
时,是在while循环之外,我们现在需要放在循环之内。这样我们才能正确处理中断,确保队列里的消息处理完毕,改造后的LoggerThread
如下:
到此为止,本篇博文就结束了,在读《Java并发编程实战》这一章节的时候,还有点疑问为什么这么写。直到写完了这篇博客,边写博客边敲程序进行验证,才算是彻底了解了其中的意义。果然写博客还是很有帮助的。起码这边博客对博主自己深入理解Java并发编程,有很大帮助。
完整的代码如下:
package log;
import java.io.PrintWriter;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
public LogService() {
queue = new LinkedBlockingQueue<>(10);
loggerThread = new LoggerThread();
}
public void start() {
loggerThread.start();
}
private boolean isShutDown = false;
public void stop() {
synchronized (this) {
isShutDown = true;
}
loggerThread.interrupt();
}
private int logCount = 0;
public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutDown) {
return;
}
logCount++;
}
queue.put(msg);
}
/**
* 消费者线程消费消息,该线程扮演的是消费者这一角色。
*/
private class LoggerThread extends Thread {
public LoggerThread() {
}
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
if (isShutDown && logCount == 0) {
break;
}
}
System.out.println(queue.take());
synchronized (LogService.this) {
logCount--;
}
} catch (InterruptedException e) {
System.out.println("相应中断,while循环继续执行,除非满足退出条件");
}
}//end while
} finally {
//资源额释放
System.out.println("log service is close=" + queue.size());
}
}
}
public static void main(String[] args) {
LogService logService = new LogService();
for (int i = 0; i < 90; i++) {
final int index = i;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
logService.log("生产者生产消息====" + index);
} catch (InterruptedException e) {
System.out.println("response interception");
}
}
});
thread.start();
}
logService.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
logService.stop();
}
}
以上是关于Java并发编程实战之基于生产者消费者模式的日志服务读书笔记的主要内容,如果未能解决你的问题,请参考以下文章