套接字阻止在 Java 服务器和 Python 客户端之间发送消息

Posted

技术标签:

【中文标题】套接字阻止在 Java 服务器和 Python 客户端之间发送消息【英文标题】:Socket blocks sending messages between a Java server and Python client 【发布时间】:2021-12-23 22:01:01 【问题描述】:

我需要在本地 Windows 机器上的 Java 应用程序和 python 脚本之间传递一些数据字符串。因此,我决定使用与 TCP 通信的 Java 套接字服务器与 python 客户端进行通信。 Java 创建了两个线程来处理本地主机端口 9998 和 9999 上的两个套接字连接。我使用端口 9998 来处理传入消息,而我使用端口 9999 来处理发送消息。 我的两个应用程序在发送/接收的前几条消息中运行顺利,并且在某些时候,它停止了将字符串从 Java 发送到 Python 的调用。 这是我的代码的一部分:

这个 Java 类处理套接字服务器的创建和通信

    public class ServerSocketConnection 

    private int port;
    private Socket socket;
    private ServerSocket serverSocket;
    private Logger logger;
    private BufferedWriter out;
    private BufferedReader in;

    public ServerSocketConnection(int port) 
        this.port = port;
        logger = App.getLogger();
    

    // Create a server for a socket connection
    public void createServer() 
        try 
            // Create a server socket
            serverSocket = new ServerSocket(port);
            // Socket creation
            socket = serverSocket.accept();
            // Create a print writer
            out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
            // Create a buffered reader
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
         catch (IOException e) 
            logger.severe("Error creating server socket");
        
    

    // Close the server socket
    public void closeServer() 
        try 
            serverSocket.close();
         catch (IOException e) 
            logger.severe("Error closing server socket");
        
    

    public void sendMessage(String message) 
        try 
            // Sending the byte lenght of the message
            byte[] ptext = message.getBytes("UTF-8");
            send(String.valueOf(ptext.length));
            // Sending the message
            send(message);
         catch (IOException e) 
            logger.severe("Error sending message:" + e.getMessage());
        
    

    private void send(String message) throws IOException 
        out.write(message);
        out.newLine();
        out.flush();
    

    public String receiveMessage() 
        try 
            return in.readLine();
         catch (IOException e) 
            logger.severe("Error receiving message");
            return null;
        
    

这是处理消息发送的 Java 线程。它从其他线程共享的队列中获取要发送的消息。

public class SendToPlatform implements Runnable 

    private static final int PORT = 9999;
    private Thread worker;
    private AtomicBoolean running;
    private AtomicBoolean stopped = new AtomicBoolean(false);
    private BlockingQueue<String> queueOut;
    private Logger logger;
    private ServerSocketConnection serverSocketConnection;

    public SendToPlatform(BlockingQueue<String> queueOut, AtomicBoolean running) 
        this.queueOut = queueOut;
        this.running = running;
        this.logger = App.getLogger();
        serverSocketConnection = new ServerSocketConnection(PORT);
    

    public void run() 
        stopped.set(false);
        serverSocketConnection.createServer();
        while (running.get()) 
            socketSender();
        
        stopped.set(true);
    

    private void socketSender() 
        if (!queueOut.isEmpty()) 
            String element = null;
            try 
                element = queueOut.poll(1000, TimeUnit.MILLISECONDS);
             catch (InterruptedException e) 
                logger.severe("SendToPlatform: InterruptedException: " + e.getMessage());
            
            serverSocketConnection.sendMessage(element);
        
    

这是用于从 Java 套接字服务器接收消息的 python 线程:

    def __socket_reading_no_server(self, queue_input : queue.Queue, is_running : bool):
        HOST = "localhost"
        PORT = 9999
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect((HOST, PORT))

        while is_running:
            data = s.recv(4)
            message_size = int(data.decode('UTF-8').strip())
            data = s.recv(min(message_size + 2, 1024))
            message = data.decode('UTF-8').strip()
            queue_input.put(message)
        s.close()

并且这个方法是作为一个线程启动的,带有这些指令:

input_thread = threading.Thread(target=self.__socket_reading_no_server , args =(self.__queue_input, self.__running, ), daemon=True)
input_thread.start()

通过调试、记录和使用 Wireshark 来了解我的代码中的问题,我得出结论,我有一个反复出现的问题,out.write 指令在正确发送大约 10 条消息后在发送消息时阻塞。当我关闭套接字连接时,挂起的消息被释放。我尝试使用PrintWriterDataOutputStream 而不是BufferedWriter,,但发生了同样的问题。在发送字符串以适应s.recv() 大小之前,我尝试不发送消息的长度,但发生了同样的问题。 我是套接字编程的新手,可能我做了一些非常错误的事情,但我找不到问题出在哪里。也许有更好的方法在我不知道的进程之间传递数据而不是套接字?

@absuu 回答后编辑

在应用答案中建议的更正后,我仍然在尝试写入套接字时在发送方法阻塞中遇到out.write 的相同问题。我编辑我的代码如下:

public class ServerSocketConnection 
    [...]
    public void sendMessage(String message) 
        try 
            send(message);
         catch (IOException e) 
            logger.severe("Error sending message:" + e.getMessage());
        
    

    private void send(String message) throws IOException 
        message += "\r\n";
        byte[] ptext = message.getBytes("UTF-8");
        out.write(String.format("%2d",ptext.length));
        out.write("\r\n");
        out.flush();
        out.write(new String(ptext));
        logger.info("Data sent");
        out.flush();
    

我还增加了s.recv 的大小,但没有任何改变

【问题讨论】:

此处阅读过多,但在我看来,您的“信息”不一致。您将长度作为消息长度的文本表示形式发送,就像在 UTF-16 中一样,尽管我看不出您实际上是在 UTF-16 中发送消息。同时,在 Python 方面,我看到了关于 UTF-8 的讨论。 您发送消息长度的方式存在缺陷。您正在发送长度的字符串表示形式。如果您的字符串是 3 个字符,那么您将发送“3”。如果是 10 个字符,您将发送“10”。但是,在服务器端,您总是读取 4 个字节。换句话说,这种可靠工作的潜力受到消息长度 >=1000 和 this answer我几天前写的一个类似的问题 @BrutusForcus 我按照您链接的答案更改了我的代码,但在正确发送某些消息后,我仍然遇到套接字阻塞流的相同问题。我在python中使用from multiprocessing.connection import Listenerread_bytes方法并按照你的建议更改了Java,但没有任何改变。 您仍然在不恰当地发送消息长度。看看我之前提到的答案中的 Java 客户端代码 【参考方案1】:

TL;DR,请参阅下面的代码更正


您提出任何问题之前,需要注意以下几点:

    将服务器的数据编码从 UTF-16 更改为 UTF-8为什么? 任何端点之间的数据传输都依赖于consistency,即服务器/客户端应用程序的数据编码。当您的服务器 (ServerSocketConnection) 发送使用 UTF-16 编码的消息时,您的客户端 (__socket_reading_no_server) 正在接收使用 UTF-8 编码的消息。即使您的客户端能够接收来自服务器的所有消息,它也无法识别它们。例如,UTF-16 将字符串 "5" 编码为字节 [0,53],对于 UTF-8,结果是 [53](假设大端字节序)。详情请参阅Wikipedia。 不要使用out.newLine()。请改用out.write("\r\n")为什么? newline() 的行为与平台有关,这导致分别返回一个或两个字符,用于类 Unix 操作系统或 Windows 操作系统。它依赖于line.separator 系统属性,您可以参考Java doc 了解更多详细信息。 您的data = s.recv(4) 设置了一个约束,即您的客户端一次最多读取4 个字节,这很危险。 为什么? 因为根据Python doc,您的4 不是客户端接收的实际字节数,而是要接收的最大数据量。此外,理论上客户端最多只能接收 9999 个字节(字节 1~4:'9')的下一条传入消息。

对于您的问题“...发送...时阻塞的指令”:不幸的是,这里没有提供错误消息,我们无法准确推断出整个事情的哪一部分错误的。但是,我们可以推断,这更有可能是Java实现网络套接字的结果,因为您遇到的情况可能在C网络编程中(即原始套接字)中并不常见,即阻塞 根据POSIX 对write 系统调用的定义,在连续传输字节期间不会发生(请注意,大多数高级语言最终都会调用write 系统调用来发送字节):

成功完成后,write() 和 pwrite() 应返回实际写入与 fildes 关联的文件的字节数。此数字不得大于 nbyte。否则,应返回-1并设置errno以指示错误。

也就是说,在调用write 将字节发送到流缓冲区之后,它只会返回一些东西,而不是阻塞

网络套接字的Java实现相当复杂,这绝对不是Java的错。事实上,如果我们能正确使用套接字,那么那些晦涩难懂的错误就会消失。例如,根据我的测试,在应用以下更正后,您的应用程序运行良好


代码更正:

    ServerSocketConnection / byte[] ptext = message.getBytes("UTF-16"); -> byte[] ptext = message.getBytes("UTF-8"); ServerSocketConnection / send(String.valueOf(ptext.length)); -> send(String.format("%2d",ptext.length)); ServerSocketConnection / out.newLine() -> out.write("\r\n")

测试

服务器:

BlockingQueue<String> q = new ArrayBlockingQueue<String>(20);
q.add("str 1");
q.add("str 2");
q.add("str 3");
serverSocketConnection.sendMessage(element);
logger.info("element:"+element);  // debug
########################################################## Server Outputs  
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 1
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 2
Nov 11, 2021 11:05:51 PM SendToPlatform socketSender
INFO: element:str 3

客户:

print("message: %s" % message)   # debug
# queue_input.put(message)
########################################################## Client Outputs
message: str 1
message: str 2
message: str 3

编辑:

我要强调的另一件事是,尽管我们实际上并不知道您的应用程序会做什么,但通过网络套接字进行简单的基于长度的消息传输是可能的。也许它不是很实用和健壮,但它绝对是可能的。以下是对您的代码的一些更详细的更正:

ServerSocketConnection

// ............. other parts stay unchanged
public void sendMessage(String message) 
    try 
        int msgLen = message.getBytes("UTF-8").length;
        send(String.format("%3d", msgLen));     // tell client the message size
        send(message);      // send actual message
     catch (IOException e) 
        logger.severe("Error sending message:" + e.getMessage());
    


private void send(String message) throws IOException 
    out.write(message);
    out.flush();

// ............. other parts stay unchanged

__socket_reading_no_server

# ............. other parts stay unchanged
while is_running:
    data = s.recv(3)
    message_size = int(data.decode('UTF-8').strip())
    data = s.recv(min(message_size, 1024))
    message = data.decode('UTF-8').strip()
    print("incoming message:[%s]" % message)
# ............. other parts stay unchanged

非常非常非常重要

这里我们有 M=3,这限制了你的任何消息都应该是一个 最多有 999 个字符的字符串!也就是说,如果您希望您的应用程序正常工作,您的每条消息,比如String msg,都应该满足msg.length &lt;= 999

【讨论】:

我尝试了您的更正,并仔细阅读了您的回答。感谢您的澄清,我将编辑我的问题以解决您的问题。但是,在实施您的更正后,我仍然遇到发送时 java 阻塞的相同问题。我没有任何错误消息要显示,因为它没有给我一个。它只会阻止out.write 调用,直到我关闭流。 @liogiu2 你试过我发布的测试用例了吗?如果这些测试工作正常,则问题可能与消息队列中的字符串长度有关。我提供的建议仅对 My Tests 负责,它使用一些相对较短的字符串作为输入。 str 1str 2 需要 5 个字符,这就是我建议 String.format("%2d",len) 的原因,即 len("5\r\n")=4。同样,如果你的字符串大小为 N,并且 N 最多为 M 位,那么你应该改用String.format("%Md",len),加上s.recv(M),加上删除第4行out.write("\r\n"); @liogiu2 见编辑。希望这部分最终能消除障碍。【参考方案2】:

这是一个 Java 客户端和 Python 服务器的“精简”实现,它演示了一种发送任意长度消息(在本例中为字符串)的有效机制:

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class Client 

    public static void main(String[] args) throws IOException 
        String message = "Talking isn't doing. It is a kind of good deed to say well; and yet words are not deeds.";
        try (Socket socket = new Socket("localhost", 7070)) 
            try (DataOutputStream out = new DataOutputStream(socket.getOutputStream())) 
                byte[] dts = message.getBytes();
                out.writeInt(dts.length);
                out.write(dts);
            
        
    

请注意客户端在发送实际消息之前如何将即将到来的消息的长度作为 32 位整数发送。

from multiprocessing.connection import Listener

def main():
    listener = Listener(address=('0.0.0.0', 7070), family='AF_INET', authkey=None)
    with listener.accept() as conn:
        print(conn.recv_bytes().decode())
            

if __name__ == '__main__':
    main()

连接类期望使用客户端实现的协议(在本例中为 Java)接收消息 - 即,一个 32 位整数,它给出了要遵循的数据量。

我希望这可以澄清问题

【讨论】:

我无法找到表明 recv_bytes() 需要 32 位整数长度后跟那么多字节的文档。能给个链接吗? @PresidentJamesK.Polk 我也找不到任何文档。并非所有与 Python 相关的内容都有很好的文档记录。我描述的协议是众所周知的,我只是猜测它可能正在使用该策略。我能够凭经验确定我是对的。此外,如果您考虑一下,如果不是这种情况,我的答案中的 2 个代码示例根本无法工作。

以上是关于套接字阻止在 Java 服务器和 Python 客户端之间发送消息的主要内容,如果未能解决你的问题,请参考以下文章

Python OpenCV线程

Python学习日志——TCP网络应用程序开发流程

如果防火墙打开,Java 7 会阻止 Windows Vista 和 7 上的 FTP 传输。有任何想法吗?

是否可以让 3 台服务器使用 Python 套接字在一个端口(比如端口 48000)上相互通信

如何使用 Java 客户端和 Python 服务器通过套接字创建 IPC?

带有 Java 客户端和 Python 服务器的数据报套接字