Python 10 协程,异步IO,Paramiko

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 10 协程,异步IO,Paramiko相关的知识,希望对你有一定的参考价值。

本节内容

  Gevent协程

  异步IO

  Paramiko

 

携程

协程,又称为微线程,纤程(coroutine)。是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前的寄存器上下文和栈。因此,协程可以保存上一次调用时候的状态,每次过程重入时,就相当于进入上一次的状态,即上一次离开时所处在的逻辑流位置。

 

协程的好处:

  无需线程上下文切换的开销

  无需原子操作锁定及同步的开销

所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始运行,就一直运行到结束,中间不会有任何切换。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱或切割的。视作整体是原子性的核心。

  方便切换控制流,简化编程模型

  高并发+高扩展性+低成本:一个CPU支持上文的协程都不是问题。很适合用于做高并发处理。

缺点:

  无法利用多核资源:协程的本质是单线程,它不能同时将单个CPU的多个核用上,协程需要和进程配合才能运行在多CPU上,当然我们日常所编写的代码绝大部分都没有这个必要,除非是CPU密集型应用。

  进行阻塞操作:进行阻塞操作会阻塞掉整个程序

使用yield实现协程操作例子:

def consumer(name):
    print("开始吃包子。。。")
    while True:
        new_baozi = yield
        print("%s 正在吃包子| %s" % (name, new_baozi))

def producer():
    r1 = con1.__next__()
    r2 = con2.__next__()
    n = 0
    while n < 5:
        n += 1
        print("米其林大厨正在做包子", n)
        con1.send(n)
        con2.send(n)


if __name__ == "__main__":
    con1 = consumer("dandy")
    con2 = consumer("renee")
    p = producer()

楼上的列子,请问算不算协程?

我们先来给协程一个标准定义:

   必须在只有一个单线程里实现并发

     修改共享数据不需加锁

     用户程序里自己保存多个控制流的上下文栈

   一个协程遇到IO操作自动切换到其它协程

基于上面4点定义,我们刚刚yield实现的兵不能算是合格的线程,因为它有一点功能没有实现。

 

Greenlet

greenlet 是一个用C实现的协程模块,相比python自带的yield,它可以使你在任意函数之间随意切换,而不需要把这个函数先声明为generator

#!/user/bin/env python
# -*-coding: utf-8-*-
from greenlet import greenlet

def test():
    print(11)
    gr2.switch()
    print(55)
    gr2.switch()

def test_copy():
    print(22)
    gr1.switch()
    print(33)
gr1 = greenlet(test)
gr2 = greenlet(test_copy)
gr1.switch()

 感觉确实用着比generator还简单了呢,但是还没有解决一个问题,就是遇到IO操作自动切换。

 

Gevent

Gevent是一个第三方库,可以轻松通过gevent实现并发同步或者异步,在Gevent中用到的主要模式是Greenlet,它是以C扩展模块的形式介入python的轻量级协程。Greelet全部运行在主程序操作系统进程的内部,但它们被协作式调度。

import gevent

def func1():
    print("func1 beginning...")
    gevent.sleep(2)
    print("func1 again...")

def func2():
    print("func2 beginning...")
    gevent.sleep(1)
    print("func2 again")
def func3():
    print("func3 beginning")
    gevent.sleep(0)
    print("func3 again")

gevent.joinall([
    gevent.spawn(func1),
    gevent.spawn(func2),
    gevent.spawn(func3)
])

同步与异步的性能区别

下面我们用网络爬虫的一个小实例来讲一下同步与异步IO的性能区别

from urllib import request
import gevent,time
from gevent import monkey  # gevent不知道urllib进行IO操作所以需要monkey补丁。。
monkey.patch_all()  # 这句话的意思是把当前程序的所以IO单独做上标记

def f(url):
    print("GET: %s" % url)
    res = request.urlopen(url)
    data = res.read()
    print("%d bytes received from %s." % (len(data), url))

urls = ["https://www.python.org/",
        "https://www.yahoo.com/",
        "https://www.github.com/"]

time_start = time.time()
for url in urls:
    f(url)
print("同步cost:", time.time()-time_start)

async_time = time.time()
gevent.joinall([
    gevent.spawn(f, "https://www.python.org/"),
    gevent.spawn(f, "https://www.yahoo.com/"),
    gevent.spawn(f, "https://www.github.com/"),

])
print("异步cost:", time.time()-async_time)

真正执行一下,其实发现时间花费的差距还是很大的。这里爬网页的时候遇到IO就会自动切换任务。

附上一个正常的小程序实例:

import gevent
 
def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(0.5)
    print(‘Task %s done‘ % pid)
 
def synchronous():
    for i in range(1,10):
        task(i)
 
def asynchronous():
    threads = [gevent.spawn(task, i) for i in range(10)]
    gevent.joinall(threads)
 
print(‘Synchronous:‘)
synchronous()
 
print(‘Asynchronous:‘)
asynchronous()

现在我们通过代码实现gevent下的socket并发,摒弃之前的多线程实现并发,真正的用单线(协程)来实现并发。

server side

import socket
import time
import gevent
from gevent import socket,monkey
monkey.patch_all()

def server():
    s = socket.socket()
    s.bind(("localhost", 9999))
    s.listen(500)
    while True:
        con, addr = s.accept()
        gevent.spawn(handle_request, con)

def handle_request(con):
    try:
        while True:
            data = con.recv(1024)
            print("recv:", data)
            con.send(data)
            if not data:
                con.shutdown(socket.SHUT_WR)

    except Exception as e:
        print(e)
    finally:
        con.close()

if __name__ == "__main__":
    server()

Client side

import socket
host, port = "localhost", 9999
client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect((host, port))
while True:
    msg = bytes(input(">>>>>:").strip().encode())
    client.send(msg)
    data = client.recv(1024)

    print("client recv:",data)

client.close()

玩个大的!100个并发:

import socket, threading


def sock_conn():
    client = socket.socket()

    client.connect(("localhost", 9999))
    count = 0
    while count < 1:
        client.send(("hello %s" % count).encode())

        data = client.recv(1024)
        print("%s recv from server:" % threading.get_ident(), data.decode())
        count += 1
    client.close()
for i in range(100):
    t = threading.Thread(target=sock_conn)
    t.start()

 

论事件驱动与异步IO

n通常,我们写服务器处理模型的程序时,有以下几种模型:

(1)每收到一个请求,创建一个新的进程,来处理该请求

(2)每收到一个请求,创建一个新的线程,来处理该请求

(3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求

上面的几种方式,各有千秋

第一种方法,由于每有一个新的请求就要创建新的进行,开销太大,所以会嫉妒浪费服务器资源,导致性能降低.

第二种方法,由于要设计到线程的同步,有可能面临死锁的问题。

第三种,在写应用程序代码的时候,逻辑比前面两种要复杂。

综合各方面考虑,一般普遍认为第三种方式是大多数网络服务器采用的方式。

 

事件驱动模型

在UI编程中,常常需要对鼠标点击进行响应,首先如何获得鼠标点击呢?

方式一:创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点

1.CPU资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这就会造成CPU的资源浪费,如果扫描鼠标点击的接口永远是阻塞的呢?

2.如果是阻塞的,又会出现程序整体卡住,不能做其他处理,比如我需要摁键盘输入,等等

3.如果一个循环需要扫描的设备非常多,这时候间隙又会有响应时间的问题,所以肯定不会是这个方法。

方式二:就是事件驱动模型

目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就是代表鼠标按下这个操作事件。事件驱动模型的大体思路如下:

1.有一个事件消息队列;

2.鼠标按下时,往这个队列增加一个点击事件(消息);

3.有个循环,不断从队列取出事件,根据不同的事件,调用不通的函数,如onClick,onKeyDown等等

4.事件消息一般都各自保存各自的处理函数指针,这样每个消息都有独立的处理函数

 技术分享

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。

可能讲了这么多大家还是不理解这三者的区别,现在我画一张图给大家更好的对比并解释一下;下图展示了随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。

 

 

 

在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。

先解释一下上面的模型,我们为什么一直要规避IO,因为IO操作是由操作系统完成的而不是程序本身占用的资源。程序遇到IO都会要丢给操作系统文件调度接口。在执行IO的时候,我们传给操作系统会注册一个IO事件并绑定一个回调函数,只要操作系统IO一处理完成,就通知主程序。

当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

  1. 程序中有许多任务,而且…
  2. 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
  3. 在等待事件到来时,某些任务会阻塞。

当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。

网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。

此处要提出一个问题,就是,上面的事件驱动模型中,只要一遇到IO就注册一个事件,然后主程序就可以继续干其它的事情了,只到IO处理完毕后,继续恢复之前中断的任务,这本质上是怎么实现的呢?哈哈,下面我们就来一起揭开这神秘的面纱。。。。

 

Select\\Poll\\Epoll异步IO 

同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不同的人在不同的上下文下给出的答案是不同的。所以先限定一下本文的上下文。

本文讨论的背景是Linux环境下的network IO。

首先需要解释几个概念:

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。

 

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。

从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化: 1. 保存处理机上下文,包括程序计数器和其他寄存器。 2. 更新PCB信息。

3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。 4. 选择另一个进程执行,并更新其PCB。 5. 更新内存管理的数据结构。 6. 恢复处理机上下文。

总而言之就是很耗资源,具体的可以参考这篇文章:进程切换

注:进程控制块(Processing Control Block),是操作系统核心中一种数据结构,主要表示进程状态。其作用是使一个在多道程序环境下不能独立运行的程序(含数据),成为一个能独立运行的基本单位或与其它进程并发执行的进程。或者说,OS是根据PCB来对并发执行的进程进行控制和管理的。 PCB通常是系统内存占用区中的一个连续存区,它存放着操作系统用于描述进程情况及控制进程运行所需的全部信息 

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

缓存 I/O 的缺点: 数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

 

 IO模式

刚刚说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段: 1. 等待数据准备 (Waiting for the data to be ready) 2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段,linux系统产生了下面五种网络模式的方案。 - 阻塞 I/O(blocking IO) - 非阻塞 I/O(nonblocking IO) - I/O 多路复用( IO multiplexing) - 信号驱动 I/O( signal driven IO) - 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

 

阻塞I/O(blocking IO)

在linux中,默认情况下所以的socket都是blocking,一个典型的读操作流程大概是这样:

技术分享

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来)。这个过程需要等待,也就是说数据被拷贝到操作系统内核的缓冲区中是需要一个过程的。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段都被block了。

 

 

非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子: 技术分享

当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

 

I/O 多路复用( IO multiplexing)

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

技术分享

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

 

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

 

这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

 

异步 I/O(asynchronous IO)

inux下的asynchronous IO其实用得很少。先看一下它的流程:

 技术分享

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

 

总结

blocking和non-blocking的区别

调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

synchronous IO和asynchronous IO的区别

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。POSIX的定义是这样子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。

有人会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。

而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

各个IO Model的比较如图所示:

 技术分享

通过上面的图片,可以发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

 

下面我们选用select做一下详解;

首先列一下,sellect、poll、epoll三者的区别 

select 

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。

另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

poll 
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll 
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

 

Python select 

Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。

注意:Using Python’s file objects with select() works for Unix, but is not supported under Windows.

接下来通过echo server例子要以了解select 是如何通过单进程实现同时处理多个非阻塞的socket连接的

 

import select
import socket
import sys
import Queue

# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)
 
# Bind the socket to the port
server_address = (‘localhost‘, 10000)
print(sys.stderr, ‘starting up on %s port %s‘ % server_address)
server.bind(server_address)
 
# Listen for incoming connections
server.listen(5)

select()方法接收并监控3个通信列表, 第一个是所有的输入的data,就是指外部发过来的数据,第2个是监控和接收所有要发出去的data(outgoing data),第3个监控错误信息,接下来我们需要创建2个列表来包含输入和输出信息来传给select().

 

# Sockets from which we expect to read
inputs = [ server ]

# Sockets to which we expect to write
outputs = [ ]

 

所有客户端的进来的连接和数据将会被server的主循环程序放在上面的list中处理,我们现在的server端需要等待连接可写(writable)之后才能过来,然后接收数据并返回(因此不是在接收到数据之后就立刻返回),因为每个连接要把输入或输出的数据先缓存到queue里,然后再由select取出来再发出去。

Connections are added to and removed from these lists by the server main loop. Since this version of the server is going to wait for a socket to become writable before sending any data (instead of immediately sending the reply), each output connection needs a queue to act as a buffer for the data to be sent through it.

# Outgoing message queues (socket:Queue)
message_queues = {}

The main portion of the server program loops, calling select() to block and wait for network activity.

下面是此程序的主循环,调用select()时会阻塞和等待直到新的连接和数据进来

while inputs:

    # Wait for at least one of the sockets to be ready for processing
    print >>sys.stderr, \\nwaiting for the next event‘
    readable, writable, exceptional = select.select(inputs, outputs, inputs)

 当你把inputs,outputs,exceptional(这里跟inputs共用)传给select()后,它返回3个新的list,我们上面将他们分别赋值为readable,writable,exceptional, 所有在readable list中的socket连接代表有数据可接收(recv),所有在writable list中的存放着你可以对其进行发送(send)操作的socket连接,当连接通信出现error时会把error写到exceptional列表中。

select() returns three new lists, containing subsets of the contents of the lists passed in. All of the sockets in the readable list have incoming data buffered and available to be read. All of the sockets in the writable list have free space in their buffer and can be written to. The sockets returned in exceptional have had an error (the actual definition of “exceptional condition” depends on the platform).

 

Readable list 中的socket 可以有3种可能状态,第一种是如果这个socket是main "server" socket,它负责监听客户端的连接,如果这个main server socket出现在readable里,那代表这是server端已经ready来接收一个新的连接进来了,为了让这个main server能同时处理多个连接,在下面的代码里,我们把这个main server的socket设置为非阻塞模式。

The “readable” sockets represent three possible cases. If the socket is the main “server” socket, the one being used to listen for connections, then the “readable” condition means it is ready to accept another incoming connection. In addition to adding the new connection to the list of inputs to monitor, this section sets the client socket to not block.

# Handle inputs
for s in readable:
 
    if s is server:
        # A "readable" server socket is ready to accept a connection
        connection, client_address = s.accept()
        print(sys.stderr, ‘new connection from‘, client_address)
        connection.setblocking(0)
        inputs.append(connection)
 
        # Give the connection a queue for data we want to send
        message_queues[connection] = Queue.Queue()

第二种情况是这个socket是已经建立了的连接,它把数据发了过来,这个时候你就可以通过recv()来接收它发过来的数据,然后把接收到的数据放到queue里,这样你就可以把接收到的数据再传回给客户端了。

The next case is an established connection with a client that has sent data. The data is read with recv(), then placed on the queue so it can be sent through the socket and back to the client.

else:
     data = s.recv(1024)
     if data:
         # A readable client socket has data
         print(sys.stderr, ‘received "%s" from %s‘ % (data, s.getpeername()))
         message_queues[s].put(data)
         # Add output channel for response
         if s not in outputs:
             outputs.append(s)

第三种情况就是这个客户端已经断开了,所以你再通过recv()接收到的数据就为空了,所以这个时候你就可以把这个跟客户端的连接关闭了。

A readable socket without data available is from a client that has disconnected, and the stream is ready to be closed.

else:
    # Interpret empty result as closed connection
    print(sys.stderr, ‘closing‘, client_address, ‘after reading no data‘)
    # Stop listening for input on the connection
    if s in outputs:
        outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
    inputs.remove(s)    #inputs中也删除掉
    s.close()           #把这个连接关闭掉
    # Remove message queue
    del message_queues[s]   

对于writable list中的socket,也有几种状态,如果这个客户端连接在跟它对应的queue里有数据,就把这个数据取出来再发回给这个客户端,否则就把这个连接从output list中移除,这样下一次循环select()调用时检测到outputs list中没有这个连接,那就会认为这个连接还处于非活动状态

There are fewer cases for the writable connections. If there is data in the queue for a connection, the next message is sent. Otherwise, the connection is removed from the list of output connections so that the next time through the loop select() does not indicate that the socket is ready to send data.

# Handle outputs
for s in writable:
    try:
        next_msg = message_queues[s].get_nowait()
    except Queue.Empty:
        # No messages waiting so stop checking for writability.
        print(sys.stderr, ‘output queue for‘, s.getpeername(), ‘is empty‘)
        outputs.remove(s)
    else:
        print(sys.stderr, ‘sending "%s" to %s‘ % (next_msg, s.getpeername()))
        s.send(next_msg)

最后,如果在跟某个socket连接通信过程中出了错误,就把这个连接对象在inputs\\outputs\\message_queue中都删除,再把连接关闭掉

# Handle "exceptional conditions"
for s in exceptional:
    print(sys.stderr, ‘handling exceptional condition for‘, s.getpeername())
    # Stop listening for input on the connection
    inputs.remove(s)
    if s in outputs:
        outputs.remove(s)
    s.close()
 
    # Remove message queue
    del message_queues[s]

客户端

下面的这个是客户端程序展示了如何通过select()对socket进行管理并与多个连接同时进行交互,

The example client program uses two sockets to demonstrate how the server with select() manages multiple connections at the same time. The client starts by connecting each TCP/IP socket to the server.

import socket
import sys
 
messages = [ ‘This is the message. ‘,
             ‘It will be sent ‘,
             ‘in parts.‘,
             ]
server_address = (‘localhost‘, 10000)
 
# Create a TCP/IP socket
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          socket.socket(socket.AF_INET, socket.SOCK_STREAM),
          ]
 
# Connect the socket to the port where the server is listening
print(sys.stderr, ‘connecting to %s port %s‘ % server_address)
for s in socks:
    s.connect(server_address)

接下来通过循环通过每个socket连接给server发送和接收数据。


Then it sends one pieces of the message at a time via each socket, and reads all responses available after writing new data.

for message in messages:
    # Send messages on both sockets
    for s in socks:
        print(sys.stderr, ‘%s: sending "%s"‘ % (s.getsockname(), message))
        s.send(message)
 
    # Read responses on both sockets
    for s in socks:
        data = s.recv(1024)
        print (sys.stderr, ‘%s: received "%s"‘ % (s.getsockname(), data))
        if not data:
            print(sys.stderr, ‘closing socket‘, s.getsockname())

附上参考ALEX老师的博客地址:http://www.cnblogs.com/alex3714/p/4372426.html 

http://www.cnblogs.com/alex3714/articles/5876749.html 

 

Paramiko

Python的paramiko模块是基于SSH用于连接远程服务器并执行相关操作

SSHclient

用于连接远程服务器并执行基本命令

基于用户名密码连接:

import paramiko
# 创建SSH对象
ssh = paramiko.SSHClient
# 允许连接不在know_hosts文件中的主机
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接服务器
ssh.connect(hostname="example",port=22, username="dandy", password="password.1")
# 执行命令
stdin, stdout, stderr = ssh.exec_command("df")
# 获取命令结果
result = stdout.read()
# 关闭连接
ssh.close()

transport:

import paramiko
transport = paramiko.Transport(("hostname", 22))
transport.connect(username="dandy", password="123")

ssh = paramiko.SSHClient()
ssh._transport = transport
stdin, stdout, stderr = ssh.exec_command()
print(stdout.read())
transport.close()

SFTP Client

用于连接远程服务器并执行上传下载

基于用户名密码的上传下载

import paramiko
transport = paramiko.Transport(("hostname", 22))
transport.connect(username="dandy", password="123")

sftp = paramiko.SFTPClient.from_transport(transport)

sftp.put(‘/tmp/location.py‘, ‘/tmp/test.py‘)
sftp.get(‘remove_path‘, ‘local_path‘)
transport.close()

摘录参考与武老师的博客:http://www.cnblogs.com/wupeiqi/articles/5095821.html

 





以上是关于Python 10 协程,异步IO,Paramiko的主要内容,如果未能解决你的问题,请参考以下文章

Python全栈开发-Day10-进程/协程/异步IO/IO多路复用

Python进阶协程和异步 IO

python协程和异步IO——IO多路复用

Python协程中的并行异步IO

Python协程异步IO

Python 协程/异步IO/SelectPollEpoll异步IO与事件驱动