185 - 基于消息队列的异步执行程序
Posted 分享牛
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了185 - 基于消息队列的异步执行程序相关的知识,希望对你有一定的参考价值。
在阅读异步执行器设计部分时,很清楚,架构受消息队列的启发。异步执行程序的设计方式是,可以轻松使用消息队列来接管线程池的作业和异步作业的处理。
基准测试表明,对于线程池支持的异步执行程序,使用消息队列是优越的吞吐量。然而,它确实带有一个额外的架构组件,这当然会使设置,维护和监视更加复杂。对于许多用户来说,线程池支持的异步执行器的性能绰绰有余。然而,知道如果所需的性能增长还有其他选择是很好的。
目前,开箱即用支持的唯一选项是Spring的JMS。之所以支持Spring是因为Spring有一些非常好的功能,可以减轻线程和处理多个消息使用者的痛苦。但是,集成非常简单,可以轻松移植到任何消息队列实现或协议(Stomp,AMPQ等)。对于下一个应用程序的反馈意见是值得赞赏的。
当引擎创建一个新的异步作业时,消息被放在一个包含作业标识符的消息队列中(在事务提交事务监听器中,所以我们确信作业条目在数据库中)。消息使用者然后使用该作业标识符来获取作业,并执行作业。异步执行程序不会再创建线程池。它将从一个单独的线程中插入和查询定时器。当一个定时器触发时,它被移动到异步作业表,现在意味着消息也被发送到消息队列。该复位过期的线程也将解开的工作像往常一样,消息队列,可能失败了。现在不要解锁工作,而是要重新发送一条消息。异步执行程序不会再轮询异步作业。
该实现由两个类组成:
- org.flowable.engine.impl.asyncexecutor.JobManager接口的实现,它将消息放入消息队列中,而不是将消息传递给线程池。
- javax.jms.MessageListener实现,它使用消息队列中的消息,使用消息中的作业标识来获取并执行作业。
首先,将flowable-jms-spring-executor依赖项添加到您的项目中:
<dependency>
<groupId>org.flowable</groupId>
<artifactId>flowable-jms-spring-executor</artifactId>
<version>$flowable.version</version>
</dependency>
要启用基于消息队列的异步执行程序,在进程引擎配置中,需要执行以下操作:
- 像往常一样,asyncExecutorActivate必须设置为true
- asyncExecutorMessageQueueMode需要设置为true
- 该org.flowable.spring.executor.jms.MessageBasedJobManager必须注射的JobManager
以下是使用ActiveMQ作为消息队列代理的基于Java的配置的完整示例。
有些事情要注意:
- 该MessageBasedJobManager需要一个JmsTemplate的被注入配置有一个正确的connectionFactory。
- 我们使用Spring 的MessageListenerContainer概念,因为这简化了线程和多个消费者。
@Configuration
public class SpringJmsConfig
@Bean
public DataSource dataSource()
// Omitted
@Bean(name = "transactionManager")
public PlatformTransactionManager transactionManager()
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
transactionManager.setDataSource(dataSource());
return transactionManager;
@Bean
public SpringProcessEngineConfiguration processEngineConfiguration()
SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
configuration.setDataSource(dataSource());
configuration.setTransactionManager(transactionManager());
configuration.setDatabaseSchemaUpdate(SpringProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
configuration.setAsyncExecutorMessageQueueMode(true);
configuration.setAsyncExecutorActivate(true);
configuration.setJobManager(jobManager());
return configuration;
@Bean
public ProcessEngine processEngine()
return processEngineConfiguration().buildProcessEngine();
@Bean
public MessageBasedJobManager jobManager()
MessageBasedJobManager jobManager = new MessageBasedJobManager();
jobManager.setJmsTemplate(jmsTemplate());
return jobManager;
@Bean
public ConnectionFactory connectionFactory()
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
activeMQConnectionFactory.setUseAsyncSend(true);
activeMQConnectionFactory.setAlwaysSessionAsync(true);
return new CachingConnectionFactory(activeMQConnectionFactory);
@Bean
public JmsTemplate jmsTemplate()
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDefaultDestination(new ActiveMQQueue("flowable-jobs"));
jmsTemplate.setConnectionFactory(connectionFactory());
return jmsTemplate;
@Bean
public MessageListenerContainer messageListenerContainer()
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestinationName("flowable-jobs");
messageListenerContainer.setMessageListener(jobMessageListener());
messageListenerContainer.setConcurrentConsumers(2);
messageListenerContainer.start();
return messageListenerContainer;
@Bean
public JobMessageListener jobMessageListener()
JobMessageListener jobMessageListener = new JobMessageListener();
jobMessageListener.setProcessEngineConfiguration(processEngineConfiguration());
return jobMessageListener;
在上面的代码中,JobMessageListener和MessageBasedJobManager是flowable -jms-spring-executor模块中唯一的类。所有其他的代码都是来自Spring。因此,当想要将其移植到其他队列/协议时,必须移植这些类。
上面文章来自盘古BPM研究院:http://vue.pangubpm.com/
文章翻译提交:https://github.com/qiudaoke/flowable-userguide
了解更多文章可以关注微信公众号:
以上是关于185 - 基于消息队列的异步执行程序的主要内容,如果未能解决你的问题,请参考以下文章
基于异步消息队列List lpush-brpop(rpush-blpop)