Python eventlet
Posted 安然_随心
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python eventlet相关的知识,希望对你有一定的参考价值。
原文地址:
http://www.cnblogs.com/Security-Darren/p/4168233.html
http://www.2cto.com/net/201610/553556.html
http://blog.csdn.net/gaoxingnengjisuan/article/details/12913275
eventlet是python库函数,一个是处理和网络相关的,另一个可以通过协程实现并发。所谓并发,就是开启了多个greenthread(绿色线程),并且对这些greenthread进行管理,以实现非阻塞式的I/O。eventlet为了实现“绿色线程”,竟然对python的和网络相关的几个标准库函数进行了改写,并且可以以补丁(patch)的方式导入到程序中,因为python的库函数只支持普通的线程,而不支持协程,eventlet称之为“绿化”。
本文首先讲述GreenLet,GreenThread,然后说明eventlet中几个常用的类
GreenLet
greenlet是一个称为协程(coroutine)的东西,有下面几个特点。
1.每个协程都有自己的私有stack及局部变量;
2.同一时间内只有一个协程在运行,故无须对某些共享变量加锁;
3.协程之间的执行顺序,完成由程序来控制;
总之,协程就是运行在一个线程内的伪并发方式,最终只有一个协程在运行,然后程序来控制执行的顺序。可以看下面的例子来理解上面的意思。
import greenlet
def test1(n):
print "test1:",n
gr2.switch(32)
print "test1: over"
def test2(n):
print "test2:",n
gr1.switch(23)
print "test2: over"
greenlet = greenlet.greenlet
current = greenlet.getcurrent()
gr1 = greenlet(test1,current)
gr2 = greenlet(test2,current)
gr1.switch(2)
这段程序的执行结果如下:
test1: 2
test2: 32
test1: over
整个程序的过程很直白,首先创建两个协程,创建的过程传入了要执行的函数和父greenlet(在前面给出的三个链接中有详细介绍),然后调用其中的一个协程的switch函数,并且传递参数进去,就开始执行test1,然后到了gr2.switch(32)语句,切换到test2函数来,最后又切换回去。最终test1运行结束,回到父greenlet中,执行结束。这个过程就是始终只有一个协程在运行,函数的执行流由程序自己来控制。这个过程在上面的链接中描述的更加具体。
参考资料 :
http://greenlet.readthedocs.org/en/latest/
GreenThread
那么在eventlet中对greenlet进行了简单的封装,就成了GreenThread,并且上面的程序还会引来一个问题,如果我们想要写一个协程,那到底该如何来控制函数的执行过程了,如果协程多了,控制岂不是很复杂了。带着这个问题来看eventlet的实现。
在eventLet中,GreenThread的调度是通过hub来实现的。hub是EventLet的时间循环,用来调度IO事件和GreenThread。
Hub有不同版本的实现,如epolls,poll,selects,pyevent版,不同版本的性能特性不同,(具体见http://eventlet.net/doc/hubs.html#eventlet.hubs.use_hub)。可使用eventlet.hubs.use_hub(hub=None)来配置使用的hub,其中,传入的参数为选用的hub的名字。
hub是线程局部类实例,即每个线程都有一个hub实现,用来管理该线程中的co-rountine的调度。你可以为每个线程选用合适的hub(这样很麻烦,通常是:你为主线程选用一个hub,其他线程使用默认的hub)
hub工作原理:hub自己有一个GreenLet,作为MainLoop.当某个运行中的coroutine需要IO操作时,则该co-routine在hub中注册一个监听器(hub知道何时该co-routine wake up)。如果有其他可运行的co-routine,则hub MainLoop 切换到这些co-routine(via get_hub().switch())。如果这些co-routime完成了或是也需要IO,则由切换到hub main Loop中。这样,保证每个co-rountine都能被调度。
hub MAINLOOP 在第一个IO操作时启动。这种lazy模式使得不需要显示的调用dispatch函数。
eventlet官方的文档(http://eventlet.net/doc/basic_usage.html)
GreenPool
该模块提供对 greenthread 池的支持。
greenthread 池提供了一定数量的备用 greenthread ,有效限制了孵化 greenthread 过多导致的内存不足,当池子中没有足够的空闲 greenthread 时,孵化过程被暂停,只有当先前工作中的 greenthread 完成当前工作,才能为下一个任务做孵化准备。
class eventlet.greenpool.GreenPool(size=1000)
类主要方法:
1. free()
2. imap(function, *iterables)
3. resize(new_size)
4. running()
5. spawn(function, *args, **kwargs)
6. spawn_n(function, *args, **kwargs)
7. starmap(function, iterable)
8. waitall()
9. waiting()
1. free() :返回当前对象中可用的greenthreads。
如果为 0 或更少,那么 spawn() 和 spawn_n() 将会阻塞调用 greenthread 直到有新的可用的 greenthread 为止。
至于为什么此处可能返回负值,请查看3. resize()
2. imap(function, *iterables) :效果等同于 itertools.imap() ,在并发和内存使用上等同于 starmap() 。
例如,可以非常方便地对文件做一些操作:
def worker(line):
return do_something(line)
pool = GreenPool()
for result in pool.imap(worker, open("filename", 'r')):
print(result)
- resize(new_size) :改变当前允许同时工作的 greenthreads 最大数量
如果当前有多于 new_size 的 greenthreads 处于工作中,它们可以完成自己的执行,只不过此时不许任何的新 greenthreads 被分配。只有当足够数量的 greenthreads 完成自己的工作,然后工作中的 greenthreads 总数低于 new_size 时,新的 greenthreads 才能被分配。在此之前,free() 的返回值将会使负的。
- running() :返回当前池子中正在执行任务的 greenthreads 。
spawn(function, *args, **kwargs) : 从当前的池子中孵化一个可用的greenthread,在这个 greenthread 中执行 function ,参数 *args, **kwargs 为传给 function 的参数。返回一个 GreenThread 对象,这个对象执行着 function ,可以通过该 GreenThread 对象获取 function 的返回值。
如果当前池子中没有空余的 greenthread ,那么该方法阻塞直到有新的可用的 greenthreads 被释放。
该函数可以重用, function 可以调用同一个 GreenPool 对象的 spawn 方法,不用担心死锁。spawn_n(function, *args, **kwargs) :创建一个 greenthread 来运行 function,效果等同于 spawn()。 只不过这个函数返回 None,即丢弃 function 的返回值。
starmap(function, iterable) :等同于 itertools.starmap(),除了对于可迭代对象中的每一个元素,都会在一个 greenthread 里面执行 func 。 并发的上限由池子的容量限制。在实际的操作中, starmap() 消耗的内存与池子的容量成比例,从而格外适合遍历特别长的输入列表。
- waitall() ; 等待池子中的所有 greenthreads 完成工作。
- waiting() :返回当前等待孵化的 greenthreads 数。
GreenPile
class eventlet.greenpool.GreenPile(size_or_pool=1000)
在它内部维护了一个GreenPool对象和一个Queue对象。这个GreenPool对象可以是从外部传递进来的,也可以是在类内部创建的,GreenPool对象主要是用来创建绿色线程的,即在GreenPile内部调用了GreenPool.spawn()方法。而Queue对象则是用来保spawn()方法的返回值的,即Queue中保存的是GreenThread对象。并且它还实现了next()方法,也就意味着GreenPile对象具有了迭代器的性质。所以如果我们要对绿色线程的返回值进行操作的话,用这个类是再好不过的了。
Event
class eventlet.event.Event
Event:在同一个线程中的协程通信的机制(an arbitrary number of coroutines can wait for one event from another)。Event和Queue类型,可看成只能容纳一条消息的Queue,但是和Queue不同的是:
(1)调用send() 不会取消对当前greenthread的调度;
(2)对一个event实例,只能调用一次send函数。如果你需要发送两条消息,则需要两个event对象。
一个使用实例:
from eventlet import event
import eventlet
evt = event.Event()
def baz(b):
evt.send(b + 1)
_ = eventlet.spawn_n(baz, 3)
evt.wait()
函数说明:
1. ready() :判断一个Event对象有没有发出过事件,如果调用 wait() 会立即返回一个事件结果,那么此处就返回真值。该方法用来避免需要等待一段时间才会发生的事件。例如,你可以将一堆事件放到一个Python列表中,然后重复地遍历他们,这是就可以调用 ready() 直到其中的一个事件返回True,然后就可以立刻调用 wait() 来获取它了。
2. send(result=None, exc=None) :发送一个事件,可唤醒其他wait的协程。调用该方法的协程会立刻返回(不会阻塞)
>>> from eventlet import event
>>> import eventlet
>>> evt = event.Event()
>>> def waiter():
... print('about to wait')
... result = evt.wait()
... print('waited for 0'.format(result))
>>> _ = eventlet.spawn(waiter)
>>> eventlet.sleep(0)
about to wait
>>> evt.send('a')
>>> eventlet.sleep(0)
waited for a
需要注意的是:不同对同一个event实例调用多次send函数,这会导致抛出异常
>>> evt.send('whoops')
Traceback (most recent call last):
...
AssertionError: Trying to re-send() an already-triggered event.
- send_exception(*args) :作用类似于 send() 方法,只不过向等待者发送的是一个异常。如果单个异常对象被传进来,它会在 wait() 方法被调用的时候重新抛出,生成一个新的堆栈轨迹。
>>> from eventlet import event
>>> evt = event.Event()
>>> evt.send_exception(RuntimeError())
>>> evt.wait()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "eventlet/event.py", line 120, in wait
current.throw(*self._exc)
RuntimeError
如果需要完整地保留堆栈轨迹,必须传入整个 sys.exc_info() 元组。
>>> import sys
>>> evt = event.Event()
>>> try:
... raise RuntimeError()
... except RuntimeError:
... evt.send_exception(*sys.exc_info())
...
>>> evt.wait()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "eventlet/event.py", line 120, in wait
current.throw(*self._exc)
File "<stdin>", line 2, in <module>
RuntimeError
此时会在Event对象内部存储一个 traceback 对象,这可能会导致循环引用。详见 sys.exc_info() 的文档。
- wait() :等待直到另一个协程调用 send() 。返回其他协程传递给 send() 方法的值。
>>> from eventlet import event
>>> import eventlet
>>> evt = event.Event()
>>> def wait_on():
... retval = evt.wait()
... print("waited for 0".format(retval))
>>> _ = eventlet.spawn(wait_on)
>>> evt.send('result')
>>> eventlet.sleep(0)
waited for result
参考:http://eventlet.net/doc/modules/event.html
http://www.cnblogs.com/Security-Darren/p/4168116.html
举例
1、网络爬虫,用一个绿色线程池获取urls的信息。用到绿色线程池和imap()函数。
import eventlet
from eventlet.green import urllib2
urls = [
"http://www.google.com/intl/en_ALL/images/logo.gif",
"https://wiki.secondlife.com/w/images/secondlife.jpg",
"http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif",
]
def fetch(url):
return urllib2.urlopen(url).read()
pool = eventlet.GreenPool(200) #创建绿色线程池对象,可以指定数量
for body in pool.imap(fetch, urls): #协程根据指定要执行的函数依次执行获得url的信息
print("got body", len(body))
2、socket服务器
import eventlet
def handle(fd):
print "client connected"
while True: #典型的读取文件的操作
# pass through every non-eof line
x = fd.readline()
if not x: break
fd.write(x)
fd.flush() #将文件内容刷新到硬盘中
print "echoed", x,
print "client disconnected"
print "server socket listening on port 8000" #服务器监听程序,响应客户端的请求
server = eventlet.listen(('127.0.0.1', 8000)) # (IP地址, 端口) 元祖的形式表示
pool = eventlet.GreenPool(200) #绿色线程池,允许并行访问
while True:
new_sock, address = server.accept() #阻塞,等待客户端连接 返回socket对象和客户端的 IP地址和端口号
print "accepted", address
pool.spawn_n(handle, new_sock.makefile('rw')) #创建新的绿色线程然后去执行
3、使用GreenPile的例子
import eventlet
from eventlet.green import socket
def geturl(url):
c = socket.socket()
ip = socket.gethostbyname(url)
c.connect((ip, 80))
print '%s connected' % url
c.sendall('GET /\\r\\n\\r\\n')
return c.recv(1024)
urls = ['www.google.com', 'www.yandex.ru', 'www.python.org']
pile = eventlet.GreenPile()
for x in urls:
pile.spawn(geturl, x)
# note that the pile acts as a collection of return values from the functions
# if any exceptions are raised by the function they'll get raised here
for url, result in zip(urls, pile):
print '%s: %s' % (url, repr(result)[:50])
以上是关于Python eventlet的主要内容,如果未能解决你的问题,请参考以下文章
Python/Erlang:Twisted、Stackless、Greenlet、Eventlet、协程有啥区别?它们是不是类似于 Erlang 进程?