使用 Play 框架的 JMS/ActiveMQ 异常
Posted
技术标签:
【中文标题】使用 Play 框架的 JMS/ActiveMQ 异常【英文标题】:JMS/ActiveMQ exception using with Play framework 【发布时间】:2016-12-10 22:50:52 【问题描述】:我正在开发一个使用 ActiveMQ 和 Play Framework v2.4.2(Java 版本)的消息传递系统来向最终用户发送电子邮件。我是 JMS/ActiveMQ 技术的新手。 我只是在 ActiveMQ 站点使用this Hello World example 作为起点。
我创建了一个如下的测试类来测试使用 Play Framework 运行 ActiveMQ,一切正常:
public class ActiveMQMailApp
public static void main(String[] args) throws Exception
setup();
MailConsumer.initService();
for (int i =0;i<11;i++) MailProducer.sendMail(fakeMail());
public static void setup()
FakeApplication fakeApplication = Helpers.fakeApplication();
Helpers.start(fakeApplication);
private static Mail fakeMail() throws InterruptedException
Thread.sleep(1000);
SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd hh:mm:ss");
return new Mail( "noreply@abc.com", "receiver@gmail.com", "A Test Email", "<html><body><p>Date: <b> "+sdf.format(new Date())+" </b></p></body></html>");
但是当我在主应用程序中使用这个确切的代码时,抛出了这个异常:
javax.jms.JMSException: Could not create Transport. Reason: java.lang.RuntimeException: Fatally failed to create SystemUsageorg/apache/activemq/protobuf/BufferInputStream
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
at org.apache.activemq.ActiveMQConnectionFactory.createTransport(ActiveMQConnectionFactory.java:332)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:345)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:303)
at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:243)
at ir.iais.salary.services.MailProducer.run(MailProducer.java:35)
Caused by: java.lang.RuntimeException: Fatally failed to create SystemUsageorg/apache/activemq/protobuf/BufferInputStream
at org.apache.activemq.broker.BrokerService.getSystemUsage(BrokerService.java:1159)
... 5 more
Caused by: java.io.IOException: org/apache/activemq/protobuf/BufferInputStream
at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:39)
... 11 more
Caused by: java.lang.NoClassDefFoundError: org/apache/activemq/protobuf/BufferInputStream
at org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter.<init>(KahaDBPersistenceAdapter.java:65)
... 13 more
Caused by: java.lang.ClassNotFoundException: org.apache.activemq.protobuf.BufferInputStream
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
我的 MailProducer 和 MailConsumer 类是这样的:
public class MailProducer implements Runnable
public static final String AMQ_MAIL_QUEUE = "MAIL";
public static final String BROKER_URL = "vm://localhost?broker.useJmx=false&persistent=false";
private Mail mail;
public MailProducer(Mail mail)
this.mail = mail;
public static void sendMail(Mail mail)
Thread brokerThread = new Thread(new MailProducer(mail));
brokerThread.setDaemon(false);
brokerThread.start();
@Override
public void run()
try
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue(AMQ_MAIL_QUEUE);
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a messages
TextMessage textMessage = session.createTextMessage(new Gson().toJson(mail));
// Tell the producer to send the message
System.out.println("Sent message: "+ new Gson().toJson(mail) + " : " + Thread.currentThread().getName());
producer.send(textMessage);
// Clean up
session.close();
connection.close();
catch (Exception e)
System.out.println("Caught: " + e);
e.printStackTrace();
public class MailConsumer implements Runnable, ExceptionListener
private static final Logger logger = getLogger(MailConsumer.class);
private static Thread mailConsumerService;
public static synchronized void initService()
MailConsumer mailConsumer = Play.application().injector().instanceOf(MailConsumer.class);
if (mailConsumerService != null)
logger.info("STOPPING MailConsumer thread.");
mailConsumerService.interrupt();
logger.info("Starting MailConsumer thread.");
mailConsumerService = new Thread(mailConsumer);
mailConsumerService.setDaemon(true);
mailConsumerService.setName("MailConsumer Service");
mailConsumerService.start();
logger.info("MailConsumer thread started.");
@Inject
private MailerClient mailerClient;
@Override
public void run()
try
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(MailProducer.BROKER_URL);
// Create a Connection
Connection connection = connectionFactory.createConnection();
connection.start();
connection.setExceptionListener(this);
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue(MailProducer.AMQ_MAIL_QUEUE);
// Create a MessageConsumer from the Session to the Topic or Queue
MessageConsumer consumer = session.createConsumer(destination);
while (!Thread.currentThread().isInterrupted())
// Wait for a message
Message message = consumer.receive();
if (message instanceof TextMessage)
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("Received: " + text);
Mail mail = new Gson().fromJson(text, Mail.class);
Email email = new Email();
email.setFrom(mail.getFrom());
email.setTo(mail.getTo());
email.setSubject(mail.getSubject());
email.setBodyHtml(mail.getBodyHtml());
System.out.println("sending email...");
mailerClient.send(email);
System.out.println("email sent!");
else
System.out.println("Received: " + message);
logger.info("message type: "+message.getClass().getSimpleName());
logger.info("MailConsumer interrupted.");
consumer.close();
session.close();
connection.close();
catch (Exception e)
if (e instanceof InterruptedException)
logger.info("MailConsumer thread interrupted.");
else
logger.error(e.getLocalizedMessage(), e);
public synchronized void onException(JMSException ex)
System.out.println("JMS Exception occured. Shutting down client.");
logger.error("ErrorCode=" + ex.getErrorCode() + " , " + ex.getMessage(), ex);
我在主应用程序中这样调用 MailProducer:
public Result sendTestMail()
if(!DevStatus.gI().isInDebugMode()) return badRequest("You'r not in Development Env.");
SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd hh:mm:ss");
Mail mail = new Mail("noreply@abc.com", "receiver@gmail.com", "A Test Email", "<html><body><p>Date: <b> " + sdf.format(new Date()) + " </b></p></body></html>");
MailProducer.sendMail(mail);
return ok("email sent! "+ sdf.format(new Date()));
看来问题是org.apache.activemq.protobuf.BufferInputStream
不在类路径中。我在 build.sbt 中添加了"org.apache.activemq.protobuf" % "activemq-protobuf" % "1.1"
,但没有任何改变。我还通过将 persistent=false
添加到代理 URI 来禁用 ActiveMQ 持久性,但它不起作用。
我能做什么?将 ActiveMQ 与 Play Framework 一起用作 JMS 是否有意义?或者有一些更好的 JMS 可以与 Play Framework 一起使用?阿卡呢?!!
编辑:我的 ActiveMQ 相关部门是:
"org.apache.activemq" % "activemq-broker" % "5.13.4",
"org.apache.activemq" % "activemq-client" % "5.13.4",
"org.apache.activemq" % "activemq-kahadb-store" % "5.13.4",
"org.apache.activemq.protobuf" % "activemq-protobuf" % "1.1",
编辑 2:我用 "org.apache.activemq" % "activemq-all" % "5.14.0"
替换了上面的依赖项,主应用程序开始工作!我首先认为问题已经解决并且与 ActiveMQ 包有关,但我意识到 ActiveMQMailApp 测试类现在抛出与上面相同的异常!我在一个新的简单 maven 项目(不是 play 框架)中运行了这个测试类,一切正常!恐怕这个错误稍后会再次出现。到底发生了什么?!
【问题讨论】:
请提供依赖列表 我知道它不能回答您的问题。不使用akka或只是未来并使用它发送电子邮件的理由是什么?此外,您似乎在每次线程运行时都在创建 activemq 连接工厂 - 如果您获得一个实例并在需要时重用它,那就更好了。此外,当您使用 activemq-all 依赖项时会发生什么 - 您显示的异常表明缺少 protobuf 和 kahadb 依赖项(来自 SystemUsage 的 getPersistenceAdapter())。暂时关闭持久性,直到应用正常运行,然后再使用持久性。 @youhans 只是为了确保:您是否运行过sbt update
或activator update
?关于 Akka/ActiveMQ 问题,看看this 是否有帮助。除非需要使用 ActiveMQ,否则您应该尝试 Akka
@alihaider,我需要这个用例的队列持久性,所以我需要一些可持久的队列,比如 JMS 代理。现在在每个消费请求中创建 activemq 连接工厂不是问题,因为消息的数量很少。最后我将依赖项更改为 activemq-all 并且发生了一些奇怪的事情。我将在修改后的问题中解释它们。
【参考方案1】:
将此依赖项添加到您的 pom 中。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
<scope>runtime</scope>
</dependency>
请参阅this 问题报告。
【讨论】:
我将activemq-kahadb-store
依赖项更改为 "org.apache.activemq" % "activemq-kahadb-store" % "5.13.4" % "runtime"
,但它不起作用。
我还需要在上面的依赖中指定版本如果出现以下错误:
java.io.IOException: org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter
at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:40) ~[activemq-client-5.15.10.jar:5.15.10]
at org.apache.activemq.broker.BrokerService.createPersistenceAdapter(BrokerService.java:2507) [activemq-broker-5.15.10.jar:5.15.10]
at org.apache.activemq.broker.BrokerService.getPersistenceAdapter(BrokerService.java:1267) [activemq-broker-5.15.10.jar:5.15.10]
at org.apache.activemq.broker.BrokerService.getSystemUsage(BrokerService.java:1179) [activemq-broker-5.15.10.jar:5.15.10]
....
java.lang.RuntimeException: Fatally failed to create SystemUsageorg.apache.activemq.store.kahadb.KahaDBPersistenceAdapter
at org.apache.activemq.broker.BrokerService.getSystemUsage(BrokerService.java:1190) ~[activemq-broker-5.15.10.jar:5.15.10]
at org.apache.activemq.broker.BrokerService.checkMemorySystemUsageLimits(BrokerService.java:2178) ~[activemq-broker-5.15.10.jar:5.15.10]
您应该将@jumpnett 发布的依赖项添加到您的pom 中
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
<scope>runtime</scope>
</dependency>
【讨论】:
这没有回答 OPs 的问题,但确实为我提供了我的问题的答案。嗯。【参考方案3】:这个问题是由于 sbt 使用 ivy 作为解析系统而引入的,出于某种原因activemq-protobuf
决定使用包装类型maven-plugins
。
虽然 maven 将此类工件解析为用于编译的 jar,但 ivy(或者这是因为 sbt 我不太确定)会将其解析为类型 maven-plugins
而不是类型 jar
,因此 sbt 将忽略此依赖项,因为存在此依赖项中没有 jar。
修复的方法是显式引入一个jar类型的artifact:
libraryDependencies += "org.apache.activemq.protobuf" % "activemq-protobuf" % "1.1" jar()
【讨论】:
以上是关于使用 Play 框架的 JMS/ActiveMQ 异常的主要内容,如果未能解决你的问题,请参考以下文章
使用 JMS/ActiveMQ 并发同步请求-回复 - 模式/库?
深入浅出JMS--ActiveMQ简单的HelloWorld实例