深入学习twisted(基础)二

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入学习twisted(基础)二相关的知识,希望对你有一定的参考价值。

上一篇对twisted大致做了介绍以及一些基本的概念。

 

这一篇从一个python epoll/soket聊天程序开始。

我们要实现的很简单,就是利用epoll实现一个服务端/客户端间的聊天,服务端和客户端可以接收消息的同时,可以利用raw_input,在命令行输入。

我们将socket fd设置为非阻塞的,但是我们知道标准的输入输出io都是阻塞的,我们就开辟一个线程专门处理输入,在主线程进行消息的收发。这样当我们没有输入的时候,raw_input就会阻塞,由我们的主线程进行监听和处理。代码如下:

server.py

  1 #!/usr/local/bin/python
  2 
  3 import socket
  4 import select
  5 import errno
  6 import os
  7 import sys
  8 import time
  9 import threading
 10 import traceback
 11 import signal
 12 
 13 # global varables define here
 14 #
 15 IS_RUNNING = 0
 16 SERVER_INPUT = ""
 17 CONNTCTION_FD = 0
 18 
 19 # class define here
 20 #
 21 class ServerInput(threading.Thread):
 22     ‘‘‘ thread to handle write event ‘‘‘
 23     def __init__(self):
 24         threading.Thread.__init__(self)
 25     def run(self):
 26         global SERVER_INPUT
 27         global CONNECTION_FD
 28         global IS_RUNNING
 29         IS_RUNNING = 1
 30         while IS_RUNNING:
 31             SERVER_INPUT = raw_input("(192.168.56.101): ")
 32             content = SERVER_INPUT
 33             if len(content) > 0 :
 34                 send_len = 0
 35                 while True:
 36                     try:
 37                         global connections
 38                         send_len += CONNECTION_FD.send(content)
 39                     except Exception, e:
 40                         print "error happen"
 41                         traceback.print_exc()
 42                         print e
 43                     else:
 44                         if(send_len == len(content)):
 45                             break
 46                         content = ""
 47                         SERVER_INPUT = ""
 48     def stop(self):
 49         global IS_RUNNING
 50         IS_RUNNING = 0
 51 
 52 # globla function here
 53 def signal_handle(signum, frame):
 54     print "detect a signal" + str(signum)
 55     global IS_RUNNING
 56     IS_RUNNING = 0 
 57     server_input_thread.stop()
 58 
 59 
 60 # main function
 61 #
 62 if __name__ == "__main__":
 63     server_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM,0)
 64     server_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 65     server_fd.bind(("0.0.0.0", 2003))
 66     server_fd.listen(10)
 67     epoll_fd = select.epoll()
 68     epoll_fd.register(server_fd.fileno(), select.EPOLLIN|select.EPOLLOUT)
 69     connections = {}
 70     addresses = {}
 71     datalist = {}
 72     client_addr = ""
 73     server_content = ""
 74 
 75     signal.signal(signal.SIGINT, signal_handle)
 76     # a new thread to handle output event
 77     #
 78     server_input_thread = ServerInput() 
 79     try:
 80         server_input_thread.start()
 81     except Exception, e:
 82         print e
 83     else:
 84         IS_RUNNING = 1
 85 
 86     # just use EPOLLIN to listen read event
 87     #
 88     while IS_RUNNING:
 89         epoll_list = epoll_fd.poll()
 90         for fd, events in epoll_list:
 91             if fd == server_fd.fileno():
 92                 conn, addr = server_fd.accept()
 93                 conn.setblocking(0)
 94                 epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)
 95                 connections[conn.fileno()] = conn
 96                 addresses[conn.fileno()] = addr
 97                 client_addr = addr
 98                 print "    Welcome our new guest: " + str(client_addr)
 99                 CONNECTION_FD = conn
100             elif select.EPOLLIN & events:
101                 datas = ‘‘
102                 while True:
103                     try:
104                         data = connections[fd].recv(10)
105                         if not data and not datas:
106                             epoll_fd.unregister(fd)
107                             connections[fd].close()
108                             break
109                         else:
110                             datas += data
111                     except socket.error, msg:
112                         if msg.errno == errno.EAGAIN:
113                             datalist[fd] = datas
114                             break
115                         else:    
116                             print "connecting error"
117                             epoll_fd.unregister(fd)
118                             connections[fd].close()
119                             break
120                     print "\n" + str(client_addr) + " : " + datas
121             elif select.EPOLLHUP & events:
122                 print "unknown connect"
123                 epoll_fd.unregister(fd)
124                 connections[fd].close()
125             else:
126                 continue
127     else:
128         for fd in connections:
129             epoll_fd.unregister(fd)
130             connections[fd].close() # close all connection

client.py

 1 #!/usr/local/bin/python
 2 
 3 import socket
 4 import time
 5 import threading
 6 import signal
 7 
 8 # global variables define here
 9 #
10 INPUT_CONTENT = ""
11 IS_RUNNING = 0
12 
13 # class define here
14 #
15 class MyinputThread(threading.Thread):
16     ‘‘‘ a new thread to handle user input ‘‘‘
17     def __init__(self):
18         threading.Thread.__init__(self)
19     def run(self):
20         global INPUT_CONTENT
21         global IS_RUNNING
22         IS_RUNNING = 1
23         while IS_RUNNING:        
24             print "\n"
25             INPUT_CONTENT = raw_input("(192.168.56.102):")
26     def stop(self):
27         global IS_RUNNING
28         IS_RUNNING = 0
29 
30 # global function define here
31 #
32 def signal_handle(signum, frame):
33     global IS_RUNNING
34     IS_RUNNING = 0
35     myInputThread.stop()
36 
37 # main function
38 #
39 if __name__ == "__main__":
40     IS_RUNNING = 1
41     signal.signal(signal.SIGINT, signal_handle)
42 
43     connFd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
44     connFd.connect(("192.168.56.101", 2003))
45     connFd.settimeout(0.1)
46 
47     myInputThread = MyinputThread()
48     myInputThread.start()
49 
50     while IS_RUNNING:
51         client_content = INPUT_CONTENT
52         if len(client_content) > 0 and connFd.send(client_content) != len(client_content):
53             print "connection lost"
54             break
55         INPUT_CONTENT = ""
56         try:
57             readData = connFd.recv(1024)
58         except socket.timeout,e:
59             err = e.args[0]
60             if err == time out:
61                     print "time out"
62             readData = "time out"
63         else:
64             print "\n" + "(192.168.56.101): " + readData
65     connFd.close()

运行的结果如下:

server side:
1
(192.168.1.103): Welcome our new guest: (192.168.1.104, 35226) 2 3 (192.168.1.103): hello,thiss server 4 (192.168.1.103): 5 (192.168.1.104, 35226) : hello,im 6 7 (192.168.1.104, 35226) : hello,im client 8 9 (192.168.1.103): bye
client side:
1
(192.168.1.104): 2 (192.168.56.101): hello,thiss server 3 (192.168.1.104):hello,im client 4 (192.168.1.104): 5 (192.168.56.101): bye 6 (192.168.1.104):bye

 

以上代码仅仅作为例子,处理的也仅仅是one server,one client的情况。 python epoll怎么使用就不再赘述,通过这个例子我们看到一个典型的python epoll程序,server端需要创建socket,绑定,监听,注册epoll,轮询处理读写事件。客户端需要创建socket,连接,处理读写io。 twisted框架是基于select/epoll的,它也是同样的一套流程,被封装好,方便我们调用。我在后面的twisted源码中会涉及到具体的内容。

从上述代码还可以看到:

  • 如果我们要处理一些特殊的需求,比如我们要处理标准输入输出,我们还需要自己建立线程处理(当然还有别的办法),这种方式既原始又漏洞百出;
  • 上面的例子时在linux下运行的,如果换到windows下我们还要把epoll换成select,换到unix下我们还要用poll;
  • 上面得代码并不完美,你想要开始或者停止,仅仅只靠处理ctrl+c 信号中断简直是太简陋了;
  • 上面得代码并没有处理大规模并发,实际用它们会面临各种各样的问题。
  • 上面得代码并不灵活,我们有新的需求,比如新增一个协议,解析一种报文,我们只能缝缝补补。

twisted是跨平台的;如果我们要处理标准输入输出,其中有封装好的stdio.StandardIO可以很方便的使用,除了twisted已经封装好的tcp,udp,ssl,http,ftp等协议,我们可以定义自己的protocal,只需要简单的调用就可以实现。我们可以把服务端做成一个service在后台执行,等等。

下一篇详细分析一下select/epoll的源码,这是网络编程的基础,而且无论是面试问它们的区别还是学习,它们都很重要,相信看过源码分析后,所有的异同都会了如指掌。

以上是关于深入学习twisted(基础)二的主要内容,如果未能解决你的问题,请参考以下文章

Python爬虫编程思想(27):Twisted框架基础

爬虫日记(93):Twisted的设计模型

《深入浅出图神经网络》GNN原理解析☄学习笔记神经网络基础

深入浅出图神经网络|GNN原理解析☄学习笔记神经网络基础

scrapy爬虫框架

Scrapy基础