在@MessageDriven bean 中使用 amazon sqs - 池化/并行处理
Posted
技术标签:
【中文标题】在@MessageDriven bean 中使用 amazon sqs - 池化/并行处理【英文标题】:using amazon sqs in a @MessageDriven bean - pooling / parallel processing 【发布时间】:2017-05-01 12:18:28 【问题描述】:我们需要在我们的 Java EE 应用程序中使用队列,因为它是一个基于云的应用程序(部署在 OpenShift Online 上),我们喜欢使用 amazon sqs。
如果我正确理解 JMS / Java EE 接收部分的理论,@MessageDriven
bean 由 Java EE 容器管理,因此可以并行创建许多 bean 实例(根据最大池大小),如果传入消息的数量很高。这当然是处理高负载的一大好处。
但是,我看不出我们如何以这种方式将 aws sqs 集成到 Java EE 应用程序中。我知道http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html的异步接收器示例:
class MyListener implements MessageListener
@Override
public void onMessage(Message message)
try
// Cast the received message as TextMessage and print the text to screen.
if (message != null)
System.out.println("Received: " + ((TextMessage) message).getText());
catch (JMSException e)
e.printStackTrace();
然后:
// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);
// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());
// Start receiving incoming messages.
connection.start();
这是官方的异步接收器示例 - 它不是 @MessageDriven
bean。很明显,我们需要在某个地方使用凭据进行身份验证(通过创建 SQSConnectionFactory,然后是连接,然后是会话 - 这在示例中也有很好的描述)。
但我强烈认为这个示例不会并行处理消息 - 即只有一个 bean 实例正在处理队列,这对于可扩展、高负载的应用程序来说不是一个好的解决方案。
a) 我们如何使用 Amazon SQS 实现真正的 Java EE 方式? 我只是找到了很多 Spring 的例子。但它必须是 Java EE 7。
b) 我们使用 Wildfly(当前为 8.2.1)。是否也可以让 Wildfly 在内部管理与 AWS 和应用程序的连接,我们可以像使用应用程序服务器管理队列一样使用队列(与数据库访问的数据源相同的方法)?
得到stdunbar回答后的结论:
我喜欢做的事情似乎不可能以“正确的方式”进行。所以我该怎么做?将ManagedExecutorService
实现为描述为“包装”队列的stdunbar? - 然而,这意味着也有一个本地队列,这对于应用程序来说不是一个好的情况,它应该是可扩展的?!
什么是替代品?我们在 OpenShift Online 上运行该应用程序。用例如实例化自己的装备可能会更好。 ApacheMQ 墨盒...当然有很多缺点,比如成本,我们负责“基础设施”。
说实话,在这种情况下,我对 AWS 真的很失望......
【问题讨论】:
【参考方案1】:通常,要使 MDB“连接”到远程 JMS Q,您需要一个资源适配器 (RA)。从理论上讲,这种基于 JMS 规范实现的 RA 应该与任何符合规范的 JMS 提供程序一起使用,因此理论上您应该能够重用例如 this implementation。
然而,正如上述项目的自述文件所说,您应该使用由特定 JMS 提供者提供的 RA,而不是通用的。不幸的是,亚马逊没有提供一个:(
然而,最近,some awesome guy 创建了一个unofficial open-source one。我只是在评估它,稍后会根据我的经验更新这个答案。 (非常欢迎来自此 RA 的其他用户的评论)
【讨论】:
嘿@morgwai 关于您如何将 generic-jms-ra 资源适配器用于 Java EE MDB 与 AWS SQS 的任何更新 我不打算使用通用的,而是我的回答中提到的非官方开源的。这是我下周的任务,敬请期待;) 有人用消息驱动 bean 和 AWS SQS 解决了这个问题吗?有什么例子吗?【参考方案2】:Payara Cloud Connectors 似乎很新,但看起来很有希望。不知道这是否也适用于其他容器。据我了解,它基于 JCA 适配器。
【讨论】:
【参考方案3】:我不认为我的解决方案是正确的 JAVA EE,但在我的情况下它可以工作。
配置:
@Singleton
public class SqsMessageManager
private Integer numberOfReceivers = 3;
public static SQSConnection connection = null;
public static Queue queue = null;
@Inject
SqsMessageReceiver sqsMessageReceiver;
public void init()
try
SQSConnectionFactory connectionFactory =
SQSConnectionFactory.builder()
.withRegion(Region.getRegion(Regions.EU_WEST_1))
.withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
.build();
connection = connectionFactory.createConnection();
queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue");
for (int i = 0; i < numberOfReceivers; i++)
connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver);
connection.start();
catch (JMSException e)
e.getStackTrace();
然后是发件人:
@Dependent
public class SqsMessageSender
MessageProducer producer = null;
Session senderSession = null;
@PostConstruct
public void createProducer()
try
// open new session and message producer
senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = senderSession.createProducer(SqsMessageManager.queue);
catch(JMSException | NullPointerException e)
;
@PreDestroy
public void destroy()
try
// close session
producer.close();
senderSession.close();
catch(JMSException e)
// sends a message to aws sqs queue
public void sendMessage(String txt)
try
TextMessage textMessage = senderSession.createTextMessage(txt);
producer.send(textMessage);
catch (JMSException e)
e.getStackTrace();
接收者:
@Dependent
public class SqsMessageReceiver implements MessageListener
public void onMessage(Message inMessage)
...
【讨论】:
我不完全了解您的 numberOfReceivers 是如何工作的。你在同一个对象上创建了多个监听器(你注入sqsMessageReceiver
,实际上是一个实例)?
我已经完成了几乎相同的示例,但是如果在 SqsMessageReceiver 中声明 EntityManager 类,则会出现问题。当我喜欢持久化某些东西时,我会得到: javax.persistence.TransactionRequiredException: JBAS011469: Transaction is required to perform this operation(使用事务或扩展持久性上下文)。有谁知道如何解决这个问题?【参考方案4】:
根据一些老docs I found
容器允许消息驱动的 bean 类的多个实例同时运行,从而允许同时处理消息流。
通过使用 Amazon JMS 集成以及声明性 MDB,您应该可以开始了。我不会使用 setMessageListener 接口。由于您使用的是 Wildfly 8.x / EE7,因此您可以使用 JMS 的声明式版本:
@MessageDriven(activationConfig = /* your config - i.e. queue name, etc */ )
public class MyListener implements MessageListener
@Override
public void onMessage(Message message)
这允许容器根据需要创建尽可能多的实例。请注意,Wildfly 中可能需要对 JMS 参数进行一些调整。
附带说明,让 Amazon 库负责读取 SQS 队列。我已经开始滚动我自己的阅读器,认为我可以把它串起来。但我发现,您不能使用 AWS Java 库和从队列中读取的多个线程,因为您几乎每次都会得到重复读取。我有 4 个线程读取 SQS 队列,并会收到 4 条相同的消息。我终于改成一个阅读器,将消息放入 LinkedBlockingDeque 以供其他一些线程使用。
我展示的一切都是纯 Java/EE。
编辑 在玩弄了 Amazon SQS/JMS 集成之后,我觉得使用它是在浪费时间。它仅适用于 JMS 1.1,因此它使用旧的 JMS 语法和 JNDI。此外,它仅适用于队列,不适用于主题。
我强烈建议您创建自己的实现。一个 ManagedExecutorService,它运行具有短 SQS 读取超时的队列读取器线程。每次循环都会从 SQS 队列中读取消息并将消息放入 JMS 队列或主题中。
很抱歉让您对此抱有希望 - 亚马逊只是没有得到足够的维护以使其值得。
【讨论】:
谢谢你,stdunbar。我很高兴听到这应该可行 - 但是,我不知道如何发挥所需的参数(凭证)。我在哪里放置 AWS 凭证?我如何实现你的笔记As a side note, let the Amazon libraries take care of the reading of the SQS queue
? - 我认为使用@MessageDriven
进行注释会使应用程序服务器读取队列?! - 如果您能更具体地说明如何在代码中集成 AWS SQS,我将不胜感激。
我怀疑你是对的。这些都不是好消息。如果我理解正确,它只是“不是最新的”的 SDK,而不是服务本身? - 我用结论和进一步的问题更新了问题。
标有 b) 的问题仍未得到解答。为了获得更多的想法,我开始了赏金活动......到目前为止还是谢谢[+1]!以上是关于在@MessageDriven bean 中使用 amazon sqs - 池化/并行处理的主要内容,如果未能解决你的问题,请参考以下文章