分布式-信息方式-ActiveMQ静态网络连接信息回流功能

Posted caoyingjielxq

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式-信息方式-ActiveMQ静态网络连接信息回流功能相关的知识,希望对你有一定的参考价值。

 
                                             “丢失”的消息
有这样的场景, broker1和 broker2通过 networkConnector连接,一些 consumers连接到 broker1,
消费 broker2上的消息。消息先被 broker1从 broker2上消费掉,然后转发给这些 consumers。不幸的是转
发部分消息的时候 broker1重启了,这些 consumer发现 broker1连接失败,通过 failover连接到 broker2
上去了,但是有一部分他们还没有消费的消息被 broker2已经分发到了 broker1上去了。这些消息,就好
像是消失了,除非有消费者重新连接到 broker1上来消费。怎么办呢?
从5.6版起, destinationPolicy上新增了选项 replayWhenNoConsumers。这个选项使得 broker1
上有需要转发的消息但是没有消费者时,把消息回流到它原始的 broker。同时需要把 enableAudit设置为
false,以防止消息回流后被当做重复消息而不被分发,示例如下:

<!-- Destination policy enabling message replay across the broker network. -->
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <!-- queue=">" is the ActiveMQ wildcard for every queue. enableAudit is
           turned off so that replayed messages are not treated as duplicates. -->
      <policyEntry queue=">" enableAudit="false">
        <networkBridgeFilterFactory>
          <!-- Send messages back to the originating broker when this broker
               has no local consumers for them. -->
          <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
        </networkBridgeFilterFactory>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

代码如下:

package test.mq.staitsnetwork;

 

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Demo producer: sends a fixed batch of text messages to the queue
 * {@code my_queue} on the broker at {@code tcp://192.168.145.100:61616},
 * committing them as one transaction.
 */
public class Sender {

    private static final String BROKER_URL = "tcp://192.168.145.100:61616";
    private static final String QUEUE_NAME = "my_queue";
    private static final int MESSAGE_COUNT = 30;

    /**
     * Connects to the broker, sends {@code MESSAGE_COUNT} messages and commits.
     *
     * @param args unused
     * @throws JMSException if connecting, sending or committing fails
     * @throws InterruptedException kept for signature compatibility; not thrown by this body
     */
    public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();

            // The session is transacted, so the acknowledge-mode argument is
            // ignored; SESSION_TRANSACTED states that explicitly.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(QUEUE_NAME);
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < MESSAGE_COUNT; i++) {
                TextMessage message = session.createTextMessage("message----" + i);
                producer.send(message);
            }
            // Nothing is visible to consumers until the transaction commits.
            session.commit();
            session.close();
        } finally {
            // Always release the connection, even when sending or committing
            // failed; closing the connection also closes its sessions.
            connection.close();
        }
    }
}

消费者1

package test.mq.staitsnetwork;

import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Demo consumer launcher for the broker at {@code tcp://192.168.145.100:61676}.
 * Starts 30 {@link MyThread} workers, one per second, all sharing a single
 * connection factory.
 */
public class Receiver1{

    /**
     * Starts the worker threads, pausing one second between launches.
     *
     * @param args unused
     * @throws JMSException kept for signature compatibility
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.145.100:61676");
        for (int i = 0; i < 30; i++) {
            Thread worker = new MyThread(connectionFactory);
            worker.start();
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop launching further
                // workers instead of silently carrying on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Worker that opens its own connection and transacted session, registers a
 * listener on the queue {@code my_queue}, and closes the session and
 * connection from inside the listener after handling a message.
 */
class MyThread extends Thread {
    private final ConnectionFactory connectionFactory;

    /**
     * @param connectionFactory factory used to open this worker's connection
     */
    public MyThread(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * Opens the connection and installs the message listener. If any setup
     * step fails, the connection is closed so that it does not leak.
     */
    @Override
    public void run() {
        final Connection connection;
        try {
            connection = connectionFactory.createConnection();
        } catch (JMSException e) {
            e.printStackTrace();
            return;
        }
        try {
            connection.start();
            // The session is transacted, so the acknowledge-mode argument is
            // ignored; SESSION_TRANSACTED states that explicitly.
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue("my_queue");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    try {
                        // Guard the cast: a non-text message must not abort the
                        // listener before the cleanup below has run.
                        if (msg instanceof TextMessage) {
                            System.out.println("接收信息1--->" + ((TextMessage) msg).getText());
                        } else {
                            System.out.println("Received non-text message: " + msg);
                        }
                    } catch (JMSException e1) {
                        e1.printStackTrace();
                    }
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    // NOTE(review): the JMS 2.0 API says a listener must not close
                    // its own connection; this demo appears to rely on the
                    // provider tolerating it - confirm for the ActiveMQ version in use.
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    closeConnection(connection);
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
            // Setup failed after the connection was opened: release it.
            closeConnection(connection);
        }
    }

    /** Closes the connection, reporting (not propagating) any failure. */
    private static void closeConnection(Connection connection) {
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

 

消费者2

package test.mq.staitsnetwork;

import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Demo consumer launcher for the broker at {@code tcp://192.168.145.100:61616}.
 * Starts 30 {@link MyThread2} workers, one per second, all sharing a single
 * connection factory.
 */
public class Receiver2{

    /**
     * Starts the worker threads, pausing one second between launches.
     *
     * @param args unused
     * @throws JMSException kept for signature compatibility
     */
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.145.100:61616");
        for (int i = 0; i < 30; i++) {
            Thread worker = new MyThread2(connectionFactory);
            worker.start();
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop launching further
                // workers instead of silently carrying on.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
/**
 * Worker that opens its own connection and transacted session, registers a
 * listener on the queue {@code my_queue}, and closes the session and
 * connection from inside the listener after handling a message.
 */
class MyThread2 extends Thread {
    private final ConnectionFactory connectionFactory;

    /**
     * @param connectionFactory factory used to open this worker's connection
     */
    public MyThread2(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /**
     * Opens the connection and installs the message listener. If any setup
     * step fails, the connection is closed so that it does not leak.
     */
    @Override
    public void run() {
        final Connection connection;
        try {
            connection = connectionFactory.createConnection();
        } catch (JMSException e) {
            e.printStackTrace();
            return;
        }
        try {
            connection.start();
            // The session is transacted, so the acknowledge-mode argument is
            // ignored; SESSION_TRANSACTED states that explicitly.
            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue("my_queue");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    try {
                        // Guard the cast: a non-text message must not abort the
                        // listener before the cleanup below has run.
                        if (msg instanceof TextMessage) {
                            System.out.println("接收信息2--->" + ((TextMessage) msg).getText());
                        } else {
                            System.out.println("Received non-text message: " + msg);
                        }
                    } catch (JMSException e1) {
                        e1.printStackTrace();
                    }
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    // NOTE(review): the JMS 2.0 API says a listener must not close
                    // its own connection; this demo appears to rely on the
                    // provider tolerating it - confirm for the ActiveMQ version in use.
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    closeConnection(connection);
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
            // Setup failed after the connection was opened: release it.
            closeConnection(connection);
        }
    }

    /** Closes the connection, reporting (not propagating) any failure. */
    private static void closeConnection(Connection connection) {
        try {
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

消费者1

技术分享图片

 

消费者2

技术分享图片

 










以上是关于分布式-信息方式-ActiveMQ静态网络连接信息回流功能的主要内容,如果未能解决你的问题,请参考以下文章

分布式-信息方式-ActiveMQ结合Spring

activemq连接池原理

如何禁用ActiveMQ服务器的trace方法

原创Activemq的几种通信方式总结

ActiveMQ(12):容错的链接和动态网络连接

ActiveMQ(10):ActiveMQ的静态网络链接