简易版的生产者消费者实现业务异步事务分离

Posted diaobiyong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了简易版的生产者消费者实现业务异步事务分离相关的知识,希望对你有一定的参考价值。

定义一个model类

/**
 * 版权所有:
 * 项目名称: 
 * 创建者: 
 * 创建日期: 2018年5月10日
 * 文件说明: 见类描述
 */
package com.huaxin.acws.component.message.model;

import java.util.Date;
import java.util.Map;

/**
 * @author
 *
 */
public class MessageModel 
    /**
     * 事件ID
     */
    private String eventId;
    /**
     * 事件类型
     */
    private String eventType;
    /**
     * 事件内容,保存数据库json格式,
     */
    private Map<String, Object> payload;
    /**
     * 预留,同步发送的失败消息,定时发送重试次数
     */
    private int retryCount = 0;
    /**
     * 预留,定时最后发送事件
     */
    private Date lastRetryTime = null;
    /**
     * 事件开始的事件
     */
    private Date startTime = null;
    
    /**
     * 有效性校验
     * @return
     */
    public boolean isValid() 
        if (this.getEventType() == null || this.payload == null) 
            return false;
        
        
        return true;
    
    /**
     * @return the eventId
     */
    public String getEventId() 
        return eventId;
    
    /**
     * @param eventId the eventId to set
     */
    public void setEventId(String eventId) 
        this.eventId = eventId;
    
    /**
     * @return the eventType
     */
    public String getEventType() 
        return eventType;
    
    /**
     * @param eventType the eventType to set
     */
    public void setEventType(String eventType) 
        this.eventType = eventType;
    
    /**
     * @return the payload
     */
    public Map<String, Object> getPayload() 
        return payload;
    
    /**
     * @param payload the payload to set
     */
    public void setPayload(Map<String, Object> payload) 
        this.payload = payload;
    
    /**
     * @return the retryCount
     */
    public int getRetryCount() 
        return retryCount;
    
    /**
     * @param retryCount the retryCount to set
     */
    public void setRetryCount(int retryCount) 
        this.retryCount = retryCount;
    
    /**
     * @return the lastRetryTime
     */
    public Date getLastRetryTime() 
        return lastRetryTime;
    
    /**
     * @param lastRetryTime the lastRetryTime to set
     */
    public void setLastRetryTime(Date lastRetryTime) 
        this.lastRetryTime = lastRetryTime;
    
    /**
     * @return the startTime
     */
    public Date getStartTime() 
        return startTime;
    
    /**
     * @param startTime the startTime to set
     */
    public void setStartTime(Date startTime) 
        this.startTime = startTime;
    

 

定义一个接口

/**
 * 版权所有:
 * 项目名称:
 * 创建者: 
 * 创建日期: 2018年5月10日
 * 文件说明: 见类描述
 */
package com.huaxin.acws.component.message.service;

import com.huaxin.acws.component.message.model.MessageModel;

/**
 * 事件消息实际发送类
 * 
 * @author dengxf
 */
public interface MessageHandler 
    /**
     * 是否支持消息类型处理
     * 
     * @param eventType
     * @return
     */
    boolean isSupportEventType(String eventType);
    /**
     * 消息处理
     * @param message
     */
    void hander(MessageModel message);
    

定义一个抽象基类——通过两张表(未发送事件表和已发送事件表)来做消息处理,实现补发等操作

/**
 * 版权所有:
 * 项目名称: 
 * 创建者: 
 * 创建日期: 2018年5月10日
 * 文件说明: 见类描述
 */
package com.huaxin.acws.component.message.service;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import com.alibaba.fastjson.JSONObject;
import com.huaxin.acws.common.exception.AcwsGenerelException;
import com.huaxin.acws.common.util.DateUtils;
import com.huaxin.acws.component.message.dao.WfoHiEventDao;
import com.huaxin.acws.component.message.dao.WfoRuEventDao;
import com.huaxin.acws.component.message.model.MessageModel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 消息生产者基类
 * @author 
 */
public abstract class AbstractMessageProducerService 
    private ExecutorService threadPool = Executors.newFixedThreadPool(5);
    
    /**
     * 日志记录对象
     */
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessageProducerService.class);
            
    /**
     * 未发送事件消息
     */
    @Resource
    private WfoRuEventDao wfoRuEventDao;
    /**
     * 已发送事件消息
     */
    @Resource
    private WfoHiEventDao wfoHiEventDao;
    
    /**
     * 发送消息
     * 
     * @param message 消息对象
     * @param handler 消息实际发送对象
     */
    protected void sendMessage(final MessageModel message, final MessageHandler handler) 
        if (!message.isValid()) 
            throw new AcwsGenerelException("消息类型或内容为空,发送失败");
        
        
        message.setStartTime(new Date());
        
        //1、MessageModel 保存
        final Map<String, Object> record = new HashMap<String, Object>();
        record.put("EVENT_TYPE", message.getEventType());
        record.put("EVENT_PAYLOAD", JSONObject.toJSONString(message.getPayload(), true));
        record.put("RETRY_COUNT", 0);
        record.put("START_TIME", message.getStartTime());
        
        String messageId = wfoRuEventDao.save(record);
        message.setEventId(messageId);
        record.put("WFO_RU_EVENT_ID", messageId);
                
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() 
            @Override
            public void afterCommit() 
                //https://segmentfault.com/a/1190000004235193
                threadPool.execute(new Runnable() 
                    @Override
                    public void run() 
                        try 
                            //1、发送事件
                            handler.hander(message);
                            logger.info("messageId=,发送成功", message.getEventId());
                            
                            //2、处理成功事件迁移历史表
                            record.put("END_TIME", new Date());
                            wfoHiEventDao.save(record);
                            wfoRuEventDao.deleteByPrimaryKey(message.getEventId());
                         catch (RuntimeException e) 
                            logger.error("发送消息错误,EventId=" + message.getEventId(), e);
                                                
                    
                );
//                try 
//                    //1、发送事件
//                    handler.hander(message);
//                    logger.info("messageId=,发送成功", message.getEventId());
//                    
//                    //2、处理成功事件迁移历史表
//                    record.put("END_TIME", new Date());
//                    wfoHiEventDao.save(record);
//                    wfoRuEventDao.deleteByPrimaryKey(message.getEventId());
//                 catch (RuntimeException e) 
//                    logger.error("发送消息错误,EventId=" + message.getEventId(), e);
//                
            
        );
    
    
    /**
     * 发送成功归档
     * 
     * @param messageId
     */
    protected void archiveForSuccess(String messageId) 
        Map<String, String> ruRecord = wfoRuEventDao.getByPrimaryKey(messageId);
        ruRecord.put("END_TIME", DateUtils.formatDate(new Date(), DateUtils.DATE_FORMAT_YMDHMS));
        
        wfoHiEventDao.save(ruRecord);
        wfoRuEventDao.deleteByPrimaryKey(messageId);
    
    
    /**
     * 发送失败更新重发计数
     * 
     * @param messageId
     */
    protected void updateCountForFailed(String messageId) 
        Map<String, String> messageMap = wfoRuEventDao.getByPrimaryKey(messageId);
        String retryCount = messageMap.get("RETRY_COUNT");

        Map<String, Object> record = new HashMap<String, Object>();
        record.put("WFO_RU_EVENT_ID", messageId);
        record.put("RETRY_COUNT", Integer.valueOf(retryCount) + 1);
        record.put("LAST_RETRY_TIME", new Date());
        
        wfoRuEventDao.save(record);
    

 

业务实现类

/**
 * 版权所有:
 * 项目名称:
 * 创建者: 
 * 创建日期: 2019年2月12日
 * 文件说明: demo
 */
package com.huaxin.gxgc.gxprocess.service;

import com.alibaba.fastjson.JSONObject;
import com.huaxin.acws.bpm.event.ProcessCompletedEvent;
import com.huaxin.acws.common.exception.AcwsGenerelException;
import com.huaxin.acws.component.message.model.MessageModel;
import com.huaxin.acws.component.message.service.AbstractMessageProducerService;
import com.huaxin.acws.component.message.service.MessageHandler;
import com.huaxin.gxgc.mq.constant.MqEventTypeEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
 * 
 * @author diaoby
 *
 */
@Service
public class GxMqProducerService extends AbstractMessageProducerService implements MessageHandler
    
    /** 日志实例 */
    private static final Logger logger = LoggerFactory.getLogger(GxMqProducerService.class);

    /* (non-Javadoc)
     * @see com.huaxin.acws.component.message.service.MessageHandler#isSupportEventType(java.lang.String)
     * @author diaoby
     */
    @Override
    public boolean isSupportEventType(String eventType) 
        return MqEventTypeEnum.MqEventType.PROCESS_COMPLETED_EVENT_GXPROCESSDEMO_MQ.getValue().equals(eventType);
    

    /* (non-Javadoc)
     * @see com.huaxin.acws.component.message.service.MessageHandler#hander(com.huaxin.acws.component.message.model.MessageModel)
     * @author diaoby
     */
    @Override
    public void hander(MessageModel message) 
        Map<String, Object> payload = message.getPayload();
        ProcessCompletedEvent processCompletedEvent = null;
        Object object = payload.get("PROCESS_COMPLETED_EVENT");
        if( object instanceof JSONObject) 
            //ProcessCompletedEvent中 source 不能为空,随便设置一个值
            ((JSONObject) object).put("source", "1");
            JSONObject jsonobject = (JSONObject) object;
            processCompletedEvent = JSONObject.toJavaObject(jsonobject,ProcessCompletedEvent.class);
         else 
            processCompletedEvent = (ProcessCompletedEvent) object;
        
        //假设前10次都不成功
        if(message.getRetryCount() >= 10) 
            //业务操作
            logger.info("appId=", processCompletedEvent.getAppId());
            logger.info("instanceId=" +processCompletedEvent.getInstanceId());
        else 
            logger.error("发送消息错误,EventId=" + message.getEventId());
            throw new AcwsGenerelException("GxMqProducerService流程结束事件失败");
        
    
    /**
     * 流程结束后触发完成后触发mq
     * @param processCompletedEvent
     * @author diaoby
     */
    public void completedProcess(ProcessCompletedEvent processCompletedEvent)
        Map<String, Object> payload = new HashMap<String, Object>();
        payload.put("PROCESS_COMPLETED_EVENT", processCompletedEvent);
        MessageModel message = new MessageModel();
        message.setEventType(MqEventTypeEnum.MqEventType.PROCESS_COMPLETED_EVENT_GXPROCESSDEMO_MQ.getValue());
        message.setPayload(payload);
        super.sendMessage(message, this);
    
    

本例子中,ProcessCompletedEvent 是自定义的流程完成事件;流程完成后由该事件触发后续业务,从而实现流程与业务的分离。

业务调用处

    /* (non-Javadoc)
     * @see com.huaxin.gxgc.process.service.IProcessCompleted#completedProcess(com.huaxin.acws.bpm.event.ProcessCompletedEvent)
     * @author diaoby
     */
    @Override
    public void completedProcess(ProcessCompletedEvent processCompletedEvent) 
        gxMqProducerService.completedProcess(processCompletedEvent);
    

 

以上是关于简易版的生产者消费者实现业务异步事务分离的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 事务型消息 + 异步扣减库存实现

RocketMQ 事务型消息 + 异步扣减库存实现

消费者实现应用内分布式事务

通过阻塞队列实现生产者和消费者异步解耦

基于ReentrantLock实现简易生产者消费者模型

RocketMQ 消息详解