185 - 基于消息队列的异步执行程序

Posted 分享牛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了185 - 基于消息队列的异步执行程序相关的知识,希望对你有一定的参考价值。

在阅读异步执行器设计部分时,很清楚,架构受消息队列的启发。异步执行程序的设计方式是,可以轻松使用消息队列来接管线程池的作业和异步作业的处理。

基准测试表明,对于线程池支持的异步执行程序,使用消息队列是优越的吞吐量。然而,它确实带有一个额外的架构组件,这当然会使设置,维护和监视更加复杂。对于许多用户来说,线程池支持的异步执行器的性能绰绰有余。然而,知道如果所需的性能增长还有其他选择是很好的。

目前,开箱即用支持的唯一选项是Spring的JMS。之所以支持Spring是因为Spring有一些非常好的功能,可以减轻线程和处理多个消息使用者的痛苦。但是,集成非常简单,可以轻松移植到任何消息队列实现或协议(Stomp,AMPQ等)。对于下一个应用程序的反馈意见是值得赞赏的。

当引擎创建一个新的异步作业时,消息被放在一个包含作业标识符的消息队列中(在事务提交事务监听器中,所以我们确信作业条目在数据库中)。消息使用者然后使用该作业标识符来获取作业,并执行作业。异步执行程序不会再创建线程池。它将从一个单独的线程中插入和查询定时器。当一个定时器触发时,它被移动到异步作业表,现在意味着消息也被发送到消息队列。该复位过期的线程也将解开的工作像往常一样,消息队列,可能失败了。现在不要解锁工作,而是要重新发送一条消息。异步执行程序不会再轮询异步作业。

该实现由两个类组成:

  • org.flowable.engine.impl.asyncexecutor.JobManager接口的实现,它将消息放入消息队列中,而不是将消息传递给线程池。
  • javax.jms.MessageListener实现,它使用消息队列中的消息,使用消息中的作业标识来获取并执行作业。

首先,将flowable-jms-spring-executor依赖项添加到您的项目中:

<dependency>
<groupId>org.flowable</groupId>
<artifactId>flowable-jms-spring-executor</artifactId>
<version>$flowable.version</version>
</dependency>

要启用基于消息队列的异步执行程序,在进程引擎配置中,需要执行以下操作:

  • 像往常一样,asyncExecutorActivate必须设置为true
  • asyncExecutorMessageQueueMode需要设置为true
  • 该org.flowable.spring.executor.jms.MessageBasedJobManager必须注射的JobManager

以下是使用ActiveMQ作为消息队列代理的基于Java的配置的完整示例。

有些事情要注意:

  • 该MessageBasedJobManager需要一个JmsTemplate的被注入配置有一个正确的connectionFactory。
  • 我们使用Spring 的MessageListenerContainer概念,因为这简化了线程和多个消费者。
@Configuration
public class SpringJmsConfig 
@Bean
public DataSource dataSource() 
// Omitted

@Bean(name = "transactionManager")
public PlatformTransactionManager transactionManager() 
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
transactionManager.setDataSource(dataSource());
return transactionManager;

@Bean
public SpringProcessEngineConfiguration processEngineConfiguration() 
SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
configuration.setDataSource(dataSource());
configuration.setTransactionManager(transactionManager());
configuration.setDatabaseSchemaUpdate(SpringProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
configuration.setAsyncExecutorMessageQueueMode(true);
configuration.setAsyncExecutorActivate(true);
configuration.setJobManager(jobManager());
return configuration;

@Bean
public ProcessEngine processEngine() 
return processEngineConfiguration().buildProcessEngine();

@Bean
public MessageBasedJobManager jobManager() 
MessageBasedJobManager jobManager = new MessageBasedJobManager();
jobManager.setJmsTemplate(jmsTemplate());
return jobManager;

@Bean
public ConnectionFactory connectionFactory() 
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
activeMQConnectionFactory.setUseAsyncSend(true);
activeMQConnectionFactory.setAlwaysSessionAsync(true);
return new CachingConnectionFactory(activeMQConnectionFactory);

@Bean
public JmsTemplate jmsTemplate() 
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDefaultDestination(new ActiveMQQueue("flowable-jobs"));
jmsTemplate.setConnectionFactory(connectionFactory());
return jmsTemplate;

@Bean
public MessageListenerContainer messageListenerContainer() 
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestinationName("flowable-jobs");
messageListenerContainer.setMessageListener(jobMessageListener());
messageListenerContainer.setConcurrentConsumers(2);
messageListenerContainer.start();
return messageListenerContainer;

@Bean
public JobMessageListener jobMessageListener() 
JobMessageListener jobMessageListener = new JobMessageListener();
jobMessageListener.setProcessEngineConfiguration(processEngineConfiguration());
return jobMessageListener;


在上面的代码中,JobMessageListener和MessageBasedJobManager是flowable -jms-spring-executor模块中唯一的类。所有其他的代码都是来自Spring。因此,当想要将其移植到其他队列/协议时,必须移植这些类。

上面文章来自盘古BPM研究院:http://vue.pangubpm.com/
文章翻译提交:https://github.com/qiudaoke/flowable-userguide
了解更多文章可以关注微信公众号:

以上是关于185 - 基于消息队列的异步执行程序的主要内容,如果未能解决你的问题,请参考以下文章

Python开发模块:Celery 分布式异步消息任务队列

Python开发模块:Celery 分布式异步消息任务队列

基于异步消息队列List lpush-brpop(rpush-blpop)

Celery 基本使用

Redis基于(ListPubSubStream消费者组)实现消息队列,基于Stream结构实现异步秒杀下单

Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控