Python SocketServer:发送到多个客户端?
Posted
技术标签:
【中文标题】Python SocketServer:发送到多个客户端?【英文标题】:Python SocketServer: sending to multiple clients? 【发布时间】:2011-04-09 20:47:36 【问题描述】:好吧,我正在尝试构建一个带有 SocketServer 的小型 python prgram,它应该将接收到的消息发送到所有连接的客户端。我被卡住了,我不知道如何在服务器端存储客户端,也不知道如何发送给多个客户端。哦,我的程序每次连接超过 1 个客户端时都会失败,并且每次客户端发送超过一条消息时...
到目前为止,这是我的代码:
print str(self.client_address[0])+' connected.'
def handle(self):
new=1
for client in clients:
if client==self.request:
new=0
if new==1:
clients.append(self.request)
for client in clients:
data=self.request.recv(1024)
client.send(data)
class Host:
def __init__(self):
self.address = ('localhost', 0)
self.server = SocketServer.TCPServer(self.address, EchoRequestHandler)
ip, port = self.server.server_address
self.t = threading.Thread(target=self.server.serve_forever)
self.t.setDaemon(True)
self.t.start()
print ''
print 'Hosted with IP: '+ip+' and port: '+str(port)+'. Clients can now connect.'
print ''
def close(self):
self.server.socket.close()
class Client:
name=''
ip=''
port=0
def __init__(self,ip,port,name):
self.name=name
self.hostIp=ip
self.hostPort=port
self.s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((self.hostIp, self.hostPort))
def reco(self):
self.s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((self.hostIp, self.hostPort))
def nick(self,newName):
self.name=newName
def send(self,message):
message=self.name+' : '+message
len_sent=self.s.send(message)
response=self.s.recv(len_sent)
print response
self.reco()
def close(self):
self.s.close()
显然我不知道自己在做什么,所以任何帮助都会很棒。 提前致谢!
编辑:我在 Windows Vista 上使用 Python 2.7。
【问题讨论】:
我在这里解决了类似的问题:server/client code 【参考方案1】:您想在此处查看asyncore。您在客户端调用的套接字操作是阻塞的(在接收到某些数据或发生超时之前不返回),这使得很难监听从主机发送的消息并让客户端实例将数据排入队列以发送同时。 asyncore 应该从你那里抽象出基于超时的轮询循环。
这是一个代码“示例”——如果有任何不清楚的地方请告诉我:
from __future__ import print_function
import asyncore
import collections
import logging
import socket
MAX_MESSAGE_LENGTH = 1024
class RemoteClient(asyncore.dispatcher):
"""Wraps a remote client socket."""
def __init__(self, host, socket, address):
asyncore.dispatcher.__init__(self, socket)
self.host = host
self.outbox = collections.deque()
def say(self, message):
self.outbox.append(message)
def handle_read(self):
client_message = self.recv(MAX_MESSAGE_LENGTH)
self.host.broadcast(client_message)
def handle_write(self):
if not self.outbox:
return
message = self.outbox.popleft()
if len(message) > MAX_MESSAGE_LENGTH:
raise ValueError('Message too long')
self.send(message)
class Host(asyncore.dispatcher):
log = logging.getLogger('Host')
def __init__(self, address=('localhost', 0)):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind(address)
self.listen(1)
self.remote_clients = []
def handle_accept(self):
socket, addr = self.accept() # For the remote client.
self.log.info('Accepted client at %s', addr)
self.remote_clients.append(RemoteClient(self, socket, addr))
def handle_read(self):
self.log.info('Received message: %s', self.read())
def broadcast(self, message):
self.log.info('Broadcasting message: %s', message)
for remote_client in self.remote_clients:
remote_client.say(message)
class Client(asyncore.dispatcher):
def __init__(self, host_address, name):
asyncore.dispatcher.__init__(self)
self.log = logging.getLogger('Client (%7s)' % name)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.name = name
self.log.info('Connecting to host at %s', host_address)
self.connect(host_address)
self.outbox = collections.deque()
def say(self, message):
self.outbox.append(message)
self.log.info('Enqueued message: %s', message)
def handle_write(self):
if not self.outbox:
return
message = self.outbox.popleft()
if len(message) > MAX_MESSAGE_LENGTH:
raise ValueError('Message too long')
self.send(message)
def handle_read(self):
message = self.recv(MAX_MESSAGE_LENGTH)
self.log.info('Received message: %s', message)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
logging.info('Creating host')
host = Host()
logging.info('Creating clients')
alice = Client(host.getsockname(), 'Alice')
bob = Client(host.getsockname(), 'Bob')
alice.say('Hello, everybody!')
logging.info('Looping')
asyncore.loop()
这会导致以下输出:
INFO:root:Creating host
INFO:root:Creating clients
INFO:Client ( Alice):Connecting to host at ('127.0.0.1', 51117)
INFO:Client ( Bob):Connecting to host at ('127.0.0.1', 51117)
INFO:Client ( Alice):Enqueued message: Hello, everybody!
INFO:root:Looping
INFO:Host:Accepted client at ('127.0.0.1', 55628)
INFO:Host:Accepted client at ('127.0.0.1', 55629)
INFO:Host:Broadcasting message: Hello, everybody!
INFO:Client ( Alice):Received message: Hello, everybody!
INFO:Client ( Bob):Received message: Hello, everybody!
【讨论】:
谢谢,看起来像我要找的东西!不幸的是,我没有设法让它在 main 之外工作:我在 Host.__init__() 的末尾添加了 asyncore.loop(),并且我的主机对象接受客户端连接,但它没有对发送的消息没有反应... @Alex: asyncore.loop() 永远运行!通过调用它实际上是在说,“我已经完成了对程序的控制,将事情交给异步循环,以便它可以在其余时间处理发送/接收。”请注意我在调用 asyncore.loop() 之前如何设置所有内容。你想通过移动它来做什么? 好吧,我不想只运行一些明确的连接,而是让主机运行,并且客户端能够随时连接/发送消息。感谢您的宝贵时间! @Alex:如果您运行多个 Python 解释器实例,您可以轻松地做到这一点。 Main 仍将运行asyncore.loop()
(这就是调度程序真正做 的事情),但您可以将程序分解为 host.py 和 client.py,在其中将地址传递给客户端主机通过sys.argv
。
一个更完整的解决方案可能会尝试确保始终发送和接收完整的消息,而不是强制执行较小的消息大小并希望消息适合网络缓冲区等。【参考方案2】:
您可以使用socketserver
向所有连接的客户端广播消息。但是,该功能并未内置在代码中,需要通过扩展一些已经提供的类来实现。在以下示例中,这是使用 ThreadingTCPServer
和 StreamRequestHandler
类实现的。它们提供了构建的基础,但仍需要进行一些修改以允许您尝试完成。文档应该有助于解释每个函数、类和方法为了完成工作而尝试做什么。
服务器
#! /usr/bin/env python3
import argparse
import pickle
import queue
import select
import socket
import socketserver
def main():
"""Start a chat server and serve clients forever."""
parser = argparse.ArgumentParser(description='Execute a chat server demo.')
parser.add_argument('port', type=int, help='location where server listens')
arguments = parser.parse_args()
server_address = socket.gethostbyname(socket.gethostname()), arguments.port
server = CustomServer(server_address, CustomHandler)
server.serve_forever()
class CustomServer(socketserver.ThreadingTCPServer):
"""Provide server support for the management of connected clients."""
def __init__(self, server_address, request_handler_class):
"""Initialize the server and keep a set of registered clients."""
super().__init__(server_address, request_handler_class, True)
self.clients = set()
def add_client(self, client):
"""Register a client with the internal store of clients."""
self.clients.add(client)
def broadcast(self, source, data):
"""Resend data to all clients except for the data's source."""
for client in tuple(self.clients):
if client is not source:
client.schedule((source.name, data))
def remove_client(self, client):
"""Take a client off the register to disable broadcasts to it."""
self.clients.remove(client)
class CustomHandler(socketserver.StreamRequestHandler):
"""Allow forwarding of data to all other registered clients."""
def __init__(self, request, client_address, server):
"""Initialize the handler with a store for future date streams."""
self.buffer = queue.Queue()
super().__init__(request, client_address, server)
def setup(self):
"""Register self with the clients the server has available."""
super().setup()
self.server.add_client(self)
def handle(self):
"""Run a continuous message pump to broadcast all client data."""
try:
while True:
self.empty_buffers()
except (ConnectionResetError, EOFError):
pass
def empty_buffers(self):
"""Transfer data to other clients and write out all waiting data."""
if self.readable:
self.server.broadcast(self, pickle.load(self.rfile))
while not self.buffer.empty():
pickle.dump(self.buffer.get_nowait(), self.wfile)
@property
def readable(self):
"""Check if the client's connection can be read without blocking."""
return self.connection in select.select(
(self.connection,), (), (), 0.1)[0]
@property
def name(self):
"""Get the client's address to which the server is connected."""
return self.connection.getpeername()
def schedule(self, data):
"""Arrange for a data packet to be transmitted to the client."""
self.buffer.put_nowait(data)
def finish(self):
"""Remove the client's registration from the server before closing."""
self.server.remove_client(self)
super().finish()
if __name__ == '__main__':
main()
当然,您还需要一个可以与您的服务器通信并使用服务器所使用的相同协议的客户端。由于这是 Python,因此决定使用pickle
模块来促进服务器和客户端之间的数据传输。可以使用其他数据传输方法(例如 JSON、XML 等),但能够对数据进行腌制和解封足以很好地满足该程序的需求。文档再次包含在内,因此弄清楚发生了什么应该不会太难。请注意,服务器命令可能会中断用户数据输入。
客户
#! /usr/bin/env python3
import argparse
import cmd
import pickle
import socket
import threading
def main():
"""Connect a chat client to a server and process incoming commands."""
parser = argparse.ArgumentParser(description='Execute a chat client demo.')
parser.add_argument('host', type=str, help='name of server on the network')
parser.add_argument('port', type=int, help='location where server listens')
arguments = parser.parse_args()
client = User(socket.create_connection((arguments.host, arguments.port)))
client.start()
class User(cmd.Cmd, threading.Thread):
"""Provide a command interface for internal and external instructions."""
prompt = '>>> '
def __init__(self, connection):
"""Initialize the user interface for communicating with the server."""
cmd.Cmd.__init__(self)
threading.Thread.__init__(self)
self.connection = connection
self.reader = connection.makefile('rb', -1)
self.writer = connection.makefile('wb', 0)
self.handlers = dict(print=print, ping=self.ping)
def start(self):
"""Begin execution of processor thread and user command loop."""
super().start()
super().cmdloop()
self.cleanup()
def cleanup(self):
"""Close the connection and wait for the thread to terminate."""
self.writer.flush()
self.connection.shutdown(socket.SHUT_RDWR)
self.connection.close()
self.join()
def run(self):
"""Execute an automated message pump for client communications."""
try:
while True:
self.handle_server_command()
except (BrokenPipeError, ConnectionResetError):
pass
def handle_server_command(self):
"""Get an instruction from the server and execute it."""
source, (function, args, kwargs) = pickle.load(self.reader)
print('Host: Port: '.format(*source))
self.handlers[function](*args, **kwargs)
def preloop(self):
"""Announce to other clients that we are connecting."""
self.call('print', socket.gethostname(), 'just entered.')
def call(self, function, *args, **kwargs):
"""Arrange for a handler to be executed on all other clients."""
assert function in self.handlers, 'You must create a handler first!'
pickle.dump((function, args, kwargs), self.writer)
def do_say(self, arg):
"""Causes a message to appear to all other clients."""
self.call('print', arg)
def do_ping(self, arg):
"""Ask all clients to report their presence here."""
self.call('ping')
def ping(self):
"""Broadcast to all other clients that we are present."""
self.call('print', socket.gethostname(), 'is here.')
def do_exit(self, arg):
"""Disconnect from the server and close the client."""
return True
def postloop(self):
"""Make an announcement to other clients that we are leaving."""
self.call('print', socket.gethostname(), 'just exited.')
if __name__ == '__main__':
main()
【讨论】:
【参考方案3】:为什么要使用 SocketServer?一个简单的客户端无法满足您的需求?
import socket
HOST = ''
PORT = 8000
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((HOST, PORT))
sock.listen(5)
while True:
conn, addr = sock.accept()
print 'connecting to', addr
while True:
data = conn.recv(1024)
if not data:
break
conn.send(data)
【讨论】:
这不会阻止其他客户端连接吗?【参考方案4】:要同时接收多个客户端,您必须添加SocketServer.ForkingMixIn
或ThreadingMixIn
。
【讨论】:
以上是关于Python SocketServer:发送到多个客户端?的主要内容,如果未能解决你的问题,请参考以下文章
python-41-初识hmac与socketserver模块
自动化运维Python系列之IO多路复用SocketServer源码分析