在 Java 中动态创建异步消息队列

Posted

技术标签:

【中文标题】在 Java 中动态创建异步消息队列【英文标题】:Dynamically creating asynchronous message queues in Java 【发布时间】:2010-12-11 03:47:10 【问题描述】:

我需要。我的用例是通过多个 SMTP 服务器发送电子邮件:我需要强制发送到同一 SMTP 服务器的电子邮件是按顺序处理的,但发送到不同 SMTP 服务器的电子邮件可能会同时处理。我过去使用过 JMS,但据我所知,它只允许在编译时创建队列,而我需要在运行时创建队列(每个 SMTP 服务器一个队列)。

我是否遗漏了有关 JMS 的某些内容,或者是否有其他一些我应该查看的工具/建议?

【问题讨论】:

您是专门使用 JMS 还是可以使用 java.util.concurrent 及其 ExecutorServices 来做这件事? 我没有专门使用JMS,所以我会看看ExecutorServices,谢谢。 【参考方案1】:

我同意 Adam 的观点,用例听起来像 JMS 是开销。 Java 内置功能足够:

package de.mhaller;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

import org.junit.Assert;
import org.junit.Test;

public class Mailer 

    @Test
    public void testMailer() throws Exception 
        ExecutorService executor = Executors.newCachedThreadPool();
        ArrayList<Mail> log = new ArrayList<Mail>();
        LinkedBlockingDeque<Mail> incoming = new LinkedBlockingDeque<Mail>();

        // TODO: Put mails to be sent into the incoming queue
        incoming.offer(new Mail("foo1@localhost", "localhost"));
        incoming.offer(new Mail("foo2@otherhost", "otherhost"));
        incoming.offer(new Mail("foo3@otherhost", "otherhost"));
        incoming.offer(new Mail("foo4@localhost", "localhost"));

        Map<Mailserver, Queue<Mail>> queues = new HashMap<Mailserver, Queue<Mail>>();
        while (!incoming.isEmpty()) 
            Mail mail = incoming.pollFirst();
            Mailserver mailserver = findMailserver(mail);
            if (!queues.containsKey(mailserver)) 
                ArrayDeque<Mail> serverQueue = new ArrayDeque<Mail>();
                queues.put(mailserver, serverQueue);
                executor.execute(new SendMail(mailserver, serverQueue));
            
            Queue<Mail> slot = queues.get(mailserver);
            slot.offer(mail);
        

        assertMailSentWithCorrectServer(log);
    

    private void assertMailSentWithCorrectServer(ArrayList<Mail> log) 
        for (Mail mail : log) 
            if (!mail.server.equals(mail.sentBy.mailserver)) 
                Assert.fail("Mail sent by wrong server: " + mail);
            
        
    

    private Mailserver findMailserver(Mail mail) 
        // TODO: Your lookup logic which server to use
        return new Mailserver(mail.server);
    

    private static class Mail 
        String recipient;
        String server;
        SendMail sentBy;

        public Mail(String recipient, String server) 
            this.recipient = recipient;
            this.server = server;
        

        @Override
        public String toString() 
            return "mail for " + recipient;
        
    

    public static class SendMail implements Runnable 

        private final Deque<Mail> queue;
        private final Mailserver mailserver;

        public SendMail(Mailserver mailserver, Deque<Mail> queue) 
            this.mailserver = mailserver;
            this.queue = queue;
        

        @Override
        public void run() 
            while (!queue.isEmpty()) 
                Mail mail = queue.pollFirst();
                // TODO: Use SMTP to send the mail via mailserver
                System.out.println(this + " sent " + mail + " via " + mailserver);
                mail.sentBy = this;
            
        

    

    public static class Mailserver 
        String hostname;

        public Mailserver(String hostname) 
            this.hostname = hostname;
        

        @Override
        public String toString() 
            return hostname;
        

        @Override
        public int hashCode() 
            return hostname.hashCode();
        

        @Override
        public boolean equals(Object obj) 
            return hostname.equals(((Mailserver) obj).hostname);
        

    


【讨论】:

【参考方案2】:

JMS 本身作为规范在这个问题上相当沉默。大多数实现都允许您这样做,只是不是通过 JMS 本身,而是使用它们自己的 API。但是您将无法将诸如 MDB 之类的正式的东西连接到动态队列。相反,您需要管理自己的连接和侦听器。

【讨论】:

【参考方案3】:

上次我们在 WebSphere 环境中研究这个问题时,动态创建队列非常困难/不可能(我认为临时队列对您来说太短暂了)。尽管存在用于创建队列的 API,但它们之后需要重新启动服务器才能激活。然后就是刚才提到的MDB问题。

如果所有问题都可以通过额外的间接层级来解决,假设可用的打印机集相对较小,那么根据格言来做一个肮脏的解决方法怎么样?

创建队列 Printer01 到 Printer99(或一些较小的数字)。有一个将队列映射到真实打印机的“数据库”。随着对打印机的请求出现,您可以添加到映射表中。您可能需要一些 MDB 的开销来查看永远不会使用的队列,但除非您的打印机数量巨大,否则您是否可以负担得起?

【讨论】:

【参考方案4】:

为每个 SMTP 服务器创建一个队列,并将队列使用者(MDB 或消息侦听器)限制为 1 个

【讨论】:

【参考方案5】:

我已经使用 activemq 完成了这项工作 - 实际上我当时发布了一个关于此的问题,因为我有类似的担忧(当时的 JMS 文档声明不支持此功能)并确信它是受支持的。

【讨论】:

您是否有指向您的问题或描述如何实现此目标的文档的链接?

以上是关于在 Java 中动态创建异步消息队列的主要内容,如果未能解决你的问题,请参考以下文章

MFC SDI中 如何为动态创建的按钮添加消息处理函数

MFC SDI中 如何为动态创建的按钮添加消息处理函数

OSChina 本周软件推荐 —— zbus 轻量级服务总线/消息队列

在 Laravel/Lumen 中动态创建的工作守护进程队列

以动态消息为参数的 gRPC

Java中动态声明与绑定Rabbit MQ队列以及延迟队列的实现与使用