将消息发送到套接字端口并使用Spring Integration接收响应

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将消息发送到套接字端口并使用Spring Integration接收响应相关的知识,希望对你有一定的参考价值。

现在的情况

  • 在linux服务器上,几个jar正在运行套接字 - 使用字符串消息进行侦听和响应
  • 在wildfly应用程序服务器中运行的新war正在将请求委托给这些套接字
  • WAR使用弹簧,特别是弹簧与注释集成

我有一个配置类,包含@Configuration / @EnableIntegration / @ IntegrationComponentScan服务

我创建了一个消息传递网关

@MessagingGateway(defaultRequestChannel = "testGateway")
public interface TestGateway{
    public Future<String> sendMessage(String in);
}

应用程序应发送请求并以客户端形式接收请求。我创建了一个null事件处理程序,因为应用程序应该只发送字符串并等待答案

@Bean
public MessageChannel testChannel() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "testGateway")
public MessageHandler testGate() {
    final TcpOutboundGateway gate = new TcpOutboundGateway();
    gate.setConnectionFactory(connectionFactory());
    gate.setReplyChannel(docServerChannel());
    return gate;
}

@Bean
public AbstractClientConnectionFactory connectionFactory() {
    final AbstractClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 5959);
    connectionFactory.setSoTimeout(300000);
    connectionFactory.setApplicationEventPublisher(new NullEventPublisher());
    connectionFactory.setSerializer(new DefaultSerializer());
    connectionFactory.setDeserializer(new DefaultDeserializer());
    return connectionFactory;
}

接收数据时,消息应转换为字符串,发送时应转换为字节

@MessageEndpoint
public static class TestMessage {

    @Transformer(inputChannel = "testChannel")
    public String convert(final byte[] bytes) {
        return new String(bytes);
    }

    @Transformer(inputChannel = "testGateway")
    public String convertResult(final byte[] bytes) {
        return new String(bytes);
    }

}

部署了应用程序,但响应总是超时。套接字正在运行。我只想要一个简单的直接双向连接:WAR < - > JAR。

有人可以帮忙还是给我一个暗示?

------ UPDATE-1 ----------

套接字正在接收消息,但由于套接字在发送消息后关闭,因此无法读取响应。

------ UPDATE-2 ----------

  • 这是一个错字。系统返回MessageHandler
  • 我已将工厂添加为spring managed bean
  • 我在遗留代码中加了' r n'
  • 该应用程序仍在抱怨“等待响应超时”

旧版服务器正在打开服务器套接字并向套接字发送消息

final OutputStream os = serverSocket.getOutputStream();
final PrintWriter pw = new PrintWriter(os, true);
final BufferedReader br = new BufferedReader(new InputStreamReader(serverSocket.getInputStream()));
final String incoming = br.readLine();
final String response= "ok
";
pw.println(response);
pw.flush();
Thread.sleep(5000);
pw.close();
serverSocket.close();

------ UPDATE-3 ----------

来自Spring的TcpOutboundGateway没有得到回复

        connection.send(requestMessage);
        Message<?> replyMessage = reply.getReply();
        if (replyMessage == null) {
答案

连接工厂需要是一个@Bean,以便Spring可以管理它。

public TcpInboundGateway testGate() {
    final AbstractClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 5959); // already running socket
    connectionFactory.setApplicationEventPublisher(new NullEventPublisher());
    final TcpOutboundGateway gate = new TcpOutboundGateway();
    gate.setConnectionFactory(connectionFactory);
    gate.setOutputChannelName("testChannel");
    return gate;
}

这不会编译;返回类型与您返回的内容不匹配。

假设这只是一个错字,并且bean实际上是一个出站网关,使用此配置,必须使用 (CRLF)终止回复。

the documentation;向下滚动到...

TCP是一种流媒体协议;这意味着必须为通过TCP传输的数据提供一些结构,因此接收器可以将数据划分为离散消息。连接工厂配置为使用(反)序列化器在消息有效负载和通过TCP发送的位之间进行转换。这是通过分别为入站和出站消息提供解串器和串行器来实现的。提供了许多标准(de)序列化器。

...并阅读标准反序列化器。使用您的配置,标准解串器正在等待终止 (CRLF)。

服务器代码有什么作用?

编辑

@SpringBootApplication
public class So49046888Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext ctx = SpringApplication.run(So49046888Application.class, args);
        String reply = ctx.getBean(TestGateway.class).sendMessage("foo").get();
        System.out.println(reply);
        Thread.sleep(10_000);
        ctx.close();
    }

    @Bean
    public ServerSocket serverSocket() throws IOException {
        return ServerSocketFactory.getDefault().createServerSocket(5959);
    }

    @Bean
    public ApplicationRunner runner(TaskExecutor exec) {
        return args -> {
            exec.execute(() -> {
                try {
                    while (true) {
                        Socket socket = serverSocket().accept();
                        final OutputStream os = socket.getOutputStream();
                        final PrintWriter pw = new PrintWriter(os, true);
                        final BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                        final String incoming = br.readLine();
                        System.out.println(incoming);
                        final String response= "ok
";
                        pw.print(response);
                        pw.flush();
                        Thread.sleep(5000);
                        pw.close();
                        socket.close();
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            });
        };
    }

    @Bean
    public TaskExecutor exec() {
        return new ThreadPoolTaskExecutor();
    }

    @Bean
    @ServiceActivator(inputChannel = "testGateway")
    public MessageHandler testGate() {
        final TcpOutboundGateway gate = new TcpOutboundGateway();
        gate.setConnectionFactory(connectionFactory());
        gate.setReplyChannelName("toString");
        gate.setRemoteTimeout(60_000);
        return gate;
    }

    @Transformer(inputChannel = "toString")
    public String transform(byte[] bytes) {
        return new String(bytes);
    }

    @Bean
    public AbstractClientConnectionFactory connectionFactory() {
        final AbstractClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 5959);
        connectionFactory.setSoTimeout(300000);
        return connectionFactory;
    }

    @MessagingGateway(defaultRequestChannel = "testGateway")
    public static interface TestGateway {
        public Future<String> sendMessage(String in);
    }

}

以上是关于将消息发送到套接字端口并使用Spring Integration接收响应的主要内容,如果未能解决你的问题,请参考以下文章

重新绑定 UDP 套接字

节点 Js 无法将消息发送到在线特定套接字

节点 Js 无法将消息发送到在线特定套接字

Unix - 如何向多个进程发送消息?

使用套接字将数据从 node.js 发送到 Java

多个客户端套接字到单个服务器 C++