Python Socket 接收大量数据

Posted

技术标签:

【中文标题】Python Socket 接收大量数据【英文标题】:Python Socket Receive Large Amount of Data 【发布时间】:2013-07-14 02:58:33 【问题描述】:

当我尝试接收大量数据时,它会被切断,我必须按 Enter 键才能获取其余数据。起初我能够增加一点,但它仍然不会收到所有的。如您所见,我增加了 conn.recv() 上的缓冲区,但它仍然没有获得所有数据。它在某个点将其切断。我必须在我的 raw_input 上按 enter 才能接收其余数据。无论如何我可以一次获取所有数据吗?这是代码。

port = 7777
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('0.0.0.0', port))
sock.listen(1)
print ("Listening on port: "+str(port))
while 1:
    conn, sock_addr = sock.accept()
    print "accepted connection from", sock_addr
    while 1:
        command = raw_input('shell> ')
        conn.send(command)
        data = conn.recv(8000)
        if not data: break
        print data,
    conn.close()

【问题讨论】:

【参考方案1】:

TCP/IP 是基于流的 协议,而不是基于消息的 协议。无法保证一个对等方的每个send() 调用都会导致另一个对等方接收到发送的确切数据的单个recv() 调用——它可能会收到数据碎片,由于数据包的原因,它会被分成多个recv() 调用碎片化。

您需要在 TCP 之上定义自己的基于消息的协议,以区分消息边界。然后,要阅读一条消息,您可以继续调用recv(),直到您阅读了整条消息或发生错误。

发送消息的一种简单方法是在每条消息前面加上它的长度。然后读取消息,首先读取长度,然后读取那么多字节。您可以这样做:

def send_msg(sock, msg):
    # Prefix each message with a 4-byte length (network byte order)
    msg = struct.pack('>I', len(msg)) + msg
    sock.sendall(msg)

def recv_msg(sock):
    # Read message length and unpack it into an integer
    raw_msglen = recvall(sock, 4)
    if not raw_msglen:
        return None
    msglen = struct.unpack('>I', raw_msglen)[0]
    # Read the message data
    return recvall(sock, msglen)

def recvall(sock, n):
    # Helper function to recv n bytes or return None if EOF is hit
    data = bytearray()
    while len(data) < n:
        packet = sock.recv(n - len(data))
        if not packet:
            return None
        data.extend(packet)
    return data

然后你可以使用send_msgrecv_msg函数来发送和接收整个消息,并且它们不会在网络级别拆分或合并数据包的任何问题。

【讨论】:

我不确定我是否完全理解这一点。我了解应该发生的事情,但我似乎无法理解。我收到Exception: Socket EOF trying to recv 4 bytes 我正在使用以下内容:pastebin.com/raw.php?i=AvdN5RyW @user2585107:尝试更新版本,它使用return None,而不是在流结束时引发异常。 在将packet 添加到datarecv() 之前不应该是.decode()ed 可以同时接收字节和字符串吗? 谢谢兄弟,我的头发快秃了:D data += packet 行会使接收大型邮件变得非常缓慢。最好先使用data = bytearray(),然后再使用data.extend(packet)【参考方案2】:

您可以将其用作:data = recvall(sock)

def recvall(sock):
    BUFF_SIZE = 4096 # 4 KiB
    data = b''
    while True:
        part = sock.recv(BUFF_SIZE)
        data += part
        if len(part) < BUFF_SIZE:
            # either 0 or end of data
            break
    return data

【讨论】:

这适用于检测“文件结束”,但不适用于保持连接和检测消息结束。仅当对等方关闭其套接字部分或至少关闭一半时,才会到达“文件结尾”。 如果收到的字符串少于 4096 个字符,它将再次循环并使用sock.recv() 重新检查更多数据。这将挂起,因为没有更多数据进入。如果part 的长度小于RECV_BUFFER 的长度,那么代码可以安全地跳出循环。 @JadedTuna,似乎没有修复。 “part = sock.recv(BUFF_SIZE)”这一行似乎是一个阻塞调用,因此一旦收到完整的消息,执行就会在这一行挂起。 这段代码应该被固定为好像 len(part) 这似乎错误地假设 TCP 套接字一端的一次发送对应于另一端发送的字节数的一次接收(参见例如 here 或 here)。因此,即使客户端使用一个send 发送正好4kb,服务器也可能在第一个recv 处获得第一个,例如1kb,这将导致while 中断。【参考方案3】:

接受的答案很好,但是对于大文件来说会很慢 - 字符串是一个不可变的类,这意味着每次使用 + 符号时都会创建更多对象,使用 list 作为堆栈结构会更多高效。

这应该会更好

while True: 
    chunk = s.recv(10000)
    if not chunk: 
        break
    fragments.append(chunk)

print "".join(fragments)

【讨论】:

这正是将我的大型二进制文件下载速度提高了大约 30-50 倍的原因...谢谢【参考方案4】:

大多数答案都描述了某种recvall() 方法。如果您在接收数据时的瓶颈是在for 循环中创建字节数组,我对在recvall() 方法中分配接收数据的三种方法进行了基准测试:

字节串法:

arr = b''
while len(arr) < msg_len:
    arr += sock.recv(max_msg_size)

列表方法:

fragments = []
while True: 
    chunk = sock.recv(max_msg_size)
    if not chunk: 
        break
    fragments.append(chunk)
arr = b''.join(fragments)

预分配bytearray方法:

arr = bytearray(msg_len)
pos = 0
while pos < msg_len:
    arr[pos:pos+max_msg_size] = sock.recv(max_msg_size)
    pos += max_msg_size

结果:

【讨论】:

【参考方案5】:

您可能需要多次调用 conn.recv() 才能接收所有数据。由于 TCP 流不维护帧边界(即它们仅作为原始字节流工作,而不是结构化消息流),因此不能保证一次调用它会带来所有发送的数据.

有关该问题的其他描述,请参阅 this answer。

请注意,这意味着您需要通过某种方式了解何时收到所有数据。如果发送方总是准确发送 8000 个字节,您可以计算到目前为止您已收到的字节数,然后从 8000 中减去该字节数以了解还剩多少要接收;如果数据是可变大小的,则可以使用各种其他方法,例如让发件人在发送消息之前发送一个字节数的标头,或者如果正在发送的是 ASCII 文本,您可以查找换行符或 NUL 字符。

【讨论】:

【参考方案6】:

免责声明:在极少数情况下您确实需要这样做。如果可能,请使用现有的应用层协议或定义您自己的协议,例如。在每条消息之前加上一个固定长度的整数,指示后面的数据长度,或者用 '\n' 字符终止每条消息。 (Adam Rosenfield 的 answer 很好地解释了这一点)

话虽如此,有一种方法可以读取套接字上的所有可用数据。但是,依赖这种通信方式是个坏主意,因为它会带来丢失数据的风险。使用此解决方案时要格外小心,并且只有在阅读以下说明后才能使用。

def recvall(sock):
    BUFF_SIZE = 4096
    data = bytearray()
    while True:
        packet = sock.recv(BUFF_SIZE)
        if not packet:  # Important!!
            break
        data.extend(packet)
    return data

现在if not packet: 行是绝对关键的! 这里的许多答案建议使用像 if len(packet) &lt; BUFF_SIZE: 这样的条件,该条件已损坏,很可能会导致您过早关闭连接并丢失数据。它错误地假设 TCP 套接字一端的一次发送对应于另一端发送的字节数的一次接收。它不是。 即使仍有数据等待接收,sock.recv(BUFF_SIZE) 很有可能会返回小于BUFF_SIZE 的块。 对here 和@987654323 的问题有很好的解释@。

使用上述解决方案如果连接的另一端写入数据的速度比您读取的速度慢,您仍然有丢失数据的风险。您可能只是简单地使用您的所有数据并在有更多数据时退出。有一些方法需要使用并发编程,但这是它自己的另一个主题。

【讨论】:

【参考方案7】:

使用生成器函数的变体(我认为它更像 Python):

def recvall(sock, buffer_size=4096):
    buf = sock.recv(buffer_size)
    while buf:
        yield buf
        if len(buf) < buffer_size: break
        buf = sock.recv(buffer_size)
# ...
with socket.create_connection((host, port)) as sock:
    sock.sendall(command)
    response = b''.join(recvall(sock))

【讨论】:

如果响应小于缓冲区大小,那似乎不起作用。 @Shadur,这很有趣,当你尝试它时会发生什么?你能分享代码来重现这个问题吗?正如所写,recvall 应该产生每个接收到的缓冲区的内容,无论大小如何,只要它不为空。 从添加的调试语句来看,它会吸入第一个块中的整个响应,然后挂起等待下一个块。下面的“chunck”答案有同样的问题,我最后通过第二次测试来修复它,看看 chunk 的长度是否小于缓冲区大小。我将测试这是否也可以解决您的解决方案。 -- 编辑:确实如此。【参考方案8】:

你可以使用序列化来做到这一点

from socket import *
from json import dumps, loads

def recvall(conn):
    data = ""
    while True:
    try:
        data = conn.recv(1024)
        return json.loads(data)
    except ValueError:
        continue

def sendall(conn):
    conn.sendall(json.dumps(data))

注意:如果您想使用上面的代码共享文件,您需要将其编码/解码为 base64

【讨论】:

【参考方案9】:

修改 Adam Rosenfield 的代码:

import sys


def send_msg(sock, msg):
    size_of_package = sys.getsizeof(msg)
    package = str(size_of_package)+":"+ msg #Create our package size,":",message
    sock.sendall(package)

def recv_msg(sock):
    try:
        header = sock.recv(2)#Magic, small number to begin with.
        while ":" not in header:
            header += sock.recv(2) #Keep looping, picking up two bytes each time

        size_of_package, separator, message_fragment = header.partition(":")
        message = sock.recv(int(size_of_package))
        full_message = message_fragment + message
        return full_message

    except OverflowError:
        return "OverflowError."
    except:
        print "Unexpected error:", sys.exc_info()[0]
        raise

不过,我强烈鼓励使用原始方法。

【讨论】:

【参考方案10】:

对于在您事先不知道数据包长度的情况下寻找答案的其他人。 这是一个简单的解决方案,一次读取 4096 个字节,并在收到少于 4096 个字节时停止。但是,在接收到的数据包的总长度正好是 4096 字节的情况下它不起作用 - 然后它会再次调用 recv() 并挂起。

def recvall(sock):
    data = b''
    bufsize = 4096
    while True:
        packet = sock.recv(bufsize)
        data += packet
        if len(packet) < bufsize:
            break
    return data

【讨论】:

【参考方案11】:

我认为这个问题已经得到了很好的回答,但我只是想添加一个使用 Python 3.8 和新赋值表达式(海象运算符)的方法,因为它在风格上很简单。

import socket

host = "127.0.0.1"
port = 31337
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host,port))
s.listen()
con, addr = s.accept()
msg_list = []

while (walrus_msg := con.recv(3)) != b'\r\n':
    msg_list.append(walrus_msg)

print(msg_list)

在这种情况下,从套接字接收 3 个字节并立即分配给walrus_msg。一旦套接字接收到b'\r\n',它就会中断循环。 walrus_msg 添加到 msg_list 并在循环中断后打印。此脚本是基本脚本,但经过测试,可与 telnet 会话一起使用。

注意:(walrus_msg := con.recv(3)) 周围的括号是必需的。如果没有这个,while walrus_msg := con.recv(3) != b'\r\n': 会将walrus_msg 评估为True,而不是套接字上的实际数据。

【讨论】:

【参考方案12】:

此代码在 32 次迭代中从套接字编程-python 中从服务器接收的缓冲区中读取 1024*32(=32768) 个字节:

jsonString = bytearray()

for _ in range(32):

    packet = clisocket.recv(1024)
    if not packet:
       break
    jsonString.extend(packet)

数据驻留在 jsonString 变量中

【讨论】:

以上是关于Python Socket 接收大量数据的主要内容,如果未能解决你的问题,请参考以下文章

C# 多线程 大量数据实时接收\解析\存储 问题

第 9 章 网络应用开发

在 Python 中通过套接字发送大量数据的正确方法是啥?

如何使用 Socket.io 实时更新大量数据

Web Socket 消息未全部接收

python用socket 接收数据问题?