如何在 Java 中使用 Apache ActiveMQ 实现基于条件的队列
Posted
技术标签:
【中文标题】如何在 Java 中使用 Apache ActiveMQ 实现基于条件的队列【英文标题】:How to implement condition based Queue with Apache ActiveMQ in Java 【发布时间】:2015-05-03 18:51:49 【问题描述】:我在 mysql Table 和 Java Program 的帮助下实现了一个队列。我想用 Apache ActiveMQ 实现以下程序,非常感谢任何建议。
表名:XXXXX
Col's : id | Msg | key_id |Status
----------------------------|------
| 1 | Msg1| 1 | 1
| 2 | Msg2| 2 | 1
| 3 | Msg3| 1 | 0
| 4 | Msg4| 1 | 0
| 5 | Msg5| 4 | 0
| 6 | Msg6| 3 | 0
while (true)
try
Thread.sleep(5000);
Fetch only one record from table XXXX whose key_id not in list and status is 0. Note list contains zero elements .
add key_id in list.
update status of fetched record to 1
process the fetched record ... some business logic
set status to 2 if business logic works as expected,else status as 3 if exception occured
catch (Exception ex)
如何使用 Apache ActiveMQ 实现此功能? 我是如何尝试的
package com.jms.activemq.examples;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender
private ConnectionFactory factory = null;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private MessageProducer producer = null;
public Sender()
public void sendMessage()
try
factory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("SAMPLEQUEUE");
producer = session.createProducer(destination);
TextMessage message = session.createTextMessage();
while (true)
try
Thread.sleep(5000);
Fetch only one record from table XXXX whose key_id not in list and status is 0. Note list contains zero elements .
add key_id in list.
update status of fetched record to 1
message.setText("some data from Database");
producer.send(message);
System.out.println("Sent: " + message.getText());
catch (Exception ex)
catch (JMSException e)
e.printStackTrace();
public static void main(String[] args)
Sender sender = new Sender();
sender.sendMessage();
和接收器
package com.jms.activemq.examples;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver
private ConnectionFactory factory = null;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private MessageConsumer consumer = null;
public Receiver()
public void receiveMessage()
try
factory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
connection = factory.createConnection();
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("SAMPLEQUEUE");
consumer = session.createConsumer(destination);
Message message = consumer.receive();
if (message instanceof TextMessage)
process the fetched record ... some business logic
set status to 2 if business logic works as expected,else status as 3 if exception occured
TextMessage text = (TextMessage) message;
System.out.println("Message is : " + text.getText());
catch (JMSException e)
e.printStackTrace();
public static void main(String[] args)
Receiver receiver = new Receiver();
receiver.receiveMessage();
非常感谢任何建议。
【问题讨论】:
而不是 -ve vote 请提出一些建议 @mins 谢谢,我已经编辑了 【参考方案1】:如果我正确理解您的需求,您可以使用简单的 JDBC 连接器,例如:
public class SimpleJDBCConnector
static final String JDBC_DRIVER = "org.postgresql.Driver";
static final String DB_URL = "jdbc:postgresql://localhost:5432/postgres";
static final String USER = "postgres";
static final String PASS = "admin";
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;
public ResultSet executeQuery(String sql) throws SQLException
stmt = conn.createStatement();
return stmt.executeQuery(sql);
public void closeConnection() throws SQLException
conn.close();
public boolean connect() throws ClassNotFoundException, SQLException
Class.forName(JDBC_DRIVER);
conn = DriverManager.getConnection(DB_URL, USER, PASS);
return true;
还有以下 SQL 查询:
Set<Integer> set = new HashSet<Integer>();
int status=0;
int id=0;
int key_id=0;
String sql1 = "SELECT id,msg,key_id,status FROM xxxx WHERE key_id NOT IN ("
+set.toString().replaceAll("[\\]\\[\\ ]","")+") AND status = "+status+ + " LIMIT 1;";
String sql2 = "UPDATE xxxx SET status="+status+" WHERE id="+id;
要从响应中提取数据,您可以执行以下操作:
ResultSet rs = db.executeQuery(sql1);
while(rs.next())
id = rs.getInt("id");
String msg = rs.getString("msg");
key_id = rs.getInt("key_id");
status = rs.getInt("status");
【讨论】:
以上是关于如何在 Java 中使用 Apache ActiveMQ 实现基于条件的队列的主要内容,如果未能解决你的问题,请参考以下文章
java - 如何在java中使用apache math 3.0为直方图生成bin?
如何在 java 中使用 Apache spark 计算中位数和众数?
如何在 Java 中使用 Apache ActiveMQ 实现基于条件的队列
Apache Spark:如何使用 Java 在 dataFrame 中的空值列中插入数据