Python eventlet

Posted 安然_随心

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python eventlet相关的知识,希望对你有一定的参考价值。

原文地址:
http://www.cnblogs.com/Security-Darren/p/4168233.html
http://www.2cto.com/net/201610/553556.html
http://blog.csdn.net/gaoxingnengjisuan/article/details/12913275

eventlet是python库函数,一个是处理和网络相关的,另一个可以通过协程实现并发。所谓并发,就是开启了多个greenthread(绿色线程),并且对这些greenthread进行管理,以实现非阻塞式的I/O。eventlet为了实现“绿色线程”,竟然对python的和网络相关的几个标准库函数进行了改写,并且可以以补丁(patch)的方式导入到程序中,因为python的库函数只支持普通的线程,而不支持协程,eventlet称之为“绿化”。

本文首先讲述GreenLet,GreenThread,然后说明eventlet中几个常用的类

GreenLet

greenlet是一个称为协程(coroutine)的东西,有下面几个特点。
1.每个协程都有自己的私有stack及局部变量;
2.同一时间内只有一个协程在运行,故无须对某些共享变量加锁;
3.协程之间的执行顺序,完成由程序来控制;
总之,协程就是运行在一个线程内的伪并发方式,最终只有一个协程在运行,然后程序来控制执行的顺序。可以看下面的例子来理解上面的意思。

import greenlet  

def test1(n):  
    print "test1:",n  
    gr2.switch(32)  
    print "test1: over"  

def test2(n):  
    print "test2:",n  
    gr1.switch(23)  
    print "test2: over"  

greenlet = greenlet.greenlet  
current = greenlet.getcurrent()  
gr1 = greenlet(test1,current)  
gr2 = greenlet(test2,current)  
gr1.switch(2)  

这段程序的执行结果如下:

test1: 2  
test2: 32  
test1: over  

整个程序的过程很直白,首先创建两个协程,创建的过程传入了要执行的函数和父greenlet(在前面给出的三个链接中有详细介绍),然后调用其中的一个协程的switch函数,并且传递参数进去,就开始执行test1,然后到了gr2.switch(32)语句,切换到test2函数来,最后又切换回去。最终test1运行结束,回到父greenlet中,执行结束。这个过程就是始终只有一个协程在运行,函数的执行流由程序自己来控制。这个过程在上面的链接中描述的更加具体。

参考资料 :
http://greenlet.readthedocs.org/en/latest/

GreenThread

那么在eventlet中对greenlet进行了简单的封装,就成了GreenThread,并且上面的程序还会引来一个问题,如果我们想要写一个协程,那到底该如何来控制函数的执行过程了,如果协程多了,控制岂不是很复杂了。带着这个问题来看eventlet的实现。

在eventLet中,GreenThread的调度是通过hub来实现的。hub是EventLet的时间循环,用来调度IO事件和GreenThread。
Hub有不同版本的实现,如epolls,poll,selects,pyevent版,不同版本的性能特性不同,(具体见http://eventlet.net/doc/hubs.html#eventlet.hubs.use_hub)。可使用eventlet.hubs.use_hub(hub=None)来配置使用的hub,其中,传入的参数为选用的hub的名字。

hub是线程局部类实例,即每个线程都有一个hub实现,用来管理该线程中的co-rountine的调度。你可以为每个线程选用合适的hub(这样很麻烦,通常是:你为主线程选用一个hub,其他线程使用默认的hub)

hub工作原理:hub自己有一个GreenLet,作为MainLoop.当某个运行中的coroutine需要IO操作时,则该co-routine在hub中注册一个监听器(hub知道何时该co-routine wake up)。如果有其他可运行的co-routine,则hub MainLoop 切换到这些co-routine(via get_hub().switch())。如果这些co-routime完成了或是也需要IO,则由切换到hub main Loop中。这样,保证每个co-rountine都能被调度。

hub MAINLOOP 在第一个IO操作时启动。这种lazy模式使得不需要显示的调用dispatch函数。

eventlet官方的文档(http://eventlet.net/doc/basic_usage.html)

GreenPool

该模块提供对 greenthread 池的支持。

  greenthread 池提供了一定数量的备用 greenthread ,有效限制了孵化 greenthread 过多导致的内存不足,当池子中没有足够的空闲 greenthread 时,孵化过程被暂停,只有当先前工作中的 greenthread 完成当前工作,才能为下一个任务做孵化准备。

class eventlet.greenpool.GreenPool(size=1000)
类主要方法:
  1. free()

  2. imap(function, *iterables)

  3. resize(new_size)

  4. running()

  5. spawn(function, *args, **kwargs)

  6. spawn_n(function, *args, **kwargs)

  7. starmap(function, iterable)

  8. waitall()

  9. waiting()
1. free() :返回当前对象中可用的greenthreads。

  如果为 0 或更少,那么 spawn() 和 spawn_n() 将会阻塞调用 greenthread 直到有新的可用的 greenthread 为止。

  至于为什么此处可能返回负值,请查看3. resize()
  
2. imap(function, *iterables) :效果等同于 itertools.imap() ,在并发和内存使用上等同于 starmap() 。

  例如,可以非常方便地对文件做一些操作:
  

def worker(line):
    return do_something(line)
pool = GreenPool()
for result in pool.imap(worker, open("filename", 'r')):
    print(result)
  1. resize(new_size) :改变当前允许同时工作的 greenthreads 最大数量

如果当前有多于 new_size 的 greenthreads 处于工作中,它们可以完成自己的执行,只不过此时不许任何的新 greenthreads 被分配。只有当足够数量的 greenthreads 完成自己的工作,然后工作中的 greenthreads 总数低于 new_size 时,新的 greenthreads 才能被分配。在此之前,free() 的返回值将会使负的。

  1. running() :返回当前池子中正在执行任务的 greenthreads 。
  2. spawn(function, *args, **kwargs) : 从当前的池子中孵化一个可用的greenthread,在这个 greenthread 中执行 function ,参数 *args, **kwargs 为传给 function 的参数。返回一个 GreenThread 对象,这个对象执行着 function ,可以通过该 GreenThread 对象获取 function 的返回值。
      如果当前池子中没有空余的 greenthread ,那么该方法阻塞直到有新的可用的 greenthreads 被释放。
      该函数可以重用, function 可以调用同一个 GreenPool 对象的 spawn 方法,不用担心死锁。

  3. spawn_n(function, *args, **kwargs) :创建一个 greenthread 来运行 function,效果等同于 spawn()。 只不过这个函数返回 None,即丢弃 function 的返回值。

  4. starmap(function, iterable) :等同于 itertools.starmap(),除了对于可迭代对象中的每一个元素,都会在一个 greenthread 里面执行 func 。 并发的上限由池子的容量限制。在实际的操作中, starmap() 消耗的内存与池子的容量成比例,从而格外适合遍历特别长的输入列表。

  5. waitall() ; 等待池子中的所有 greenthreads 完成工作。
  6. waiting() :返回当前等待孵化的 greenthreads 数。

GreenPile

class eventlet.greenpool.GreenPile(size_or_pool=1000)
在它内部维护了一个GreenPool对象和一个Queue对象。这个GreenPool对象可以是从外部传递进来的,也可以是在类内部创建的,GreenPool对象主要是用来创建绿色线程的,即在GreenPile内部调用了GreenPool.spawn()方法。而Queue对象则是用来保spawn()方法的返回值的,即Queue中保存的是GreenThread对象。并且它还实现了next()方法,也就意味着GreenPile对象具有了迭代器的性质。所以如果我们要对绿色线程的返回值进行操作的话,用这个类是再好不过的了。

Event

class eventlet.event.Event
Event:在同一个线程中的协程通信的机制(an arbitrary number of coroutines can wait for one event from another)。Event和Queue类型,可看成只能容纳一条消息的Queue,但是和Queue不同的是:
(1)调用send() 不会取消对当前greenthread的调度;
(2)对一个event实例,只能调用一次send函数。如果你需要发送两条消息,则需要两个event对象。
一个使用实例:

from eventlet import event
import eventlet
evt = event.Event()
def baz(b):
     evt.send(b + 1)
 _ = eventlet.spawn_n(baz, 3)
 evt.wait()  

函数说明:
1. ready() :判断一个Event对象有没有发出过事件,如果调用 wait() 会立即返回一个事件结果,那么此处就返回真值。该方法用来避免需要等待一段时间才会发生的事件。例如,你可以将一堆事件放到一个Python列表中,然后重复地遍历他们,这是就可以调用 ready() 直到其中的一个事件返回True,然后就可以立刻调用 wait() 来获取它了。
2. send(result=None, exc=None) :发送一个事件,可唤醒其他wait的协程。调用该方法的协程会立刻返回(不会阻塞)

>>> from eventlet import event
>>> import eventlet
>>> evt = event.Event()
>>> def waiter():
...     print('about to wait')
...     result = evt.wait()
...     print('waited for 0'.format(result))
>>> _ = eventlet.spawn(waiter)
>>> eventlet.sleep(0)
about to wait
>>> evt.send('a')
>>> eventlet.sleep(0)
waited for a

需要注意的是:不同对同一个event实例调用多次send函数,这会导致抛出异常

>>> evt.send('whoops')
Traceback (most recent call last):
...
AssertionError: Trying to re-send() an already-triggered event.
  1. send_exception(*args) :作用类似于 send() 方法,只不过向等待者发送的是一个异常。如果单个异常对象被传进来,它会在 wait() 方法被调用的时候重新抛出,生成一个新的堆栈轨迹。
>>> from eventlet import event
>>> evt = event.Event()
>>> evt.send_exception(RuntimeError())
>>> evt.wait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "eventlet/event.py", line 120, in wait
    current.throw(*self._exc)
RuntimeError

 如果需要完整地保留堆栈轨迹,必须传入整个 sys.exc_info() 元组。
 

>>> import sys
>>> evt = event.Event()
>>> try:
...     raise RuntimeError()
... except RuntimeError:
...     evt.send_exception(*sys.exc_info())
...
>>> evt.wait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "eventlet/event.py", line 120, in wait
    current.throw(*self._exc)
  File "<stdin>", line 2, in <module>
RuntimeError

 此时会在Event对象内部存储一个 traceback 对象,这可能会导致循环引用。详见 sys.exc_info() 的文档。

  1. wait() :等待直到另一个协程调用 send() 。返回其他协程传递给 send() 方法的值。
>>> from eventlet import event
>>> import eventlet
>>> evt = event.Event()
>>> def wait_on():
...    retval = evt.wait()
...    print("waited for 0".format(retval))
>>> _ = eventlet.spawn(wait_on)
>>> evt.send('result')
>>> eventlet.sleep(0)
waited for result

参考:http://eventlet.net/doc/modules/event.html
http://www.cnblogs.com/Security-Darren/p/4168116.html

举例

1、网络爬虫,用一个绿色线程池获取urls的信息。用到绿色线程池和imap()函数。

import eventlet
from eventlet.green import urllib2


urls = [
    "http://www.google.com/intl/en_ALL/images/logo.gif",
    "https://wiki.secondlife.com/w/images/secondlife.jpg",
    "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif",
]


def fetch(url):
    return urllib2.urlopen(url).read()


pool = eventlet.GreenPool(200) #创建绿色线程池对象,可以指定数量

for body in pool.imap(fetch, urls): #协程根据指定要执行的函数依次执行获得url的信息
    print("got body", len(body))

2、socket服务器

import eventlet

def handle(fd):
    print "client connected"
    while True:   #典型的读取文件的操作
        # pass through every non-eof line  
        x = fd.readline()
        if not x: break
        fd.write(x)
        fd.flush()         #将文件内容刷新到硬盘中
        print "echoed", x,
    print "client disconnected"


print "server socket listening on port 8000"   #服务器监听程序,响应客户端的请求  
server = eventlet.listen(('127.0.0.1', 8000))  # (IP地址, 端口) 元祖的形式表示
pool = eventlet.GreenPool(200)    #绿色线程池,允许并行访问

while True:
        new_sock, address = server.accept() #阻塞,等待客户端连接   返回socket对象和客户端的  IP地址和端口号
        print "accepted", address
        pool.spawn_n(handle, new_sock.makefile('rw'))  #创建新的绿色线程然后去执行

3、使用GreenPile的例子

import eventlet
from eventlet.green import socket

def geturl(url):
    c = socket.socket()
    ip = socket.gethostbyname(url)
    c.connect((ip, 80))
    print '%s connected' % url
    c.sendall('GET /\\r\\n\\r\\n')
    return c.recv(1024)

urls = ['www.google.com', 'www.yandex.ru', 'www.python.org']
pile = eventlet.GreenPile()
for x in urls:
    pile.spawn(geturl, x)

# note that the pile acts as a collection of return values from the functions  
# if any exceptions are raised by the function they'll get raised here  
for url, result in zip(urls, pile):
    print '%s: %s' % (url, repr(result)[:50])

以上是关于Python eventlet的主要内容,如果未能解决你的问题,请参考以下文章

Python/Erlang:Twisted、Stackless、Greenlet、Eventlet、协程有啥区别?它们是不是类似于 Erlang 进程?

flask-SocketIO 和 eventlet 有问题

python3中的真值测试

openstack中eventlet使用

Python入门之真值表达式

python构建一个动态增长的真值表