交易订单请求同步转异步的一种解决办法
Posted forwrader
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了交易订单请求同步转异步的一种解决办法相关的知识,希望对你有一定的参考价值。
最开始,先描述一下交易场景:
- 商户发起请求到交易系统,等待交易系统的返回;
- 交易系统接收请求并按渠道要求组装报文发送到银行渠道;
- 渠道接收信息,通过mq的方式异步返回通知结果到交易系统;
- 交易系统通过监听mq队列,接收到渠道返回的交易结果后,把mq的异步消息转化为同步的消息返回给商户;
- 商户获取到交易结果,完成交易;
一般情况下,我们是用 HttpClient 的方式,将商户上送的交易信息组装报文后,通过 post 方式发送到渠道,然后 HttpClient 返回的流信息就是对应的交易结果。这个就是一个同步的交易请求。
现在要解决的问题是,把请求发送到渠道后,渠道不是立即返回信息,而是通过 mq 的方式异步返回结果,而在这个请求过程中,商户还在页面上等待返回交易结果,这就需要交易系统做一个异步转同步的处理,把结果同步的返回给页面等待的商户。
解决思路:
- 通过使用 Object 的 wait 方式,让交易系统的请求线程等待指定的时间
- 在 mq 队列监听接收到渠道返回的交易结果后,根据交易信息中的 sequenceNumber 字段,关联找到正在 wait 阻塞状态的线程,并调用 notifyAll 方法唤醒线程
- 交易请求线程被唤醒后,就按照同步的逻辑组装报文返回给商户端
实现过程
- 建立一个类,来封装交易对象信息,后面对这个交易对象调用 wait 方法
public class TransactionInfo {
// 流水号, 在异步结果中关联找到原请求的信息
private String sequenceNumber;
// 原交易对象
private Object originalTrxObj;
// 异步交易结果
private Object trxResultObj;
public TransactionInfo(String sequenceNumber, Object originalTrxObj) {
this.sequenceNumber = sequenceNumber;
this.originalTrxObj = originalTrxObj;
}
- 建立一个同步管理类,来管理交易请求封装的信息对象,这里使用的是一个 Map 进行封装
public class SyncFactory {
/**
* Key : 交易的请求流水号
* Value :交易的请求信息的封装
*/
private Map<String, TransactionInfo> map = new ConcurrentHashMap<String, TransactionInfo>();
public synchronized TransactionInfo put(String seqNum, Object req){
TransactionInfo info = new TransactionInfo(seqNum, req);
map.put(seqNum, info);
return info;
}
}
- 交易系统接收到商户发起的交易请求时,阻塞线程,调用 SyncFactory 缓存交易信息,当然要先获取流水号,要填充那个 Map 对象的嘛
// 这里自定义实现获取交易流水号的逻辑
String seqNum = getSeqNum();
// 调用 SyncFactory 的方法,缓存交易信息
TransactionInfo info = syncFactory.put(seqNum, trxReqObj);
// 调用 wait 方法,自定义自己合适的 timeout, 不可能等交易结果等上几小时的嘛
synchronized (info) {
info.wait(syncTimeOut * 1000);
}
- 在 mq 监听消费者的逻辑中, 获取异步结果,根据流水号 seqNum 来调用 notifyAll 唤醒阻塞的线程
public class TransactionResultHttpHandler implements HttpHandler {
@Override
public void handle(HttpExchange he) throws IOException {
// 自定义从异步结果中获取交易流水号的逻辑
String seqNum = getNumFromResult();
// 根据上面获取到的流水号,取到缓存的交易信息对象
TransactionInfo info = syncFactory.put(seqNum, trxReqObj);
// 将异步结果的交易结果信息
synchronized (info) {
info .setTrxResultObj(res);
info .notifyAll();
}
}
}
整个异步转同步的过程大概就是这个样子。
然后补充两点这个设计的问题。
- 在 TransactionInfo 类中,可以增加一个表示这个信息缓存的时间信息。用来判断当结果返回或者是等待结果超时之后,从 SyncFactory 中把对应的信息给移除掉。
// 未返回结果, 判断是否超过自定义的 syncTimeOut, 从 Map 中移除
public boolean expired(int timeout){
return (System.currentTimeMillis()- info.getLiveTime() > syncTimeOut);
}
// 有回结果后, 判断如果在自定义的 syncTimeOut 时间内就返回的话,把 seqNum 对应的交易信息从 Map 中移除
if(System.currentTimeMillis()-info.getLiveTime()< syncTimeOut){
syncFactory.remove(seqNum)
}
- 第二个问题就是这个设计有个很明显的缺陷,在调用 wait 方法的时候,线程阻塞等待请求结果的返回,如果在同一时间出现并发数较大的情况,线程池一下盛满,直接把应用搞挂了。当然这个问题就是这整个设计实现方式的问题了。暂时没有找到其他的异步转同步的解决办法,所以第二个问题没有解决。
大概就是这些。有其他办法的,可以留个链接哇,我也再找其他解决思路~~~
以上是关于交易订单请求同步转异步的一种解决办法的主要内容,如果未能解决你的问题,请参考以下文章