ActiveMQ的静态网络配置

Posted xiaoliangup

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ的静态网络配置相关的知识,希望对你有一定的参考价值。

static networkConnector是用于创建一个静态的配置对于网络中的多个Broker做集群,这种协议用于复合url,一个复合url包括多个url地址。

<networkConnectors>
             <networkConnector name="local network"  duplex="true"
              uri="static://(tcp://192.168.174.104:61616,tcp://192.168.174.104:61676)"/>
</networkConnectors>

常用networkConnector配置的可用属性:

  conduitSubscriptions :默认true,是否把同一个broker的多个consumer当做一个来处理

  duplex :默认false,设置是否能双向通信


消息发送代码
技术分享图片
public class JmsSend {
    
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
        Connection connection = connectionFactory.createConnection();
        
        connection.start();
        
        Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        Destination queue=session.createQueue("my-queue4");
        
    
        MessageProducer producer=session.createProducer(queue);
        
        
        for(int i=0 ; i<20 ; i++){
             TextMessage message=session.createTextMessage("message"+i);
             //message.setStringProperty("queue", "queue"+i);
             //message.setJMSType("1");
             producer.send(message);
        }
        session.commit();
        session.close();
        
        connection.close();
        
    }

}
View Code

192.168.174.104:61616 broker1 接收测试代码
技术分享图片
public class JmsReceiver1 {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.174.104:61616");
        

        for (int i=0; i<10 ;i++){
            new Myhread1(connectionFactory).start();
            
            Thread.sleep(1000);
            
        }
        
        

    }

}

class Myhread1 extends Thread {
    
    private ConnectionFactory connectionFactory ;
    
    public Myhread1(ConnectionFactory connectionFactory) {
        super();
        this.connectionFactory = connectionFactory;
    }


    public void run() {
         
         
        try {
            
            final Connection connection = connectionFactory.createConnection();
            connection.start();
            
            
            final Session session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            
            Destination queue = session.createQueue("my-queue4");

            MessageConsumer consumer = session.createConsumer(queue);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("1======"+msg.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    try {
                        session.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    
                }
            });
            
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }

}
View Code

192.168.174.104:61676 broker2 接收测试代码
技术分享图片
public class JmsReceiver2 {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "tcp://192.168.174.104:61676");
        

        for (int i=0; i<10 ;i++){
            new Myhread2(connectionFactory).start();
            
            Thread.sleep(1000);
            
        }
        
        

    }

}

class Myhread2 extends Thread {
    
    private ConnectionFactory connectionFactory ;
    
    public Myhread2(ConnectionFactory connectionFactory) {
        super();
        this.connectionFactory = connectionFactory;
    }


    public void run() {
         
        
        try {
            
            final Connection connection  = connectionFactory.createConnection();
            connection.start();
            
            
            final Session session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            
            Destination queue = session.createQueue("my-queue4");

            MessageConsumer consumer = session.createConsumer(queue);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("2======"+msg.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    try {
                        session.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                    
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    
                }
            });
            
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
        
    }

}
View Code

 

“丢失”的消息

broker1和broker2通过networkConnector连接,一些consumers连接到broker2,消费broker1上的消息。消息先被broker2从broker1上消费掉,然后转发给这些consumers。不幸的是转发部分消息的时候broker2重启了,这些consumers发现broker2连接失败,通过failover连接到broker1上去了,但是有一部分他们还没有消费的消息被broker1已经分发到了broker2上去了。这些消息,就好像是消失了。

broker1 中my-queue4 接收到20条消息。

技术分享图片

broker1通过静态网络与broker2连接,与broker2相连的消费者消费后,broker1中Number of Pending Messages为0,即消息先被broker2从broker1上消费掉。

技术分享图片

一些consumers连接到broker1,没法从broker1获取消息消费。

 技术分享图片

 

针对“丢失”的消息,配置replayWhenNoConsumers选项

这个选项使得broker1上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发。

<policyEntries>
        <policyEntry queue=">" enableAudit="false">
                <networkBridgeFilterFactory>
                        <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                </networkBridgeFilterFactory>
        </policyEntry>
</policyEntries>

 




以上是关于ActiveMQ的静态网络配置的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ(10):ActiveMQ的静态网络链接

在没有启用咨询支持的情况下,代理的 activemq 静态网络是不是会停止转发消息?

分布式-信息方式-ActiveMQ静态网络连接信息回流功能

ActiveMQ配置用户认证信息

ActiveMQ配置用户认证信息

Activemq集群搭建