在@MessageDriven bean 中使用 amazon sqs - 池化/并行处理

Posted

技术标签:

【中文标题】在@MessageDriven bean 中使用 amazon sqs - 池化/并行处理【英文标题】:using amazon sqs in a @MessageDriven bean - pooling / parallel processing 【发布时间】:2017-05-01 12:18:28 【问题描述】:

我们需要在我们的 Java EE 应用程序中使用队列,因为它是一个基于云的应用程序(部署在 OpenShift Online 上),我们喜欢使用 amazon sqs。

如果我正确理解 JMS / Java EE 接收部分的理论,@MessageDriven bean 由 Java EE 容器管理,因此可以并行创建许多 bean 实例(根据最大池大小),如果传入消息的数量很高。这当然是处理高负载的一大好处。

但是,我看不出我们如何以这种方式将 aws sqs 集成到 Java EE 应用程序中。我知道http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html的异步接收器示例:

class MyListener implements MessageListener 

    @Override
    public void onMessage(Message message) 
        try 
            // Cast the received message as TextMessage and print the text to screen.
            if (message != null) 
                System.out.println("Received: " + ((TextMessage) message).getText());
            
         catch (JMSException e) 
            e.printStackTrace();
        
    

然后:

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);

// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());

// Start receiving incoming messages.
connection.start();

这是官方的异步接收器示例 - 它不是 @MessageDriven bean。很明显,我们需要在某个地方使用凭据进行身份验证(通过创建 SQSConnectionFactory,然后是连接,然后是会话 - 这在示例中也有很好的描述)。 但我强烈认为这个示例不会并行处理消息 - 即只有一个 bean 实例正在处理队列,这对于可扩展、高负载的应用程序来说不是一个好的解决方案。

a) 我们如何使用 Amazon SQS 实现真正的 Java EE 方式? 我只是找到了很多 Spring 的例子。但它必须是 Java EE 7。

b) 我们使用 Wildfly(当前为 8.2.1)。是否也可以让 Wildfly 在内部管理与 AWS 和应用程序的连接,我们可以像使用应用程序服务器管理队列一样使用队列(与数据库访问的数据源相同的方法)?

得到stdunbar回答后的结论: 我喜欢做的事情似乎不可能以“正确的方式”进行。所以我该怎么做?将ManagedExecutorService 实现为描述为“包装”队列的stdunbar? - 然而,这意味着也有一个本地队列,这对于应用程序来说不是一个好的情况,它应该是可扩展的?! 什么是替代品?我们在 OpenShift Online 上运行该应用程序。用例如实例化自己的装备可能会更好。 ApacheMQ 墨盒...当然有很多缺点,比如成本,我们负责“基础设施”。

说实话,在这种情况下,我对 AWS 真的很失望......

【问题讨论】:

【参考方案1】:

通常,要使 MDB“连接”到远程 JMS Q,您需要一个资源适配器 (RA)。从理论上讲,这种基于 JMS 规范实现的 RA 应该与任何符合规范的 JMS 提供程序一起使用,因此理论上您应该能够重用例如 this implementation。

然而,正如上述项目的自述文件所说,您应该使用由特定 JMS 提供者提供的 RA,而不是通用的。不幸的是,亚马逊没有提供一个:(

然而,最近,some awesome guy 创建了一个unofficial open-source one。我只是在评估它,稍后会根据我的经验更新这个答案。 (非常欢迎来自此 RA 的其他用户的评论)

【讨论】:

嘿@morgwai 关于您如何将 generic-jms-ra 资源适配器用于 Java EE MDB 与 AWS SQS 的任何更新 我不打算使用通用的,而是我的回答中提到的非官方开源的。这是我下周的任务,敬请期待;) 有人用消息驱动 bean 和 AWS SQS 解决了这个问题吗?有什么例子吗?【参考方案2】:

Payara Cloud Connectors 似乎很新,但看起来很有希望。不知道这是否也适用于其他容器。据我了解,它基于 JCA 适配器。

【讨论】:

【参考方案3】:

我不认为我的解决方案是正确的 JAVA EE,但在我的情况下它可以工作。

配置:

@Singleton
public class SqsMessageManager

    private Integer numberOfReceivers = 3;

    public static SQSConnection connection = null;
    public static Queue queue = null;

    @Inject
    SqsMessageReceiver sqsMessageReceiver;

    public void init()
    
        try
        
            SQSConnectionFactory connectionFactory =
                    SQSConnectionFactory.builder()
                            .withRegion(Region.getRegion(Regions.EU_WEST_1))
                            .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
                            .build();

            connection = connectionFactory.createConnection();

            queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue");

            for (int i = 0; i < numberOfReceivers; i++)
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver);

            connection.start();
        
        catch (JMSException e)
        
            e.getStackTrace();
        
    

然后是发件人:

@Dependent
public class SqsMessageSender

    MessageProducer producer = null;
    Session senderSession = null;

    @PostConstruct
    public void createProducer()
        try
        
            // open new session and message producer
            senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = senderSession.createProducer(SqsMessageManager.queue);
        catch(JMSException | NullPointerException e)
            ;
        
    

    @PreDestroy
    public void destroy()
        try
        
            // close session
            producer.close();
            senderSession.close();
        catch(JMSException e)

        
    

    // sends a message to aws sqs queue
    public void sendMessage(String txt)
    
        try
        
            TextMessage textMessage = senderSession.createTextMessage(txt);
            producer.send(textMessage);
        
        catch (JMSException e)
        
            e.getStackTrace();
        
    

接收者:

@Dependent
public class SqsMessageReceiver implements MessageListener

    public void onMessage(Message inMessage) 
        ...
    

【讨论】:

我不完全了解您的 numberOfReceivers 是如何工作的。你在同一个对象上创建了多个监听器(你注入sqsMessageReceiver,实际上是一个实例)? 我已经完成了几乎相同的示例,但是如果在 SqsMessageReceiver 中声明 EntityManager 类,则会出现问题。当我喜欢持久化某些东西时,我会得到: javax.persistence.TransactionRequiredException: JBAS011469: Transaction is required to perform this operation(使用事务或扩展持久性上下文)。有谁知道如何解决这个问题?【参考方案4】:

根据一些老docs I found

容器允许消息驱动的 bean 类的多个实例同时运行,从而允许同时处理消息流。

通过使用 Amazon JMS 集成以及声明性 MDB,您应该可以开始了。我不会使用 setMessageListener 接口。由于您使用的是 Wildfly 8.x / EE7,因此您可以使用 JMS 的声明式版本:

@MessageDriven(activationConfig =  /* your config - i.e. queue name, etc */ )
public class MyListener implements MessageListener 
    @Override
    public void onMessage(Message message) 
    

这允许容器根据需要创建尽可能多的实例。请注意,Wildfly 中可能需要对 JMS 参数进行一些调整。

附带说明,让 Amazon 库负责读取 SQS 队列。我已经开始滚动我自己的阅读器,认为我可以把它串起来。但我发现,您不能使用 AWS Java 库和从队列中读取的多个线程,因为您几乎每次都会得到重复读取。我有 4 个线程读取 SQS 队列,并会收到 4 条相同的消息。我终于改成一个阅读器,将消息放入 LinkedBlockingDeque 以供其他一些线程使用。

我展示的一切都是纯 Java/EE。

编辑 在玩弄了 Amazon SQS/JMS 集成之后,我觉得使用它是在浪费时间。它仅适用于 JMS 1.1,因此它使用旧的 JMS 语法和 JNDI。此外,它仅适用于队列,不适用于主题。

我强烈建议您创建自己的实现。一个 ManagedExecutorService,它运行具有短 SQS 读取超时的队列读取器线程。每次循环都会从 SQS 队列中读取消息并将消息放入 JMS 队列或主题中。

很抱歉让您对此抱有希望 - 亚马逊只是没有得到足够的维护以使其值得。

【讨论】:

谢谢你,stdunbar。我很高兴听到这应该可行 - 但是,我不知道如何发挥所需的参数(凭证)。我在哪里放置 AWS 凭证?我如何实现你的笔记As a side note, let the Amazon libraries take care of the reading of the SQS queue? - 我认为使用@MessageDriven 进行注释会使应用程序服务器读取队列?! - 如果您能更具体地说明如何在代码中集成 AWS SQS,我将不胜感激。 我怀疑你是对的。这些都不是好消息。如果我理解正确,它只是“不是最新的”的 SDK,而不是服务本身? - 我用结论和进一步的问题更新了问题。 标有 b) 的问题仍未得到解答。为了获得更多的想法,我开始了赏金活动......到目前为止还是谢谢[+1]!

以上是关于在@MessageDriven bean 中使用 amazon sqs - 池化/并行处理的主要内容,如果未能解决你的问题,请参考以下文章

@MessageDriven 事务和重新传递语义

Java面试题——随机题,看看哪些你会哪些你不会

如何使用后面的代码创建自定义注释

如何使 MDB 激活规范上下文属性可配置?

在过滤器 bean 类中使用一些 bean?

spring工具类中注入使用bean