消费者实现应用内分布式事务

Posted qwangxiao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消费者实现应用内分布式事务相关的知识,希望对你有一定的参考价值。

通过《消费者实现应用内分布式事务》、《生产者实现应用内分布式事务管理》、《实现持久订阅消费者》三个章节的实践,其实我们已经可以通过消息队列实现多应用的分布式事务,应用内的事务保证了消息不会被重复生产消费、持久化订阅保证了消息一定会被消费(进入死信队列特殊处理),但其对于业务来说耦合性还是太强,在进行业务处理的同时消息处理名,其采用的仍然是应用内的事务处理,并不适合在大型高性能高并发系统实践,那么本章将通过本地事务+消息队列+外部事件定义表+定时任务实现解耦。
(目前主要实现微服务下最终一致性事务的方案主要是:可靠事件;补偿模式;TCC(Try-Confirm-Cancel);三种,本案例为可靠事件方式)

场景描述:
本场景就拿最常用的转账业务阐述:
在工行ICBC有账号Card001,其中存于500元;
在中行BOC有账号Card002,其中也存有500元;
此时从Card001账号转账至Card002账号300元。

系统设计:
工行系统ICBCPro,该工程主要实现两个功能(实现转出金额生成转账事件;定时任务发出转账事件至消息队列),主要参考《生产者实现应用内分布式事务管理》实现;
中行系统BOCPro,该工程主要实现两个功能(从消息队列下载转账事件;定时任务对转账事件处理并更新转入账号金额),主要参考《消费者实现应用内分布式事务》实现;
此场景仅需要通过P2P消息模式即可。

构建ICBCPro工程
A、实现转出金额生成转账事件
1、构建数据库相关表以及基础数据:
转出账号数据

转出事件记录

消息队列

消息控制台

2、执行单元测试代码实现转账,此时账户扣除与转账事件记录均在本地事务内:
//ICBC中账户card001转出300元

[java] view plain copy
@Test
public void tranfer(){
EventLog eventLog = new EventLog();
eventLog.setAmount(new BigDecimal(300));
eventLog.setFromcard("card001");
eventLog.setTocard("card002");
eventLog.setEventstate(EventState.NEW);
eventLog.setTransferDate(new Date());
eventLogService.transfer(eventLog,new BigDecimal(300));
}

账户信息:

事件记录


B、定时任务发出转账事件至消息队列
对于事件记录表,我们可以定义一个定时任务,将所有的NEW状态事件全部发出,此时需要保证消息的可靠性,采用XA事务实现,但已经不影响我们业务的响应了,实现解耦、快速响应,下面贴出核心实现代码:
1、首选实现数据排它锁场景下的查询与更新:
[java] view plain copy
/**
* 在排它锁场景下数据更新,保证数据的可靠性
*/
@Override
public void updateEventstateById(String id, EventState eventState) {
EventLog eventLog=findEventLog4Update(id);
eventLog.setEventstate(eventState);
emJ1.merge(eventLog);
}

/**
* 实现排它锁查询
*/
@Override
public EventLog findEventLog4Update(String id){
EventLog eventLog=emJ1.find(EventLog.class, id, LockModeType.PESSIMISTIC_WRITE);
return eventLog;
}

2、在service定义查询所有NEW状态的事件、并采用XA事务管理NEW状态事件的发送与更新(为了验证了事务生效,设定了一个fromcard为空的数据触发异常),在异常情况下我们也需要保证countDownLatch执行,避免线程阻塞:
[java] view plain copy
@Service
public class EventLogService www.shashuiyule.cn{
@Autowired
private EventLogRepository eventLogRepository;
@Resource(name="jmsQueueMessagingTemplate")
private JmsMessagingTemplate jmsQueueMessagingTemplate;
@Autowired
@Qualifier("icbc2boc")
private Queue icbc2boc;
....

/**
* 根据eventstate获取EventLog数据集
* @param eventstate
* @return
*/
@Transactional(transactionManager="transactionManager1",propagation=Propagation.SUPPORTS,readOnly=true)
public List<EventLog> findByEventState(EventState eventstate){
return eventLogRepository.findByEventstate(eventstate);
}

/**
* XA事务
* @param id
* @param eventstate
*/
@Transactional(transactionManager="transactionManagerJTA",propagation=Propagation.REQUIRES_NEW)
public void transferToMQ(EventLog eventLog,EventState eventstate,CountDownLatch countDownLatch){
try {
System.out.println(Thread.currentThread().getName()+"本次处理数据:"+eventLog.getFromcard(www.ctxpty.com)+"、"+eventLog.getEventstate());
//再次数据库查询判断,此时用到排它锁--在两个定时任务连续执行,一旦出现程序提交事务命令至数据库,
//但数据库还未执行,此时我们全表查询的结果中当前数据行仍为修改前数据,故会造成重复消费
eventLog=eventLogRepository.findEventLog4Update(eventLog.getId());
if(EventState.Publish.equals(eventLog.getEventstate())){
System.out.println(Thread.currentThread().getName()+"数据:"+eventLog.getFromcard()+"无需处理");
return;
}
//payload
jmsQueueMessagingTemplate.convertAndSend(icbc2boc,eventLog);
eventLogRepository.updateEventstateById(eventLog.getId(), eventstate);
//构造异常场景验证XA事务
if(eventLog.getFromcard()==null){
System.out.println(Thread.currentThread().getName()+"数据异常,不处理");
System.out.println(1/0);
}else{
System.out.println(Thread.currentThread().getName()+":"+eventLog.getFromcard()+"数据处理成功");
}
} finally {
countDownLatch.countDown();
}
}
}
3、定义Job,实现转出任务,并通过线程池异步处理待处理事件集合,通过并发提高处理性能,通过countDownLatch保证了每个任务所有线程处理完成后启动下一次任务;
[html] view plain copy
/**
* 转出任务
* @author song
*/
@PersistJobDataAfterExecution
@DisallowConcurrentExecution //保证每次任务执行完毕,设置为串行执行
public class TransferJob extends QuartzJobBean {
private Logger logger=LoggerFactory.getLogger(TransferJob.class);
@Autowired
@Qualifier("quartzThreadPool")
private ThreadPoolTaskExecutor quartzThreadPool;
@Autowired
private EventLogService eventLogService;

@Override
protected void executeInternal(ww.hjshidpt.com JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.info("本次批处理开始");
//获取所有未发送状态的Event
List<EventLog> list=eventLogService.findByEventState(EventState.NEW);
//
final CountDownLatch countDownLatch=new CountDownLatch(list.size());

//遍历发送
for(final EventLog eventLog:list){
//通过线程池提交任务执行,大大提高处理集合效率
quartzThreadPool.submit(new Runnable() {

@Override
public void run() {
eventLogService.transferToMQ(eventLog,EventState.Publish,countDownLatch);
}
});
}

//保证所有线程执行完成后退出
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("本次批处理完成");
}
}
4、定义转出任务、触发器、调度器以及处理线程池:
[html] view plain copy
@Bean(name="tranferJob")
public JobDetailFactoryBean tranferJob(www.dashuju178.com){
JobDetailFactoryBean factoryBean=new JobDetailFactoryBean();
//定义任务类
factoryBean.setJobClass(TransferJob.class);
//表示任务完成之后是否依然保留到数据库,默认false
factoryBean.setDurability(true);
//为Ture时当Quartz服务被中止后,再次启动或集群中其他机器接手任务时会尝试恢复执行之前未完成的所有任务,默认false
factoryBean.setRequestsRecovery(true);
return factoryBean;
}

/**
* 注册job1的触发器
* @return
*/
@Bean(name="transferJobTrigger")
public CronTriggerFactoryBean transferJobTrigger(){
//触发器
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setCronExpression("*/5 * * * * ?");
cronTriggerFactoryBean.setJobDetail(tranferJob().getObject());
//调度工厂实例化后,经过5秒开始执行调度
cronTriggerFactoryBean.setStartDelay(30000);
cronTriggerFactoryBean.setGroup("tranfer");
cronTriggerFactoryBean.setName("tranfer");
return cronTriggerFactoryBean;
}

/**
* 调度工厂,加载触发器,并设置自动启动、启动时延
* @return
*/
@Bean(name="transferSchedulerFactoryBean")
public SchedulerFactoryBean transferSchedulerFactoryBean(){
//调度工厂
SchedulerFactoryBean schedulerFactoryBean= new SchedulerFactoryBean();
schedulerFactoryBean.setConfigLocation(new ClassPathResource("quartz.properties"));
schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContextKey");
//集群Cluster下设置dataSource
// schedulerFactoryBean.setDataSource(dataSource);
//QuartzScheduler启动时更新己存在的Job,不用每次修改targetObject后删除qrtz_job_details表对应记录了
schedulerFactoryBean.setOverwriteExistingJobs(true);
//QuartzScheduler延时启动20S,应用启动完后 QuartzScheduler 再启动
schedulerFactoryBean.setStartupDelay(20);
//自动启动
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.setTriggers(transferJobTrigger().getObject());
//自定义的JobFactory解决job中service的bean注入
schedulerFactoryBean.setJobFactory(jobFactory);
return schedulerFactoryBean;
}

/**
* 用于处理待转账数据发至消息队列的线程池
* @return
*/
@Bean(name="quartzThreadPool")
public ThreadPoolTaskExecutor getThreadPoolTaskExecutor(){
ThreadPoolTaskExecutor pool=new ThreadPoolTaskExecutor();
pool.setCorePoolSize(10);
pool.setQueueCapacity(100);
pool.setMaxPoolSize(10);
pool.setKeepAliveSeconds(10);
//避免应用关闭,任务没有执行完成,起到shutdownhook钩子的作用
pool.setWaitForTasksToCompleteOnShutdown(true);
//空闲时核心线程也不退出
pool.setAllowCoreThreadTimeOut(false);
//设置拒绝策略,不可执行的任务将被抛弃
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return pool;
}

小结
特别注意:
1、周期时间刚好两个定时任务连续执行,出现java程序提交事务紧接第二个任务启动,但数据库未完成命令,此时后续任务已经查询数据,全表过滤能够再次获取未提交数据行原始数据,会造成二次消费,故需要对其采用排它锁方式,二次查询判断后决定是否消费,从而规避二次消费问题;
2、注意在Service中不能随便catch异常,避免分布式事务未回滚,造成重复消费;
3、通过CountDownLatch,实现任务线程等待所有的子任务线程执行完毕后方可退出本次任务,执行下一个任务,故其一定要在finally中实现countdown,避免造成任务线程阻塞;
4、需要设置OpenEntityManagerInViewInterceptor拦截器,避免提示session过早关闭问题;
5、数据库DataSource必须定义好destroyMethod,避免程序关闭,事务还未提交的情况下出现连接池已经关闭;
6、设置好连接池需要等待已提交任务完成后方可shutdown;

优化空间:
1、根据数据特征进行任务分割,比如自增ID场景下,根据0、1、2等最后一位尾数分割不同的定时任务,配置任务集群,从而实现分布式高可用集群处理;
2、在数据查询处理过程中,优化sql,提高单次查询性能;
3、添加独立的定时任务,将Publish已消费数据转储,减轻单表压力;
4、目前已经加入线程池异步处理数据集合,提高单次任务执行效率;
5、一旦数据库压力比较大的情况下,也可以将Event分库操作,减轻服务器数据库连接、IO压力;
6、采用微服务模式,将两个功能实现服务分离;
7、也可以在定时任务中添加比如50MS的sleep时长,保证数据库服务器端事务提交成功,取消排它锁将进一步提高性能较小数据库死锁问题;

遗留问题:
1、在开发环境下,手动关闭程序MQ连接会过早关闭,修改数据后事务未提交,出现mysql数据库行已经被执行排他锁;

构建BOCPro工程
A、从消息队列下载转账事件
1、构建数据库BOC数据库相关表以及基础数据:

事件表暂时为空

消息队列,有一条转账数据


2、配置队列消息模板:
[html] view plain copy
@Configuration
public class JmsMessageConfiguration {
@Autowired
@Qualifier(value="jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;

/**
* 定义点对点队列
* @return
*/
@Bean(name="icbc2boc")
public Queue queue() {
return new ActiveMQQueue("icbc2boc");
}

/**
* 创建处理队列消息模板
* @return
*/
@Bean(name="jmsQueueMessagingTemplate")
public JmsMessagingTemplate jmsQueueMessagingTemplate() {
JmsMessagingTemplate jmsMessagingTemplate =new JmsMessagingTemplate(jmsQueueTemplate);
//通过MappingJackson2MessageConverter实现Object转换
jmsMessagingTemplate.setMessageConverter(new MappingJackson2MessageConverter());
return new JmsMessagingTemplate(jmsQueueTemplate);
}

}
3、配置监听器,监听转账事件消息:
[html] view plain copy
@Component
public class TraferIn {

@Autowired
@Qualifier("icbc2boc")
private Queue queue;
@Autowired
@Qualifier("jmsQueueMessagingTemplate")
private JmsMessagingTemplate jmsQueueMessagingTemplate;
@Autowired
private EventLogService eventLogService;

/**
* 定义监听转账事件监听
* @param text
* @throws Exception
*/
@JmsListener(destination = "icbc2boc",containerFactory="jmsListenerContainerFactory4Queue")//ActiveMQ.DLQ
public void receiveQueue(EventLog eventLog) throws Exception {
System.out.println("接受到的事件数据:"+eventLog.toString());
eventLogService.mq2transfer(eventLog, new BigDecimal(300));
}
}

4、采用分布式事务管理下载的消息队列事件,模拟事务失效,验证成功:
[html] view plain copy
/**
* XA事务
* @param eventLog
* @param amount
*/
@Transactional(transactionManager="transactionManagerJTA",propagation=Propagation.REQUIRED)
public void mq2transfer(EventLog eventLog,BigDecimal amount){
//保存事件日志
eventLogRepository.saveEvetLog(eventLog);
// System.out.println(1/0);
}
5、需要采用XA事务,故我们不能直接通过EventLogRepository保存数据,定义自定义保存方法:
[html] view plain copy
/**
* 采用分布式事务数据源保存事件
*/
@Override
public EventLog saveEvetLog(EventLog eventLog) {
return emJ1.merge(eventLog);
}
6、启动程序监听后,收到事件

数据库添加了一条NEW状态事件

消费后消息队列被清空

 

 

B、定时任务对转账事件处理并更新转入账号金额

以上是关于消费者实现应用内分布式事务的主要内容,如果未能解决你的问题,请参考以下文章

给予消息队列实现分布式事务

求救,分布式事务怎么处理

RocketMQ解决分布式事务

MySQL 中基于 XA 实现的分布式事务

调用dubbo服务时事务配置在哪

rabbit_mq实现分布式事务