How to implement a condition-based queue with Apache ActiveMQ in Java

Original title: How to implement condition based Queue with Apache ActiveMQ in Java
Posted: 2015-05-03 18:51:49

Question:

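I have implemented a queue with the help of a MySQL table and a Java program. I would like to implement the following program with Apache ActiveMQ; any suggestions are much appreciated.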

Table name: XXXXX

 id | Msg  | key_id | Status
----|------|--------|-------
  1 | Msg1 |   1    |   1
  2 | Msg2 |   2    |   1
  3 | Msg3 |   1    |   0
  4 | Msg4 |   1    |   0
  5 | Msg5 |   4    |   0
  6 | Msg6 |   3    |   0


    while (true) {
        try {
            Thread.sleep(5000);
            // Fetch only one record from table XXXX whose key_id is not in the list and whose status is 0.
            // Note: the list contains zero elements at the start.
            // Add key_id to the list.
            // Update the status of the fetched record to 1.
            // Process the fetched record ... some business logic.
            // Set status to 2 if the business logic works as expected, else to 3 if an exception occurred.
        } catch (Exception ex) {
        }
    }
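The selection rule in the loop above can be modelled without a database or a broker. The following is a minimal sketch that assumes a hypothetical in-memory class `KeyedPicker` standing in for table XXXX, with the status codes from the pseudocode (0 pending, 1 processing, 2 succeeded, 3 failed). The pseudocode does not say when a key_id leaves the list; releasing it when processing ends (`finish`) is an assumption made here so that later records with the same key can eventually run.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical in-memory stand-in for table XXXX; not part of the original post.
class KeyedPicker {
    static final int PENDING = 0, PROCESSING = 1, DONE = 2, FAILED = 3;

    static final class Row {
        final int id;
        final String msg;
        final int keyId;
        int status;

        Row(int id, String msg, int keyId, int status) {
            this.id = id;
            this.msg = msg;
            this.keyId = keyId;
            this.status = status;
        }
    }

    private final List<Row> rows = new ArrayList<>();
    private final Set<Integer> busyKeys = new HashSet<>(); // the "list" from the pseudocode, initially empty

    void add(int id, String msg, int keyId, int status) {
        rows.add(new Row(id, msg, keyId, status));
    }

    // Fetch one pending row whose key_id is not in the list, reserve its key, mark it as processing.
    Optional<Row> pickNext() {
        for (Row r : rows) {
            if (r.status == PENDING && !busyKeys.contains(r.keyId)) {
                busyKeys.add(r.keyId);
                r.status = PROCESSING;
                return Optional.of(r);
            }
        }
        return Optional.empty();
    }

    // Assumption: the key is released once the business logic has finished.
    void finish(Row r, boolean ok) {
        r.status = ok ? DONE : FAILED;
        busyKeys.remove(r.keyId);
    }

    // The sample data from the question.
    static KeyedPicker sampleTable() {
        KeyedPicker p = new KeyedPicker();
        p.add(1, "Msg1", 1, 1);
        p.add(2, "Msg2", 2, 1);
        p.add(3, "Msg3", 1, 0);
        p.add(4, "Msg4", 1, 0);
        p.add(5, "Msg5", 4, 0);
        p.add(6, "Msg6", 3, 0);
        return p;
    }
}
```

With the sample data, successive calls to `pickNext()` return rows 3, 5 and 6; row 4 stays blocked until row 3 (same key_id) is finished.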

How can I implement this with Apache ActiveMQ? Here is what I have tried:

    package com.jms.activemq.examples;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Sender {

        private ConnectionFactory factory = null;
        private Connection connection = null;
        private Session session = null;
        private Destination destination = null;
        private MessageProducer producer = null;

        public Sender() {
        }

        public void sendMessage() {
            try {
                factory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_BROKER_URL);
                connection = factory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                destination = session.createQueue("SAMPLEQUEUE");
                producer = session.createProducer(destination);
                TextMessage message = session.createTextMessage();

                while (true) {
                    try {
                        Thread.sleep(5000);
                        // Fetch only one record from table XXXX whose key_id is not in the list and whose status is 0.
                        // Note: the list contains zero elements at the start.
                        // Add key_id to the list.
                        // Update the status of the fetched record to 1.
                        message.setText("some data from Database");
                        producer.send(message);

                        System.out.println("Sent: " + message.getText());
                    } catch (Exception ex) {
                        // Log instead of silently swallowing the failure.
                        ex.printStackTrace();
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            Sender sender = new Sender();
            sender.sendMessage();
        }
    }
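The sender above puts a fixed string into the message, while the receiver is expected to set the status of the fetched record to 2 or 3, so the message has to carry at least the row's id and key_id. One possible way to do that, sketched here under the assumption of a hypothetical `id|key_id|msg` text layout and a hypothetical `QueueRecord` class (neither appears in the original post), is to encode the record into the `TextMessage` body:

```java
// Hypothetical payload type; the "id|key_id|msg" layout is an assumption for illustration.
class QueueRecord {
    final int id;
    final int keyId;
    final String msg;

    QueueRecord(int id, int keyId, String msg) {
        this.id = id;
        this.keyId = keyId;
        this.msg = msg;
    }

    // Text that the sender would pass to message.setText(...).
    String toText() {
        return id + "|" + keyId + "|" + msg;
    }

    // Inverse of toText(); the receiver would call it with text.getText().
    static QueueRecord fromText(String text) {
        // Limit 3 keeps any '|' characters inside msg intact.
        String[] parts = text.split("\\|", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected id|key_id|msg but got: " + text);
        }
        return new QueueRecord(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), parts[2]);
    }
}
```

An alternative that avoids parsing is to keep the body as the plain message text and attach the numbers as JMS message properties with `setIntProperty`, reading them back with `getIntProperty` on the receiving side.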

And the receiver:

    package com.jms.activemq.examples;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Receiver {

        private ConnectionFactory factory = null;
        private Connection connection = null;
        private Session session = null;
        private Destination destination = null;
        private MessageConsumer consumer = null;

        public Receiver() {
        }

        public void receiveMessage() {
            try {
                factory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_BROKER_URL);
                // Create the connection once; the duplicated call leaked a connection.
                connection = factory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("SAMPLEQUEUE");
                consumer = session.createConsumer(destination);
                // Blocks until a message arrives.
                Message message = consumer.receive();

                if (message instanceof TextMessage) {
                    // Process the fetched record ... some business logic.
                    // Set status to 2 if the business logic works as expected, else to 3 if an exception occurred.

                    TextMessage text = (TextMessage) message;
                    System.out.println("Message is : " + text.getText());
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            Receiver receiver = new Receiver();
            receiver.receiveMessage();
        }
    }
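The receiver above takes whichever message is next in the queue, so the "key_id not in list" condition is not applied on the consuming side. JMS lets a consumer filter messages with a message selector passed to `session.createConsumer(destination, selector)`. The sketch below only builds such a selector string. It assumes the sender has set a hypothetical String property named `keyId` on each message (for example with `message.setStringProperty("keyId", "1")`); the values are quoted because the JMS selector `IN` operator works on string literals.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical helper; the property name passed in is an assumption, not something from the original post.
class SelectorBuilder {
    // Builds e.g. "keyId NOT IN ('1', '4')".
    // Returns null for an empty set; JMS treats a null selector as "no filter".
    static String excludeKeys(String property, Set<Integer> busyKeys) {
        if (busyKeys.isEmpty()) {
            return null;
        }
        // Sort the keys so the generated selector is stable across runs.
        String literals = new TreeSet<Integer>(busyKeys).stream()
                .map(k -> "'" + k + "'")
                .collect(Collectors.joining(", "));
        return property + " NOT IN (" + literals + ")";
    }
}
```

A selector is fixed when the consumer is created, so changing the excluded keys means closing the consumer and creating a new one. If the real goal is that messages sharing a key_id are never processed concurrently, ActiveMQ message groups (setting the `JMSXGroupID` string property to the key on each message) may be a simpler fit, since the broker then routes all messages of one group to a single consumer.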

Any suggestions are much appreciated.

Comments:

Instead of a -ve vote, please give some suggestions.
@mins Thanks, I have edited it.

Answer 1:

If I understand your needs correctly, you can use a simple JDBC connector, for example:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class SimpleJDBCConnector {
        static final String JDBC_DRIVER = "org.postgresql.Driver";
        static final String DB_URL = "jdbc:postgresql://localhost:5432/postgres";
        static final String USER = "postgres";
        static final String PASS = "admin";

        Connection conn = null;
        Statement stmt = null;
        ResultSet rs = null;

        // Runs a query on the open connection; call connect() first.
        public ResultSet executeQuery(String sql) throws SQLException {
            stmt = conn.createStatement();
            return stmt.executeQuery(sql);
        }

        public void closeConnection() throws SQLException {
            conn.close();
        }

        // Loads the driver and opens the connection.
        public boolean connect() throws ClassNotFoundException, SQLException {
            Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(DB_URL, USER, PASS);
            return true;
        }
    }
along with the following SQL queries:

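    Set<Integer> set = new HashSet<Integer>();
    int status = 0;
    int id = 0;
    int key_id = 0;

    // Set.toString() gives e.g. "[1, 2]"; the regex strips brackets and spaces to get "1,2".
    // An empty set would produce "NOT IN ()", which is not valid SQL,
    // so the condition is only added when the set has elements.
    String exclude = set.isEmpty() ? "" : "key_id NOT IN ("
            + set.toString().replaceAll("[\\]\\[\\ ]", "") + ") AND ";
    String sql1 = "SELECT id,msg,key_id,status FROM xxxx WHERE " + exclude
            + "status = " + status + " LIMIT 1;";

    String sql2 = "UPDATE xxxx SET status=" + status + " WHERE id=" + id;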
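
Concatenating values into the SQL text works for integer keys, but a `PreparedStatement` with one placeholder per excluded key is a common alternative that lets the driver handle the values. The following is a minimal sketch under a few assumptions: the class name `PendingQuery` and its methods are hypothetical, the table and column names are taken from the queries above, and the `ORDER BY id` clause is an addition (not in the original query) so that "one record" is picked deterministically.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; not part of the original answer.
class PendingQuery {
    // Builds the SELECT with one "?" per excluded key_id.
    // The NOT IN clause is omitted entirely when nothing is excluded.
    static String buildSql(int excludedCount) {
        StringBuilder sql = new StringBuilder(
                "SELECT id, msg, key_id, status FROM xxxx WHERE status = ?");
        if (excludedCount > 0) {
            sql.append(" AND key_id NOT IN (")
               .append(String.join(", ", Collections.nCopies(excludedCount, "?")))
               .append(")");
        }
        // Assumption: the oldest id first.
        return sql.append(" ORDER BY id LIMIT 1").toString();
    }

    // Binds the status to parameter 1 and the excluded keys to parameters 2..n+1.
    static PreparedStatement prepare(Connection conn, int status, List<Integer> excluded)
            throws SQLException {
        PreparedStatement ps = conn.prepareStatement(buildSql(excluded.size()));
        ps.setInt(1, status);
        for (int i = 0; i < excluded.size(); i++) {
            ps.setInt(i + 2, excluded.get(i));
        }
        return ps;
    }
}
```

The caller would run `prepare(conn, 0, keys).executeQuery()` and close the statement afterwards, for example with try-with-resources.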

To extract the data from the response, you can do the following:

    // db is a connected SimpleJDBCConnector instance.
    ResultSet rs = db.executeQuery(sql1);
    while (rs.next()) {
        id = rs.getInt("id");
        String msg = rs.getString("msg");
        key_id = rs.getInt("key_id");
        status = rs.getInt("status");
    }
