Python上的多线程websocket服务器
Posted
技术标签:
【中文标题】Python上的多线程websocket服务器【英文标题】:Multi-threaded websocket server on Python 【发布时间】:2013-10-13 20:05:22 【问题描述】:请帮助我改进此代码:
import base64
import hashlib
import threading
import socket
class WebSocketServer:
def __init__(self, host, port, limit, **kwargs):
"""
Initialize websocket server.
:param host: Host name as IP address or text definition.
:param port: Port number, which server will listen.
:param limit: Limit of connections in queue.
:param kwargs: A dict of key/value pairs. It MAY contains:<br>
<b>onconnect</b> - function, called after client connected.
<b>handshake</b> - string, containing the handshake pattern.
<b>magic</b> - string, containing "magic" key, required for "handshake".
:type host: str
:type port: int
:type limit: int
:type kwargs: dict
"""
self.host = host
self.port = port
self.limit = limit
self.running = False
self.clients = []
self.args = kwargs
def start(self):
"""
Start websocket server.
"""
self.root = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.root.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.root.bind((self.host, self.port))
self.root.listen(self.limit)
self.running = True
while self.running:
client, address = self.root.accept()
if not self.running: break
self.handshake(client)
self.clients.append((client, address))
onconnect = self.args.get("onconnect")
if callable(onconnect): onconnect(self, client, address)
threading.Thread(target=self.loop, args=(client, address)).start()
self.root.close()
def stop(self):
"""
Stop websocket server.
"""
self.running = False
def handshake(self, client):
handshake = 'HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Accept: %s\r\n\r\n'
handshake = self.args.get('handshake', handshake)
magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
magic = self.args.get('magic', magic)
header = str(client.recv(1000))
try:
res = header.index("Sec-WebSocket-Key")
except ValueError:
return False
key = header[res + 19: res + 19 + 24]
key += magic
key = hashlib.sha1(key.encode())
key = base64.b64encode(key.digest())
client.send(bytes((handshake % str(key,'utf-8')), 'utf-8'))
return True
def loop(self, client, address):
"""
:type client: socket
"""
while True:
message = ''
m = client.recv(1)
while m != '':
message += m
m = client.recv(1)
fin, text = self.decodeFrame(message)
if not fin:
onmessage = self.args.get('onmessage')
if callable(onmessage): onmessage(self, client, text)
else:
self.clients.remove((client, address))
ondisconnect = self.args.get('ondisconnect')
if callable(ondisconnect): ondisconnect(self, client, address)
client.close()
break
def decodeFrame(self, data):
if (len(data) == 0) or (data is None):
return True, None
fin = not(data[0] & 1)
if fin:
return fin, None
masked = not(data[1] & 1)
plen = data[1] - (128 if masked else 0)
mask_start = 2
if plen == 126:
mask_start = 4
plen = int.from_bytes(data[2:4], byteorder='sys.byteorder')
elif plen == 127:
mask_start = 10
plen = int.from_bytes(data[2:10], byteorder='sys.byteorder')
mask = data[mask_start:mask_start+4]
data = data[mask_start+4:mask_start+4+plen]
decoded = []
i = 0
while i < len(data):
decoded.append(data[i] ^ mask[i%4])
i+=1
text = str(bytearray(decoded), "utf-8")
return fin, text
def sendto(self, client, data, **kwargs):
"""
Send <b>data</b> to <b>client</b>. <b>data</b> can be of type <i>str</i>, <i>bytes</i>, <i>bytearray</i>, <i>int</i>.
:param client: Client socket for data exchange.
:param data: Data, which will be sent to the client via <i>socket</i>.
:type client: socket
:type data: str|bytes|bytearray|int|float
"""
if type(data) == bytes or type(data) == bytearray:
frame = data
elif type(data) == str:
frame = bytes(data, kwargs.get('encoding', 'utf-8'))
elif type(data) == int or type(data) == float:
frame = bytes(str(data), kwargs.get('encoding', 'utf-8'))
else:
return None
framelen = len(frame)
head = bytes([0x81])
if framelen < 126:
head += bytes(int.to_bytes(framelen, 1, 'big'))
elif 126 <= framelen < 0x10000:
head += bytes(126)
head += bytes(int.to_bytes(framelen, 2, 'big'))
else:
head += bytes(127)
head += bytes(int.to_bytes(framelen, 8, 'big'))
client.send(head + frame)
它工作正常。 我希望服务器使用所有处理器内核来提高性能。并且此代码在大量连接中无效。这种情况下如何实现多线程解决方案?
抱歉我的英语不好。
【问题讨论】:
可能是code review site的问题。 你需要子处理,而不是线程。 你能发或写一个例子吗? 我认为算法是:以核心数创建子进程 => 在每个进程中执行 start 方法的代码。但我不明白,如何在进程之间正确发送数据。 是的,我可以写一个例子,但不,我不会。正如您已经了解的那样,不同进程之间的通信并不容易,坦率地说,这很复杂,但是 python 中有一些模块可以帮助减轻多处理带来的痛苦 (docs.python.org/2/library/multiprocessing.html)。 【参考方案1】:在 CPython 中,全局解释器锁或 GIL 是一个互斥锁, 防止多个本机线程在以下位置执行 Python 字节码 一次。
因此您的代码将无法正常工作。如果您想同时支持多个客户端,可以使用 processeses 代替线程(不在 Windows* 上)、twisted 或 asyncore。
如果您的选择是多处理,试试这个:
client.py:
import socket
def main():
s = socket.socket()
s.connect(("localhost", 5555))
while True:
data = raw_input("> ")
s.send(data)
if data == "quit":
break
s.close()
if __name__ == "__main__":
main()
server.py:
from multiprocessing import Process
from os import getpid
import socket
def receive(conn):
print "(%d) connected." % getpid()
while True:
data = conn.recv(1024)
if data:
if data == "quit":
break
else:
print "(%s) data" % getpid()
def main():
s = socket.socket()
s.bind(("localhost", 5555))
s.listen(1)
while True:
conn, address = s.accept()
print "%s:%d connected." % address
Process(target=receive, args=(conn,)).start()
s.close()
if __name__ == "__main__":
main()
*在 Windows 上,此代码在酸洗套接字时会引发错误:
File "C:\Python27\lib\pickle.py", line 880, in load_eof
raise EOFError
【讨论】:
线程在 Python 中为 I/O 绑定的任务工作正常 - 多个线程确实可以同时运行(GIL 是围绕大多数可能阻塞的 I/O 调用发布的)。而且,正如您链接到的文档所说,multiiprocessing
在 Windows 上也可以正常工作。
当我说“不在 Windows 上”时,我指的是套接字。尝试将套接字对象作为参数传递给新进程。
@cdonts,是的,我遇到了您描述的问题。在进程之间发送套接字是不可能的。我需要一些时间让它工作。当我完成时,我将在这里发布工作代码。伙计们,如果您有一些代码示例,请在此处发布。
感谢您提供的清晰示例。但是,在这种情况下,客户端的数量就是进程的数量。偷偷怀疑这不是正确的做法。稍后我会发布我的代码。以上是关于Python上的多线程websocket服务器的主要内容,如果未能解决你的问题,请参考以下文章