分布式技术之 单线程消息队列 SingularUpdateQueue
Posted BBinChina
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式技术之 单线程消息队列 SingularUpdateQueue相关的知识,希望对你有一定的参考价值。
场景
高并发开发技术中经常会使用到多线程、线程池等技术,那我们为啥还搞个单线程消息队列呢?不直接怼多线程搞?
不同的模式适用于不同的场景,当使用多线程的开发模式时,适用于CPU密集型作业,比如使用多线程提高任务并发处理能力,而当应用WAL 机制写log文件时,需要解决的问题是如何将状态更新指令顺序的迅速地写入文件,如果采用多线程,反而需要对文件进行锁来保证写入的顺序。
为了写文件时不阻塞工作线程,通常我们采用消息队列来缓冲写入log文件的指令,再通过线程异步消费来写入文件。
解决思路
实现同步队列单线程消费模式,多个客户端并发将指令写入队列workqueue,再由工作线程 worker thread进行消费刷盘
java类图如下:
//继承thread作为常驻线程进行workqueue的消费,workqueue采用ArrayBlockingQueue,内部实现了锁用于多线程环境,同时定义了isRunning来控制线程消费,handler是消息处理逻辑
public class SingularUpdateQueue<Req, Res> extends Thread{
private ArrayBlockingQueue<RequestWrapper<Req, Res>> workQueue = new ArrayBlockingQueue<RequestWrapper<Req, Res>>(100);
private Function<Req, Res> handler;
private volatile boolean isRunning = false;
//提供对外接口用于投递消息
public CompletableFuture<Res> submit(Req request) {
try {
var requestWrapper = new RequestWrapper<Req, Res>(request);
workQueue.put(requestWrapper);
return requestWrapper.getFuture();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
//工作线程消费队列消息
private Optional<RequestWrapper<Req, Res>> take() {
try {
//arrayBlockingQueue可阻塞线程避免线程空转,可定义超时时间
return Optional.ofNullable(workQueue.poll(300, TimeUnit.MICROSECONDS));
} catch (InterruptedException e) {
return Optional.empty();
}
}
@Override
public void run() {
isRunning = true;
while(isRunning) {
//RequestWrrappers是对消息的封装,用于返回一个future,采用异步方式实现同步调用思维
Optional<RequestWrapper<Req, Res>> item = take();
item.ifPresent(requestWrapper -> {
try {
Res response = handler.apply(requestWrapper.getRequest());
requestWrapper.complete(response);
} catch (Exception e) {
requestWrapper.completeExceptionally(e);
}
});
}
}
public void shutdown() {
this.isRunning = false;
}
}
采用RequestWrapper封装请求,当消费请求返回future时,请求端可以根据future的返回进行下一步逻辑处理,这种同步思维方式更方便业务处理,在c++实现中,我比较喜欢使用lambda封装下一步操作后作为请求的处理回调,而不是采用promise future机制
class RequestWrapper<Req, Res> {
private final CompletableFuture<Res> future;
private final Req request;
public RequestWrapper(Req request) {
this.request = request;
this.future = new CompletableFuture<Res>();
}
public CompletableFuture<Res> getFuture() {return future;}
public Req getRequest() {
return request;
}
public void complete(Res response) {
future.complete(response);
}
public void completeExceptionally(Exception e) {
e.printStackTrace();
getFuture().completeExceptionally(e);
}
}
那么,如何运用到我们的WAL程序中呢?可采用如下类图实现:
代码实现如下:
public class WalRequestConsumer implements Consumer<Message<RequestOrResponse>> {
//创建消息异步处理队列
private final SingularUpdateQueue<Message<RequestOrResponse>, Message<RequestOrResponse>> walWriterQueue;
private final WriteAheadLog wal;
public WalRequestConsumer(Config config) {
this.wal = WriteAheadLog.openWAL(config);
//队列消费处理逻辑
walWriterQueue = new SingularUpdateQueue<>((message) -> {
wal.writeEntry(serialize(message));
return responseMessage(message);
});
//启动队列工作线程
startHandling();
}
private void startHandling() { this.walWriterQueue.start(); }
@Override
public void accept(Message message) {
//通过返回的future,在消息被消费后可以执行下一步操作sendResponse
CompletableFuture<Message<RequestOrResponse>> future = walWriterQueue.submit(message);
future.whenComplete((responseMessage, error) -> {
sendResponse(responseMessage);
});
}
}
队列选择
多个客户端可并发调用accpt,而队列的submit内部采用arrayBlockingQueue保证了线程安全,但是其有很明显的缺点,arrayBlockQueue底层采用数组实现,在声明时确定了数组的大小,当队列消费过慢而生产过快导致队列填满时,会造成生产线程的阻塞。我们也可以采用其他的几种队列来替换:
- Akka actor使用的 ConcurrentLinkedQueue:
底层采用链表方式,在内存满足情况下添加消息,不会阻塞生产线程,其线程安全机制主要是出入队时保证cas原子操作,直到出入队成功。
- zookeep 和 kafak 使用的LinkedBlockingDeque:
可以指定容器大小来避免数据无限填充,不设置大小时默认为Integer.MAX_VALUE,其底层是一个双向链表,可以实现FIFO、FILO,其线程安全机制是使用一个可重入锁和这个锁生成的两个条件对象进行并发控制(classic two-condition algorithm)
- LMAX的高性能队列 Disruptor中使用的 RindBuffer,下一篇文章着重讲解RindBuffer
总结:
java.util.concurrent.ArrayBlockingQueue有两个方法添加元素。如果数组已满,Put方法将阻塞生产者。如果队列满了,add方法抛出IllegalStateException,但不会阻塞生产者,我们需要结合使用场景考量我们需要阻塞生产者,还是抛出异常反馈消费情况。
扩展
我们的消费逻辑如果有对外其他调用时(将调用视为事件),同理的可以将其解耦丢自另外的一个SingularUpdateQueue,我在生产环境的模拟交易所中,会将OMS、TMS等模块继承于SingularUpdateQueue,每个模块进行独立的工作线程处理。
以上是关于分布式技术之 单线程消息队列 SingularUpdateQueue的主要内容,如果未能解决你的问题,请参考以下文章