ActiveMQ配置ssl安全连接

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ配置ssl安全连接相关的知识,希望对你有一定的参考价值。

参考技术A

**生成服务端私钥,并导入到服务端keyStore文件中,此操作生成broker1.ks文件,保存服务端私钥,供服务端使用。 **

**根据服务端私钥导出服务端证书,此操作生成broker_cert文件,该文件为服务端的证书。 **

**导入服务端证书到客户端的Trust keyStore中。此操作生成client.ts文件,保存服务端证书,供客户端使用。 **

**生成客户端私钥,并且导入到客户端keyStore文件中,此操作生成client1.ks文件,保存客户端私钥,供客户端使用。 **

根据客户端私钥导出客户端证书。

导入客户端证书到服务端的Trust keyStore中,此操作生成broker1.ts文件,保存客户端证书,供服务端使用。

证书到此生成完毕

将broker1.ks,borker1.ts,放在$activemq.base/conf目录下

==$activemq.base== activemq的根目录

在transportConnectors同级元素中(broker元素内)添加sslContext元素

transportConnectors中添加ssl连接

重启activemq

Spring Boot SSL TCPClient ~ StompBrokerRelayMessageHandler ~ ActiveMQ ~ Undertow

【中文标题】Spring Boot SSL TCPClient ~ StompBrokerRelayMessageHandler ~ ActiveMQ ~ Undertow【英文标题】: 【发布时间】:2016-03-23 22:10:16 【问题描述】:

我正在尝试构建一个基于Spring Websocket Demo 运行ActiveMQ 作为具有Undertow 的STOMP 消息代理的websocket 消息传递应用程序。该应用程序在不安全的连接上运行良好。但是,我很难将 STOMP Broker Relay 配置为使用 SSL 连接进行转发。

如 Spring WebSocket Docs 中所述...

上述配置中的“STOMP 代理中继”是一个 Spring MessageHandler,它通过将消息转发到外部消息代理来处理消息。为此,它与代理建立 TCP 连接,将所有消息转发给它,然后通过 WebSocket 会话将从代理接收到的所有消息转发给客户端。本质上,它充当双向转发消息的“中继”。

此外,文档声明了我对 reactor-net 的依赖...

请添加对 org.projectreactor:reactor-net 的依赖以进行 TCP 连接管理。

问题是我当前的实现没有通过 SSL 初始化 NettyTCPClient,因此 ActiveMQ 连接失败并出现 SSLException。


[r.i.n.i.n.t.NettyTcpClient:307] » CONNECTED: 
[id: 0xcfef39e9, /127.0.0.1:17779 => localhost/127.0.0.1:8442]
...
[o.a.a.b.TransportConnection.Transport:245] » 
Transport Connection to: tcp://127.0.0.1:17779 failed:
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
...

因此,我尝试研究 Project Reactor Docs 来设置连接的 SSL 选项,但我没有成功。

此时我发现StompBrokerRelayMessageHandler 在Reactor2TcpClient 中默认初始化NettyTCPClient,但它似乎无法配置。

我们将不胜感激。

SSCCE


app.props

spring.activemq.in-memory=true
spring.activemq.pooled=false
spring.activemq.broker-url=stomp+ssl://localhost:8442
server.port=8443
server.ssl.enabled=true
server.ssl.protocol=tls
server.ssl.key-alias=undertow
server.ssl.key-store=classpath:undertow.jks
server.ssl.key-store-password=xxx
server.ssl.trust-store=classpath:undertow_certs.jks
server.ssl.trust-store-password=xxx

WebSocket配置

//... 
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer 

    private static final Logger log = LoggerFactory.getLogger(WebSocketConfig.class);

    private final static String KEYSTORE = "/activemq.jks";
    private final static String KEYSTORE_PASS = "xxx";
    private final static String KEYSTORE_TYPE = "JKS";
    private final static String TRUSTSTORE = "/activemq_certs.jks";
    private final static String TRUSTSTORE_PASS = "xxx";

    private static String getBindLocation() 
        return "stomp+ssl://localhost:8442?transport.needClientAuth=false";
    

    @Bean(initMethod = "start", destroyMethod = "stop")
    public SslBrokerService activeMQBroker() throws Exception 

        final SslBrokerService service = new SslBrokerService();
        service.setPersistent(false);

        KeyManager[] km = SecurityManager.getKeyManager();
        TrustManager[] tm = SecurityManager.getTrustManager();

        service.addSslConnector(getBindLocation(), km, tm, null);
        final ActiveMQTopic topic = new ActiveMQTopic("jms.topic.test");
        service.setDestinations(new ActiveMQDestination[]topic);

        return service;
    


    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) 
        config.enableStompBrokerRelay("/topic").setRelayHost("localhost").setRelayPort(8442);
        config.setApplicationDestinationPrefixes("/app");
    

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) 
        registry.addEndpoint("/welcome").withSockJS();
        registry.addEndpoint("/test").withSockJS();
    

   private static class SecurityManager  
   //elided...
   


根据Rossens 建议解决。这是任何有兴趣的人的实现细节。


WebSocket配置

@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration 
    ...
    @Bean
    public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() 
      StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
      ConfigurationReader reader = new StompClientDispatcherConfigReader();
      Environment environment = new Environment(reader).assignErrorJournal();
      TcpOperations<byte[]> client = new Reactor2TcpClient<>(new StompTcpClientSpecFactory(environment,"localhost", 8443));
      handler.setTcpClient(client);
      return handler;
    

StompTCPClientSpecFactory

private static class StompTcpClientSpecFactory
        implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> 

    private static final Logger log = LoggerFactory.getLogger(StompTcpClientSpecFactory.class);

    private final String host;
    private final int port;
    private final String KEYSTORE = "src/main/resources/tcpclient.jks";
    private final String KEYSTORE_PASS = "xxx";
    private final String KEYSTORE_TYPE = "JKS";
    private final String TRUSTSTORE = "/src/main/resources/tcpclient_certs.jks";
    private final String TRUSTSTORE_PASS = "xxx";
    private final String TRUSTSTORE_TYPE = "JKS";
    private final Environment environment;

    private final SecurityManager tcpManager = new SecurityManager
            .SSLBuilder(KEYSTORE, KEYSTORE_PASS)
            .keyStoreType(KEYSTORE_TYPE)
            .trustStore(TRUSTSTORE, TRUSTSTORE_PASS)
            .trustStoreType(TRUSTSTORE_TYPE)
            .build();

    public StompTcpClientSpecFactory(Environment environment, String host, int port) 
        this.environment = environment;
        this.host = host;
        this.port = port;
    

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
            Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) 

        return tcpClientSpec
                .ssl(new SslOptions()
                        .sslProtocol("TLS")
                        .keystoreFile(tcpManager.getKeyStore())
                        .keystorePasswd(tcpManager.getKeyStorePass())
                        .trustManagers(tcpManager::getTrustManager)
                        .trustManagerPasswd(tcpManager.getTrustStorePass()))
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .env(this.environment)
                .dispatcher(this.environment.getCachedDispatchers("StompClient").get())
                .connect(this.host, this.port);
    

【问题讨论】:

无论谁对此投了反对票,我真的想要一个解释......没有胆量。 内部允许 (github.com/reactor/reactor-io/blob/master/reactor-net/src/test/…) 但我正在检查它们是否暴露。 【参考方案1】:

对于所有寻找更新解决方案的人,我设法以更简洁的方式解决了问题。只需使用 SSL 创建和使用自己的 TCP 客户端:

import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer 

  private final WebsocketProperties properties;

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) 
    registry.addEndpoint("/ws").setAllowedOrigins("*");
    registry.addEndpoint("/ws").withSockJS();
  

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) 

    ReactorNettyTcpClient<byte[]> tcpClient = new ReactorNettyTcpClient<>(configurer -> configurer
            .host(properties.getRelayHost())
            .port(properties.getRelayPort())
            .secure(), new StompReactorNettyCodec());

    registry.enableStompBrokerRelay("/queue", "/topic")
            .setAutoStartup(true)
            .setSystemLogin(properties.getClientLogin())
            .setSystemPasscode(properties.getClientPasscode())
            .setClientLogin(properties.getClientLogin())
            .setClientPasscode(properties.getClientPasscode())
            .setTcpClient(tcpClient);

    registry.setApplicationDestinationPrefixes("/app");
    

在我的情况下(略有不同),我创建了两个 ReactorNettyTcpClient 作为 Bean 的实现,并根据环境选择一个带/不带 SSL 的实现。

依赖关系:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.4</version>
        <relativePath/>
    </parent>
    .
    .
    .
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-stomp</artifactId>
        <version>5.16.2</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>1.0.8</version>
    </dependency>

我希望目前正在尝试解决此问题的任何人都觉得它很有用。

【讨论】:

【参考方案2】:

@amoebob 答案很好,但线程没有正确关闭。每次打开来自客户端的连接时,都会打开一个新线程并且永远不会关闭。我在生产中发现了这个问题,并花了几天时间来解决它。所以我建议你改变 StompTcpFactory 以提高线程重用:

import io.netty.channel.EventLoopGroup;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.stomp.Reactor2StompCodec;
import org.springframework.messaging.simp.stomp.StompDecoder;
import org.springframework.messaging.simp.stomp.StompEncoder;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import reactor.Environment;
import reactor.core.config.ReactorConfiguration;
import reactor.io.net.NetStreams;
import reactor.io.net.Spec;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.netty.NettyClientSocketOptions;

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> 

  private final Environment environment;
  private final EventLoopGroup eventLoopGroup;
  private final String host;
  private final int port;
  private final boolean ssl;

  public StompTcpFactory(String host, int port, boolean ssl) 
    this.host = host;
    this.port = port;
    this.ssl = ssl;
    this.environment = new Environment(() -> new ReactorConfiguration(emptyList(), "sync", new Properties()));
    this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
  

  @Override
  public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) 
    return tcpClientSpec
            .env(environment)
            .options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup))
            .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
            .ssl(ssl ? new SslOptions() : null)
            .connect(host, port);
  


【讨论】:

我正在使用 Reactor2TcpClient.initEventLoopGroup(); 获得私人访问权限Reactor 2.0.8-RELEASE 您应该在 spring-messaging 中从 org.springframework.messaging.tcp.reactor 中找到 Reactor2TcpClient。我正在使用版本4.3.13 不错,明白了。我仍在使用Spring Boot 1.4.2,因此必须手动检查并升级Spring Messaging 的一些依赖版本。法国万岁! 像魅力一样工作!这个解决方案的票数肯定比@amoebob 的答案高。【参考方案3】:

我需要使用带有 Java 8 的 Spring Messaging 4.2.5 来保护到 RabbitMQ 的 STOMP 代理中继,但发现问题的后续代码已经过时。

在启动我的应用程序时,我提供了信任库环境属性来信任内部自签名证书颁发机构。 java -Djavax.net.ssl.trustStore=/etc/pki/java/server.jks -Djavax.net.ssl.trustStorePassword=xxxxx -jar build/libs/server.war

根据罗森的回答,我改变了

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer 

@Configuration
public class WebSocketConfig extends DelegatingWebSocketMessageBrokerConfiguration 

然后,在那个WebSocketConfig 中,我提供了我自己的AbstractBrokerMessageHandler bean:

@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() 
    AbstractBrokerMessageHandler handler = super.stompBrokerRelayMessageHandler();
    if (handler instanceof StompBrokerRelayMessageHandler) 
        ((StompBrokerRelayMessageHandler) handler).setTcpClient(new Reactor2TcpClient<>(
                new StompTcpFactory("127.0.0.1", 61614, true)
        ));
    
    return handler;

instanceof 条件是为了简化单元测试中NoOpBrokerMessageHandler 的使用。

最后,下面是上面使用的 StompTcpFactory 的实现:

public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> 

    private final Environment environment = new Environment(new SynchronousDispatcherConfigReader());
    private final String host;
    private final int port;
    private final boolean ssl;

    public StompTcpFactory(String host, int port, boolean ssl) 
        this.host = host;
        this.port = port;
        this.ssl = ssl;
    

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) 
        return tcpClientSpec
                .env(environment)
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                .ssl(ssl ? new SslOptions() : null)
                .connect(host, port);
    

    private static class SynchronousDispatcherConfigReader implements ConfigurationReader 
        @Override
        public ReactorConfiguration read() 
            return new ReactorConfiguration(Collections.emptyList(), "sync", new Properties());
        
    


【讨论】:

很好的修复!真的不应该这么难。在返回 tcpClientSpec 对象之前,我必须为 VM 设置默认 SSLContext,因为反应器代码会导致经典的错误假设密钥库作为物理文件存在于磁盘上。 完美!我花了几个小时尝试使用 SSL 连接将代理中继与 amazonMQ(aws)连接,但没有成功。您的配置就像一个魅力。 spring-messaging 中是否存在未解决的问题? @MartinChoraine 在这里也一样!惊人的。顺便说一句,它不喜欢连接字符串提供的stomp+ssl:// 前缀。 @jordan.baucke 我发布了一个新答案,因为这段代码几乎完美,但线程处理存在问题。 同意@MartinChoraine 这个解决方案是有问题的,会对生产环境造成严重的麻烦。【参考方案4】:

StompBrokerRelayMessageHandler 有一个可以设置的 tcpClient 属性。但是看起来我们没有通过WebSocketMessageBrokerConfigurer 设置公开它。

您可以删除 @EnableWebSocketMessageBroker 并改为扩展 DelegatingWebSocketMessageBrokerConfiguration。它实际上是相同的,但您现在直接从提供所有 bean 的配置类扩展。

这允许您随后覆盖 stompBrokerRelayMessageHandler() bean 并直接设置其 TcpClient 属性。只需确保覆盖方法标记为@Bean

【讨论】:

谢谢罗森...让我试一试先生。 只是想感谢您的帮助...花了我一段时间,但我终于成功了...你的摇滚伙伴 :)

以上是关于ActiveMQ配置ssl安全连接的主要内容,如果未能解决你的问题,请参考以下文章

Netty5使用自签证书实现SSL安全连接

ESP32入门基础之安全连接SMP-client

ESP32入门基础之安全连接SMP-server

Linux 半连接队列,全连接队列

你知道TCP的半连接与全连接队列吗?

TCP 半连接队列和全连接队列