线程进程协程

Posted play wife VS paly games

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程进程协程相关的知识,希望对你有一定的参考价值。

线程:

线程可以理解成轻量的进程,实际在linux两者几乎没有区别,唯一的区别是线程并不产生新的地址空间和资源。

Threading用于提供线程的相关操作,线程是应用程序中工作的最小单元。

import threading 
import time 
   
def show(arg): 
    time.sleep(1) 
    print(\'thread\'+str(arg))
   
for i in range(10): 
    t = threading.Thread(target=show, args=(i,)) 
    t.start() 

上述代码创建了10个前台线程,然后控制器就交给了cpu,CPU根据内部算法进行调度,

更多的方法:

  • start 线程准备就绪,等待CPU调度
  • setName  为线程设置名字
  • getName  获取线程名称
  • setDaemon  设置后台线程或者前台线程(默认False) 

    如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止

    如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

  •  join   逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
  • run  线程被cpu调度后自动执行线程对象的run方法

线程锁:

由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。所以,可能出现如下问题:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time

gl_num = 0

def show(arg):
    global gl_num
    time.sleep(1)
    gl_num +=1
    print gl_num

for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()

print \'main thread stop\'
未使用锁
#!/usr/bin/env python 
#coding:utf-8 
    
import threading 
import time 
    
gl_num = 0
#生成锁  
lock = threading.RLock() 
    
def Func(): 
    lock.acquire() #给线程上锁
    global gl_num 
    gl_num +=1
    time.sleep(1) 
    print gl_num 
    lock.release() #释放线程锁
        
for i in range(10): #创建10个线程
    t = threading.Thread(target=Func) 
    t.start() 
加上线程锁

event

python线程的事件用于主线程控制其他线程的执行,线程主要提供了三个方法:set、wait、clear

事件处理的机制:全局定义了一个"flag",值为False,,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

  • clear:将"flag"设置为False
  • set:将"flag"设置为True
#!/usr/bin/env python 
# -*- coding:utf-8 -*-  
import threading 
  
def do(event): 
    print \'start\'
    event.wait() 
    print \'execute\'
  
event_obj = threading.Event() 
for i in range(10): 
    t = threading.Thread(target=do, args=(event_obj,)) 
    t.start() 
  
event_obj.clear() 
inp = input(\'input:\') 
if inp == \'true\': 
    event_obj.set() 

使用线程队列

当多个线程需要共享数据的或者资源的时候,可能会使线程的使用变得复杂,线程的模块提供了许多同步原语,包括信号量、条件变量、事件和锁。当这些选项存在时,最佳实践是转而关注于使用队列,相比较而言,队列更容易处理,并且更容易处理,并且可以使线程编程更加安全,因为他们能够有效地传递单个线程资源的所有访问,并支持更加清晰的、可读性更高的设计模式。

url获取序列:

import urllib2
import time
        
hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"]
        
start = time.time()
        #grabs urls of hosts and prints first 1024 bytes of page
for host in hosts:
     url = urllib2.urlopen(host)
     print url.read(1024)
        
print("Elapsed Time: %s" % (time.time() - start))
 

简单版本的线程池:

#!/usr/bin/env python 
# -*- coding:utf-8 -*- 
import Queue 
import threading 
  
  
class ThreadPool(object): 
  
    def __init__(self, max_num=20): 
        self.queue = queue.Queue(max_num) 
        for i in xrange(max_num): 
            self.queue.put(threading.Thread) 
  
    def get_thread(self): 
        return self.queue.get() 
  
    def add_thread(self): 
        self.queue.put(threading.Thread) 
  
pool = ThreadPool(10) 
  
def func(arg, p): 
    print arg 
    import time 
    time.sleep(2) 
    p.add_thread() 
  
  
for i in xrange(30): 
    thread = pool.get_thread() 
    t = thread(target=func, args=(i, pool)) 
    t.start() 
简单版的线程池
import queue 
import threading 
import contextlib 
  
StopEvent = object() 
  
class ThreadPool(object): 
  
    def __init__(self, max_num): 
        self.q = queue.Queue(max_num) 
        self.max_num = max_num 
        self.cancel = False
        self.generate_list = [] 
        self.free_list = [] 
  
    def run(self, func, args, callback=None): 
        """ 
        线程池执行一个任务 
        :param func: 任务函数 
        :param args: 任务函数所需参数 
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数) 
        :return: 如果线程池已经终止,则返回True否则None 
        """
        if self.cancel: 
            return True
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num: 
            self.generate_thread() 
        w = (func, args, callback,) 
        self.q.put(w) 
  
    def generate_thread(self): 
        """ 
        创建一个线程 
        """
        t = threading.Thread(target=self.call) 
        t.start() 
  
    def call(self): 
        """ 
        循环去获取任务函数并执行任务函数 
        """
        current_thread = threading.currentThread 
        self.generate_list.append(current_thread) 
  
        event = self.q.get() 
        while event != StopEvent: 
            func, arguments, callback = event 
            try: 
                result = func(*arguments) 
                success = True
            except Exception, e: 
                success = False
                result = None
  
            if callback is not None: 
                try: 
                    callback(success, result) 
                except Exception, e: 
                    pass
            with self.worker_state(self.free_list, current_thread): 
                event = self.q.get() 
        else: 
            self.generate_list.remove(current_thread) 
  
    def terminal(self): 
        """ 
        终止线程池中的所有线程 
        """
        self.cancel = True
        full_size = len(self.generate_list) 
        while full_size: 
            self.q.put(StopEvent) 
            full_size -= 1
  
    @contextlib.contextmanager 
    def worker_state(self, state_list, worker_thread): 
        """ 
        用于记录线程中正在等待的线程数 
        """
        state_list.append(worker_thread) 
        try: 
            yield
        finally: 
            state_list.remove(worker_thread) 
进阶版的线程池
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num):
        self.q = queue.Queue()
        self.max_num = max_num

        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """

        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.empty()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

# How to use

pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass

def action(i):
    time.sleep(1)
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

# pool.close()
# pool.terminate()
线程池更新一
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import contextlib
import time

StopEvent = object()

class ThreadPool(object):
    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.empty()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

# How to use

pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass

def action(i):
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
线程池更新二

queue模块:

queue就是对队列,它是线程安全的

举例来说,我们去肯德基吃饭。厨房是给我们做饭的地方,前台负责把厨房做好的饭卖给顾客,顾客则去前台领取做好的饭。这里的前台就相当于我们的队列。

这个模型也叫生产者-消费者模型

import queue

q = queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。

q.join()    # 等到队列为kong的时候,在执行别的操作
q.qsize()   # 返回队列的大小 (不可靠)
q.empty()   # 当队列为空的时候,返回True 否则返回False (不可靠)
q.full()    # 当队列满的时候,返回True,否则返回False (不可靠)
q.put(item, block=True, timeout=None) #  将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,                         为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,                          如果队列无法给出放入item的位置,则引发 queue.Full 异常q.get(block=True, timeout=None) #   移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,                      若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。q.put_nowait(item) #   等效于 put(item,block=False)q.get_nowait() #    等效于 get(item,block=False)

生产者--消费者:

#!/usr/bin/env python
import Queue
import threading


message = Queue.Queue(10)


def producer(i):
    while True:
        message.put(i)


def consumer(i):
    while True:
        msg = message.get()


for i in range(12):
    t = threading.Thread(target=producer, args=(i,))
    t.start()

for i in range(10):
    t = threading.Thread(target=consumer, args=(i,))
    t.start()

multiprocessing模块

multiprocessing是python的多进程管理包,和threading.Thread类似。直接从侧面用subprocesses替换线程使用GIL的方式,由于这一点,multiprocessing模块可以让程序员在给定的机器上充分的利用CPU。

在multiprocessing中,通过创建Process对象生成进程,然后调用它的start()方法,

from multiprocessing import Process
 
def f(name):
    print(\'hello\', name)
 
if __name__ == \'__main__\':
    p = Process(target=f, args=(\'bob\',))
    p.start()
    p.join()

 

进程:

进程间的数据共享

在使用并发设计的时候最好尽可能的避免共享数据,尤其是在使用多进程的时候。 如果你真有需要 要共享数据, multiprocessing提供了两种方式。

Shared memory

数据可以用Value或Array存储在一个共享内存地图里,如下:

from multiprocessing import Process, Value, Array
 
def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]
 if __name__ == \'__main__\':
    num = Value(\'d\', 0.0)
    arr = Array(\'i\', range(10))
 
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
 
    print(num.value)
    print(arr[:])

输出:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建num和arr时,“d”和“i”参数由Array模块使用的typecodes创建:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。Array(‘i’, range(10))中的‘i’参数:

‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte‘h’: ctypes.c_short     ‘H’: ctypes.c_ushort    ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint
‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double

Server process

由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持

from multiprocessing import Process, Manager
 def f(d, l):
    d[1] = \'1\'
    d[\'2\'] = 2
    d[0.25] = None
    l.reverse()
 
if __name__ == \'__main__\':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
 
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
 
        print(d)
        print(l)

输出:

{0.25: None, 1: \'1\', \'2\': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Server process manager比 shared memory 更灵活,因为它可以支持任意的对象类型。另外,一个单独的manager可以通过进程在网络上不同的计算机之间共享,不过他比shared memory要慢。

进程池:

Pool类描述了一个工作进程池,他有几种不同的方法让任务卸载工作进程。

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止我们可以用Pool类创建一个进程池, 展开提交的任务给进程池。 例:

from multiprocessing import Pool
import time
def myFun(i):
    time.sleep(2)
    return i+100

def end_call(arg):
    print("end_call",arg)
p = Pool(5)

# print(p.map(myFun,range(10)))
for i in range(10):
    p.apply_async(func=myFun,args=(i,),callback=end_call)
print("end")p.close()
p.join()
from multiprocessing import Pool, TimeoutError
import time
import os
 
def f(x):
    return x*x
 
if __name__ == \'__main__\':
    # 创建4个进程 
    with Pool(processes=4) as pool:
 
        # 打印 "[0, 1, 4,..., 81]" 
        print(pool.map(f, range(10)))
 
        # 使用任意顺序输出相同的数字, 
        for i in pool.imap_unordered(f, range(10)):
            print(i)
 
        # 异步执行"f(20)" 
        res = pool.apply_async(f, (20,))      # 只运行一个进程 
        print(res.get(timeout=1))             # 输出 "400" 
 
        # 异步执行 "os.getpid()" 
        res = pool.apply_async(os.getpid, ()) # 只运行一个进程 
        print(res.get(timeout=1))             # 输出进程的 PID 
 
        # 运行多个异步执行可能会使用多个进程 
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])
 
        # 是一个进程睡10秒 
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("发现一个 multiprocessing.TimeoutError异常")
 
        print("目前,池中还有其他的工作")
 
    # 退出with块中已经停止的池 
    print("Now the pool is closed and no longer available")
进程池官方版

 

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
  • processes :使用的工作进程的数量,如果processes是None那么使用 os.cpu_count()返回的数量。
  • initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个心的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
  • context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context注意:Pool对象的方法只可以被创建pool的进程所调用。进程池的方法
  • apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,由于这个原因,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。如果callback被指定,那么callback可以接收一个参数然后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被立即完成,否则处理结果的线程会被阻塞。

  • close() : 阻止更多的任务提交到pool,待任务完成后,工作进程会退出。

  • terminate() : 不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。

  • join() : wait工作线程的退出,在调用join()前,必须调用close() or terminate()。这样是因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。

  • map(func, iterable[, chunksize])¶

  • map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶

  • imap(func, iterable[, chunksize])¶

  • imap_unordered(func, iterable[, chunksize])

  • starmap(func, iterable[, chunksize])¶

  • starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

协程

协程又叫微线程,从技术的角度来说,“协程就是你可以暂停执行的函数”。如果你把它理解成“就像生成器一样”,那么你就想对了。 线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

Event Loop

Event Loop是一种等待程序分配时间或消息的编程架构。简单的说就是 当事件A发生的时候,我们就去执行事件B。 最简单的例子就是:当我们浏览网页的时候,我们点击页面的某个元素,这个点击事件会被 javascript 捕捉到,然后 JavaScript 就会检查这个事件是否绑定了onclick()回调函数来处理这个事件,只要绑定了,onclick()回调函数就会被执行。

event loop是协程执行的控制点, 如果你希望执行协程, 就需要用到它们。

event loop提供了如下的特性:

  • 注册、执行、取消延时调用(异步函数)
  • 创建用于通信的client和server协议(工具)
  • 创建和别的程序通信的子进程和协议(工具)
  • 把函数调用送入线程池中

协程示例:

import asyncio
 
async def cor1():
    print("COR1 start")
    await cor2()
    print("COR1 end")
 
async def cor2():
    print("COR2")
 
loop = asyncio.get_event_loop()
loop.run_until_complete(cor1())
loop.close()

最后三行是重点:

  • asyncio.get_event_loop()  : asyncio启动默认的event loop 
  • run_until_complete()  :  这个函数是阻塞执行的,知道所有的异步函数执行完成,
  • close()  :  关闭event loop。

subprocess模块

通过使用subprocess模块可以创建新的进程,连接到他们的输入/输出/错误管道,并获取他们的返回值。 该模块计划替代及一个旧的模块的方法:

os.system 
os.spawn* 

使用subprocess模块

在所有用例调用subprocess时推荐使用run()方法,更高级的用例,可以直接使用subprocess.Popen接口。

run()方法

在Python3.5增加的。

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, timeout=None, check=False) 

run()默认不会捕捉到标准输出和标准错误输出,要捕捉的话,可以为标准输出和标准错误输出指定subprocess.PIPE(一个特殊值,可被用于Popen的stdin, stdout或 stderr参数,表示一个标准流的管道应该被打开, Popen.communicate()用的最多)。

  • args :args应该是一个字符串或者一个序列。
  • timeout:设置超时时间,会传递给subprocess.Popen.communicate()。如果超时,子进程会被杀死并等待。子进程被终止后会报告一个 TimeoutExpired异常。
  • input参数会传递给subprocess.Popen.communicate(),从而作为subprocess的标准输入。当我们使用的时候,内部的Popen对象会自动创建stdin=PIPE,stdin参数可能不会被使用。
  • check:如果check参数为True,且进程退出的时候得到退出码一个非0的值,会报告一个 CalledProcessError异常
  • shell:shell参数默认为False,此时arg参数应该是一个列表。subprocess.run(["ls","-l"]) ; 当shell=True时,args可以是一个字符串。subprocess.run("ls -l",shell=True)
>>> ret = subprocess.run(["ls", "-l"])  # doesn\'t capture output 
CompletedProcess(args=[\'ls\', \'-l\'], returncode=0)
>>> print(ret.stdout)
None
 
>>> subprocess.run("exit 1", shell=True, check=True)
Traceback (most recent call last):
  ...
subprocess.CalledProcessError: Command \'exit 1\' returned non-zero exit status 1
 
>>> ret1 = subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE)
CompletedProcess(args=[\'ls\', \'-l\', \'/dev/null\'], returncode=0,
stdout=b\'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null\\n\')
 
>>> print(ret.stdout)
b\'crw-rw-rw- 1 root root 1, 3 6\\xe6\\x9c\\x88   8 06:50 /dev/null\\n\'

call方法

subprocess.call(args, *, stdin=Nonestdout=Nonestderr=Noneshell=Falsetimeout=None

call()方法等价于:run(..., check=True)和run()方法类所以,只是不支持input参数和check参数; 

注意: 不要在这个方法里使用stdout=PIPE 或 stderr=PIPE,当到一个管道的输出填满系统的管道缓存时,子进程会被阻塞。  

check_call方法

subprocess.check_output(args, *, stdin=None, stderr=None, shell=False, universal_newlines=False, timeout=None)

check_call()方法等价于: run(..., check=True, stdout=PIPE).stdout

3.1新增,3.3时增加了timeout参数,3.4时增加了对关键字参数的支持

check_output()方法

内部调用的是run()方法,但是会捕捉到stdout

>>> ret = subprocess.check_output(["ls", "-l", "/dev/null"])
>>> print(ret)
b\'crw-rw-rw- 1 root root 1, 3 6\\xe6\\x9c\\x88   8 06:50 /dev/null\\n\'

Popen类

上面的四个方法本质上调用的都是subprocess中的Popen类。

Popen对象都有以下方法:

poll() : 检查子进程是否已经终止,返回一个returncode,相当于exit code。

wait() : 等待子进程终止,返回一个returncode

communicate(input=None) :和进程进行交互:发送数据到stdin;从stdout和stderr读取数据,直到读取完。等待进程终止。可选参数input会传递数据给子进程,当input=None事,没有数据传递给子进程。 communicate() 返回一个元组 (stdout, stderr). 注意: 读取的数据在内存的buffer中,所以当数据大小大于buffer或者没有限制时,不要使用这个方法。

 

以上是关于线程进程协程的主要内容,如果未能解决你的问题,请参考以下文章

python 多进程,多线程,协程

进程线程协程

进程线程协程的区别

进程_线程 之 --- 协程

协程

进程线程和协程的区别