如何使用故障转移传输处理 Activemq 的最大帧大小异常

Posted

技术标签:

【中文标题】如何使用故障转移传输处理 Activemq 的最大帧大小异常【英文标题】:How to handle Activemq's max frame size exception with failover transport 【发布时间】:2017-05-19 22:14:13 【问题描述】:

我正在开发一个使用 activemq 来交换消息的应用程序,有些消息太大以至于我想取消。

我们使用带有两个 ActiveMQ 实例(主/从)的 activemq 故障转移传输。代理本身对消息有 100mb 帧大小限制。

问题是:如果我尝试发送大于 100mb 的消息,ActiveMQ 服务器将关闭连接。此时,故障转移传输将尝试重新连接并再次发送消息,从而创建一个无限循环。

客户端记录以下内容:

2017-01-05 09:19:11.910  WARN 14680 --- [0.1:61616@57025] o.a.a.t.failover.FailoverTransport       : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: 

java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_91]
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.13.4.jar:5.13.4]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]

2017-01-05 09:19:11.921  INFO 14680 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully reconnected to tcp://localhost:61616
2017-01-05 09:19:11.923  WARN 14680 --- [0.1:61616@57026] o.a.a.t.failover.FailoverTransport       : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: 

java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_91]
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:267) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.13.4.jar:5.13.4]
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.13.4.jar:5.13.4]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_91]

同时记录activeMQ实例:

2017-01-05 09:19:11,909 | WARN  | Transport Connection to: tcp://127.0.0.1:57025 failed: java.io.IOException: Frame size of 363 MB larger than max allowed 100 MB | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///127.0.0.1:57025@61616
2017-01-05 09:19:11,922 | WARN  | Transport Connection to: tcp://127.0.0.1:57026 failed: java.io.IOException: Frame size of 363 MB larger than max allowed 100 MB | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///127.0.0.1:57026@61616

我尝试设置一个 TransportListener 来验证我是否可以捕获这种情况,但我只收到一个 transportInterupted 事件,没有任何分类器。

我阅读了有关故障转移传输 (http://activemq.apache.org/failover-transport-reference.html) 的文档,也许我可以使用 maxReconnectAttempts,但我知道在更常见的情况下(例如服务器暂时不可用)会有几个缺点。

如何检测这种情况,避免客户端和服务器之间的无限连接循环?

【问题讨论】:

【参考方案1】:

如你所说

最大重新连接尝试次数 -1 | 0 从 ActiveMQ 5.6:默认为 -1,永远重试。 0 表示禁用重新连接,例如:尝试连接一次。在 ActiveMQ 之前 5.6:默认为0,永远重试。所有 ActiveMQ 版本:值 >0 表示发生错误之前的最大重新连接尝试次数 发回给客户。

因此,如果您希望您的传输侦听器在重试失败后收到传输失败通知,因为您的消息的大小您需要将 maxReconnectAttempts 设置为 > 0 值,然后当最大重试次数达到传输侦听器的 onException 方法将使用 IOException 作为参数调用,但正如您所说,要验证它是由于最大大小还是其他问题并不容易。

如果您想在发送前检查消息大小,您可以在运行时通过 jmx 访问代理端的 uri 中配置 maxFrameSize 并获取 BrokerViewMBean 实例并调用 getTransportConnectorByType 方法@ 987654321@ 这将返回在 activemq.xml 中配置的 uri,您可以对其进行解析以检索 maxFrameSize。

JMXServiceURL url = new     JMXServiceURL("service:jmx:rmi:///jndi/rmi://hist:1099/jmxrmi");
JMXConnector jmxc = JMXConnectorFactory.connect(url);
MBeanServerConnection conn = jmxc.getMBeanServerConnection(); 

ObjectName activeMq = new ObjectName("org.apache.activemq:Type=Broker,BrokerName=localhost");

BrokerViewMBean mbean = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class, true);
String uri = mbean.getTransportConnectorByType("tcp");// or ("ssl") 
String[] pairs = uri.split("&");
for (String pair : pairs) 
    if (pair.startsWith("wireFormat.maxFrameSize")) 
        int idx = pair.indexOf("=");
        System.out.println(URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
    

http://activemq.apache.org/maven/apidocs/org/apache/activemq/broker/jmx/BrokerViewMBean.html#getTransportConnectors-- 将返回传输名称作为键和 uri 作为值的映射

要获得更好的消息大小,您可以这样做:

        OpenWireFormat opf = new OpenWireFormat();
        opf.setTightEncodingEnabled(true);
        ByteSequence tab = opf.marshal(message);
        System.out.println(tab.length);

您的业务必须是这样的:

import java.io.IOException;
import java.net.URLDecoder;
import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.ByteSequence;

public class SimpleSenderMaxSizeManager 

    private static Connection conn = null;
    private static boolean transportChanged;
    private static Long MAX_FRAME_SIZE;

    public static void main(String[] args) throws JMSException 
        try 
            SimpleSenderMaxSizeManager.updateMaxSize("host1");
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
                    "failover:(tcp://host1:5670,tcp://host2:5671)?randomize=false");
            cf.setTransportListener(new TransportListener() 

                @Override
                public void transportResumed() 
                    if (transportChanged) 
                        transportChanged = false;
                        try 
                            SimpleSenderMaxSizeManager.updateMaxSize(null);
                         catch (Exception e) 
                        
                    
                

                @Override
                public void transportInterupted() 
                    transportChanged = true;
                

                @Override
                public void onException(IOException error) 
                

                @Override
                public void onCommand(Object command) 
                
            );
            conn = cf.createConnection();
            ActiveMQSession session = (ActiveMQSession) conn.createSession(false,
                    ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue("TEST"));
            conn.start();
            ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage("test");
            OpenWireFormat opf = new OpenWireFormat();
            opf.setTightEncodingEnabled(true);
            ByteSequence tab = opf.marshal(msg);
            System.out.println(tab.length);
            if (tab.length >= MAX_FRAME_SIZE) 
                throw new RuntimeException(tab.length + ">=" + MAX_FRAME_SIZE);
            
            producer.send(msg);
         catch (Exception e) 
            e.printStackTrace();
         finally 
            if (conn != null) 
                try 
                    conn.close();
                 catch (Exception e) 
                
            
        
    

    protected static void updateMaxSize(String host) throws Exception 
        JMXConnector jmxc = null;
        try 
            String jmxHost = host;
            String scheme = null;
            if (conn == null) 
                scheme = "tcp";
             else 
                org.apache.activemq.transport.TransportFilter responseCorrelator = (TransportFilter) ((ActiveMQConnection) conn)
                        .getTransport();
                TransportFilter mutexTransport = (TransportFilter) responseCorrelator.getNext();
                FailoverTransport failoverTransport = (FailoverTransport) mutexTransport.getNext();
                while (failoverTransport.getConnectedTransportURI() == null) 
                    try 
                        Thread.sleep(100);
                     catch (Exception e) 
                    
                
                scheme = failoverTransport.getConnectedTransportURI().getScheme();
                jmxHost = failoverTransport.getConnectedTransportURI().getHost();
            
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + jmxHost + ":1099/jmxrmi");
            Map<String, String[]> env = new HashMap<>();
            String[] creds =  "admin", "admin" ;
            env.put(JMXConnector.CREDENTIALS, creds);
            jmxc = JMXConnectorFactory.connect(url, env);
            MBeanServerConnection conn = jmxc.getMBeanServerConnection();
            ObjectName activeMq = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
            BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(conn, activeMq, BrokerViewMBean.class,
                    true);
            String value = mbean.getTransportConnectorByType(scheme);
            String[] pairs = value.split("&");
            for (String pair : pairs) 
                if (pair.contains("wireFormat.maxFrameSize")) 
                    int idx = pair.indexOf("=");
                    System.out.println(URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
                    MAX_FRAME_SIZE = Long.valueOf(URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
                    MAX_FRAME_SIZE -= 1000;// security for JMS headers added by
                                            // session on sending
                
            
         finally 
            if (jmxc != null) 
                try 
                    jmxc.close();
                 catch (Exception e) 
                
            
        
    

【讨论】:

非常好的答案。我认为解决方案将通过 JMX 检查正确的帧大小。问题:opf.marshal(message);将序列化整个消息只是为了获得它的大小?如果我有一个巨大的消息大小,这可能是一个缺点(尽管我使用 getBytes("UTF-8") 只是为了测试。 是的 opf.marshal(message);与 AMQ 客户端在发送之前使用的相同,它将带有标头的整个消息编组为接近实际大小。在所有情况下,AMQ 稍后都会使用此方法,我认为您最好知道它是否工作正常【参考方案2】:

我不相信这是可能的。您正在尝试将错误处理分类为不会在故障转移后冒泡的异常:传输。如果您超过最大客户端数,可能会发生相同类型的异常。

在发送前检查邮件大小听起来像是一个可行的选择。

是否存在尺寸检查不符合您要求的原因?

public String mySendMessage(String body) 
....
if(body.length > MAX_ALLOWED) .. 
   throw new Exception.. or log.. or other
else
   producer.send(session.createTextMessage(body));

【讨论】:

对我来说问题是我们在客户端驱动程序和 activemq 之间创建了一个我无法处理也无法干预的循环。 关于您的问题:发送前检查消息大小的问题是我不知道 activemq 实例允许的最大大小是多少。在其他工作中,我可以更改 activemq 的配置,但我也需要更改应用程序的配置。它更容易出错。 您如何无法访问消息负载的大小? (有关一些 JMS-API 伪代码,请参阅我上面的编辑)

以上是关于如何使用故障转移传输处理 Activemq 的最大帧大小异常的主要内容,如果未能解决你的问题,请参考以下文章

Mule JMS ActiveMQ 传输失败到故障转移

Activemq 宕机解决方案

ActiveMQ 主从故障转移丢失消息

ActiveMQ-cpp:尽管发生故障转移,但连接丢失

ActiveMQ NMS:当代理关闭时,connection.start() 使用故障转移协议挂起

ActiveMQ 经典到 ActiveMQ Artemis 故障转移不起作用