Groovy ActiveMQ 5.8 Embedded Broker

ActiveMQ 5.8 Groovy Embedded Broker Example - Laurence Toenjes - 5/14/2013
This example overcomes some limitations of the basic ActiveMQ embedded
broker examples I found online.

Some of the challenges were:
# Running multiple instances on the same machine while still being able to use JMX.
# Running on a machine with less than 50G or 100G of disk space,
which caused combinations of ActiveMQ errors or warnings.
# Finding Groovy Grapes/Grab syntax that works on both PC and Mac.

The broker in this example uses a non-persistent store and is multicast
discoverable. It should let you run multiple instances of it (in separate
processes, of course). That is the reason for all the code snippets with
random port numbers and random thread sleeps: they increase the odds that
each new embedded broker process gets a working set of port numbers.
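
The port-probing idea described above can be tried on its own, without ActiveMQ. The following is a minimal sketch rather than the author's exact code: `findFreePorts` is a hypothetical helper name, and the defaults (base 4000, range 5000, up to 1000 attempts) are borrowed from the listing that follows.

```groovy
// Hypothetical helper (not part of the original script): probe for `count`
// consecutive TCP ports that are free at the moment of the call.
def findFreePorts(int count, int base = 4000, int range = 5000, int maxTries = 1000) {
    def random = new Random()
    for (int attempt = 0; attempt < maxTries; attempt++) {
        int first = base + random.nextInt(range)
        def candidates = (0..<count).collect { first + it }
        def sockets = []
        try {
            // Binding a ServerSocket succeeds only if the port is currently free.
            candidates.each { port -> sockets << new ServerSocket(port) }
            return candidates
        } catch (IOException ignored) {
            // At least one port was taken: back off for a random interval, then retry.
            Thread.sleep(random.nextInt(200))
        } finally {
            // Release the probe sockets so the real service can bind the ports.
            sockets.each { it.close() }
        }
    }
    throw new IllegalStateException("no free block of ${count} ports found")
}

def freePorts = findFreePorts(3)
println "free ports: ${freePorts}"
```

The probe only shows that the ports were free when they were tested. Another process can still claim one before the broker binds it, which is presumably why the script also retries broker creation.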
#!/usr/bin/env groovy

@Grapes([
    @Grab(group='org.apache.activemq', module='activemq-all', version='5.8.0', transitive=false)
])

// file: EmbeddedBroker.groovy

/*
ActiveMQ 5.8 Groovy Embedded Broker Example - Laurence Toenjes - 5/14/2013
Starts a non-persistent, multicast-discoverable embedded broker. Ports are
picked at random so that several instances can run on the same machine.
*/

import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;

import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgentFactory;
import org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent; // http://activemq.apache.org/maven/apidocs/src-html/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.html#line.54
import org.apache.activemq.transport.discovery.DiscoveryListener;

public final class EmbeddedBroker {

    static random = new java.util.Random();

    // the runtime MXBean name usually has the form "pid@hostname"
    static def calcPid = { java.lang.management.ManagementFactory.getRuntimeMXBean().getName().split('@')[0].toInteger() } ;

    static Integer javaPid = calcPid();
    static String sJavaPid = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();

    static String C_DEFAULT_DISCOVERY_URI_STRING = MulticastDiscoveryAgent.DEFAULT_DISCOVERY_URI_STRING;

    private EmbeddedBroker() {} // not used - from Java example

    static def sockets // to pre-allocate ports for ActiveMQ
    static def ports // randomly generated list of free ports
    static def calcMqPort = { ports.last() }

    public static void main(String[] args) {
        /*
        SET _JAVA_OPTIONS=-Dcom.sun.management.jmxremote ^
        -Dcom.sun.management.jmxremote.port=51001 ^
        -Dcom.sun.management.jmxremote.local.only=false ^
        -Dcom.sun.management.jmxremote.authenticate=false ^
        -DmyJmxConnectorPort=41001
        */

        def tryCounter = 1000;
        sockets = [];

        def base = 4000; // lowest port num to try
        def portRange = 5000;

        def calcPorts = {
            ports = [];
            def rnd = { base + random.nextInt(portRange) }
            def p = rnd();
            ports = (0 ..< 3).collect { ((p + it) as Integer) }
            // lets make activemq port same as pid so easy to use jconsole
            // (only works when the pid is a usable port number, i.e. at most 65535)
            ports[-1] = javaPid
            ports; // return
        }

        calcPorts();
        while ( tryCounter-- >= 0 ) {
            try {
                Thread.sleep random.nextInt( 100 );
                // sockets = ports.collect { new Socket(it) }
                // briefly bind every candidate port to check that it is free
                ports.each { itPort -> sockets << new ServerSocket(itPort) }
                assert sockets.size() == ports.size(); // need at least 3
                break; // all ports bound, stop retrying
            } catch(Exception ex) {
                if ( !(ex instanceof java.net.BindException) ) {
                    System.err.println ex
                }
                // release whatever was bound, then try a fresh set of ports
                sockets.findAll { it != null }.each { itSocketToClose ->
                    try { itSocketToClose.close(); } catch(Exception ex2) {}
                }
                sockets.clear();
                calcPorts();
                Thread.sleep( random.nextInt( 200 ) + 500 );
            }
        }
        Thread.sleep random.nextInt( 200 );
        // free the probed ports so the broker can bind them
        sockets.each { it.close() }

        def sm = [:] // for system map props
        sm.'com.sun.management.jmxremote.port' = ports[0].toString()
        sm.'com.sun.management.jmxremote.local.only' = 'false'
        sm.'com.sun.management.jmxremote.authenticate' = 'false'
        sm.'myJmxConnectorPort' = ports[1].toString()

        // ports[0] is for com.sun.management.jmxremote.port
        // ports[1] is for broker.getManagementContext().setConnectorPort

        sm.keySet().each { key -> System.properties[ key ] = sm[key] }

        BrokerService broker
        def brokerCreated = false;

        tryCounter = 100;
        while( (!brokerCreated) && (tryCounter-- >= 0) ) {
            try {
                broker = createBroker();
                brokerCreated = true;

                // run forever
                Object lock = new Object();
                synchronized (lock) {
                    lock.wait();
                }

                break; //
            } catch(Exception ex) {
                println "### Oops: ${ex}"
            }
        } // end while
    }

    public static BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.persistent = false; // SET THIS FIRST!!! - setting on url did not work for me
        broker.setUseShutdownHook(true);

        // Stop ActiveMQ 5.8 Errors or Warnings when running on machines with
        // less than 50G to 100G of diskspace
        Long HundredGig = 107374182400L
        // walk up from the temp data directory to the nearest directory that exists
        File fileVisitor = broker.tmpDataDirectory.canonicalFile;
        while( !fileVisitor.exists() ) {
            fileVisitor = new File(fileVisitor, '..').canonicalFile
        }
        if ( fileVisitor.usableSpace < HundredGig ) {
            // intdiv keeps the limits whole numbers of bytes
            broker.systemUsage.tempUsage.limit = fileVisitor.usableSpace.intdiv(2);
            broker.systemUsage.storeUsage.limit = fileVisitor.usableSpace.intdiv(2);
        }
        broker.systemUsage.setSendFailIfNoSpace(false);
        broker.systemUsage.setSendFailIfNoSpaceExplicitySet(true);

        // String theBrokerSuffix = sJavaPid.replace('@','_');
        broker.brokerName = 'broker1'

        broker.setUseJmx(true);

        // sometimes set in bat/sh starter
        Integer myJmxConnectorPort = System.properties.'myJmxConnectorPort'.toString().toInteger();
        broker.getManagementContext().setConnectorPort( myJmxConnectorPort );

        // !!! for jmx usage
        broker.setBrokerObjectName(
            BrokerMBeanSupport.createBrokerObjectName(broker.getManagementContext().getJmxDomainName(), broker.brokerName)
        )

        def conn = broker.addConnector("tcp://0.0.0.0:${calcMqPort()}"); // use 0.0.0.0 , makes discovery work better
        // conn.name += "_port_${javaPid}"
        // for discovery
        conn.discoveryUri = new URI( "${C_DEFAULT_DISCOVERY_URI_STRING}?useLocalHost=false".trim() ); // optional add ?

        broker.start();
        return broker; // hand the started broker back to the caller
    }
}
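
The disk-space workaround in createBroker can be checked in isolation before it is wired into a broker. The following is a minimal sketch under stated assumptions: `nearestExistingDir` and `cappedLimit` are hypothetical helper names, and the 100G threshold is the one used in the listing above. It only computes the value that would be assigned to the usage limits and does not touch ActiveMQ.

```groovy
// Hypothetical helper: climb from a possibly missing path to the nearest
// ancestor directory that actually exists on disk.
File nearestExistingDir(File start) {
    File current = start.canonicalFile
    while (!current.exists() && current.parentFile != null) {
        current = current.parentFile
    }
    return current
}

// Hypothetical helper: half of the usable space in bytes when less than
// `threshold` bytes are free, or null to signal "keep the broker default".
Long cappedLimit(File dir, long threshold = 107374182400L) {
    long usable = dir.usableSpace
    return usable < threshold ? (usable.intdiv(2) as Long) : null
}

// The sub-path is made up for the demo; it is assumed not to exist.
def dataDir = nearestExistingDir(new File(System.getProperty('java.io.tmpdir'), 'no-such-dir/activemq-data'))
def limit = cappedLimit(dataDir)
println "directory: ${dataDir}, limit: ${limit == null ? 'broker default' : limit}"
```

Running this on the target machine shows which directory the free-space check ends up measuring and which limit, if any, the script would apply.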
