将消息发送到套接字端口并使用Spring Integration接收响应
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将消息发送到套接字端口并使用Spring Integration接收响应相关的知识,希望对你有一定的参考价值。
现在的情况
- 在linux服务器上,几个jar正在运行套接字 - 使用字符串消息进行侦听和响应
- 在wildfly应用程序服务器中运行的新war正在将请求委托给这些套接字
- WAR使用弹簧,特别是弹簧与注释集成
我有一个配置类,包含@Configuration / @EnableIntegration / @ IntegrationComponentScan服务
我创建了一个消息传递网关
@MessagingGateway(defaultRequestChannel = "testGateway")
public interface TestGateway{
public Future<String> sendMessage(String in);
}
应用程序应发送请求并以客户端形式接收请求。我创建了一个null事件处理程序,因为应用程序应该只发送字符串并等待答案
@Bean
public MessageChannel testChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "testGateway")
public MessageHandler testGate() {
final TcpOutboundGateway gate = new TcpOutboundGateway();
gate.setConnectionFactory(connectionFactory());
gate.setReplyChannel(docServerChannel());
return gate;
}
@Bean
public AbstractClientConnectionFactory connectionFactory() {
final AbstractClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 5959);
connectionFactory.setSoTimeout(300000);
connectionFactory.setApplicationEventPublisher(new NullEventPublisher());
connectionFactory.setSerializer(new DefaultSerializer());
connectionFactory.setDeserializer(new DefaultDeserializer());
return connectionFactory;
}
接收数据时,消息应转换为字符串,发送时应转换为字节
@MessageEndpoint
public static class TestMessage {
@Transformer(inputChannel = "testChannel")
public String convert(final byte[] bytes) {
return new String(bytes);
}
@Transformer(inputChannel = "testGateway")
public String convertResult(final byte[] bytes) {
return new String(bytes);
}
}
部署了应用程序,但响应总是超时。套接字正在运行。我只想要一个简单的直接双向连接:WAR < - > JAR。
有人可以帮忙还是给我一个暗示?
------ UPDATE-1 ----------
套接字正在接收消息,但由于套接字在发送消息后关闭,因此无法读取响应。
------ UPDATE-2 ----------
- 这是一个错字。系统返回MessageHandler
- 我已将工厂添加为spring managed bean
- 我在遗留代码中加了' r n'
- 该应用程序仍在抱怨“等待响应超时”
旧版服务器正在打开服务器套接字并向套接字发送消息
final OutputStream os = serverSocket.getOutputStream();
final PrintWriter pw = new PrintWriter(os, true);
final BufferedReader br = new BufferedReader(new InputStreamReader(serverSocket.getInputStream()));
final String incoming = br.readLine();
final String response= "ok
";
pw.println(response);
pw.flush();
Thread.sleep(5000);
pw.close();
serverSocket.close();
------ UPDATE-3 ----------
来自Spring的TcpOutboundGateway没有得到回复
connection.send(requestMessage);
Message<?> replyMessage = reply.getReply();
if (replyMessage == null) {
连接工厂需要是一个@Bean
,以便Spring可以管理它。
public TcpInboundGateway testGate() {
final AbstractClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 5959); // already running socket
connectionFactory.setApplicationEventPublisher(new NullEventPublisher());
final TcpOutboundGateway gate = new TcpOutboundGateway();
gate.setConnectionFactory(connectionFactory);
gate.setOutputChannelName("testChannel");
return gate;
}
这不会编译;返回类型与您返回的内容不匹配。
假设这只是一个错字,并且bean实际上是一个出站网关,使用此配置,必须使用
(CRLF)终止回复。
见the documentation;向下滚动到...
TCP是一种流媒体协议;这意味着必须为通过TCP传输的数据提供一些结构,因此接收器可以将数据划分为离散消息。连接工厂配置为使用(反)序列化器在消息有效负载和通过TCP发送的位之间进行转换。这是通过分别为入站和出站消息提供解串器和串行器来实现的。提供了许多标准(de)序列化器。
...并阅读标准反序列化器。使用您的配置,标准解串器正在等待终止
(CRLF)。
服务器代码有什么作用?
编辑
@SpringBootApplication
public class So49046888Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx = SpringApplication.run(So49046888Application.class, args);
String reply = ctx.getBean(TestGateway.class).sendMessage("foo").get();
System.out.println(reply);
Thread.sleep(10_000);
ctx.close();
}
@Bean
public ServerSocket serverSocket() throws IOException {
return ServerSocketFactory.getDefault().createServerSocket(5959);
}
@Bean
public ApplicationRunner runner(TaskExecutor exec) {
return args -> {
exec.execute(() -> {
try {
while (true) {
Socket socket = serverSocket().accept();
final OutputStream os = socket.getOutputStream();
final PrintWriter pw = new PrintWriter(os, true);
final BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
final String incoming = br.readLine();
System.out.println(incoming);
final String response= "ok
";
pw.print(response);
pw.flush();
Thread.sleep(5000);
pw.close();
socket.close();
}
}
catch (Exception e) {
e.printStackTrace();
}
});
};
}
@Bean
public TaskExecutor exec() {
return new ThreadPoolTaskExecutor();
}
@Bean
@ServiceActivator(inputChannel = "testGateway")
public MessageHandler testGate() {
final TcpOutboundGateway gate = new TcpOutboundGateway();
gate.setConnectionFactory(connectionFactory());
gate.setReplyChannelName("toString");
gate.setRemoteTimeout(60_000);
return gate;
}
@Transformer(inputChannel = "toString")
public String transform(byte[] bytes) {
return new String(bytes);
}
@Bean
public AbstractClientConnectionFactory connectionFactory() {
final AbstractClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 5959);
connectionFactory.setSoTimeout(300000);
return connectionFactory;
}
@MessagingGateway(defaultRequestChannel = "testGateway")
public static interface TestGateway {
public Future<String> sendMessage(String in);
}
}
以上是关于将消息发送到套接字端口并使用Spring Integration接收响应的主要内容,如果未能解决你的问题,请参考以下文章