自动化运维Python系列之进程线程协程

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自动化运维Python系列之进程线程协程相关的知识,希望对你有一定的参考价值。

进程 线程 协程

1)进程是具有一定独立功能的的程序关于某个数据集合上的一次运行活动,是系统进行资源分配和调度的独立单位

2)线程是进程的一个实体,是CPU调度和分派的基本单位

3)协程是程序自身产生的一种线程复用机制,作用是让一个线程重复利用,减少系统资源开销,提高程序效率


技术分享

由于进程、线程都是操作系统的基本概念,比较抽象,我们可以将CPU看作是一个时刻在运行中的大型工厂,车间就是工厂里具有独立工作能力的程序进程,每个车间里工作的机器人就是线程:

系统工作模式:

同一时间工厂只能为一个车间供电,供电期间CPU调度线程,完成他们自己的工作,一旦供电时间到,即使有线程工作未完成,也会立即停止,各线程会保存当前工作的进度(系统的上下文切换),等待下一次供电时间,继续完成上一次工作;其实我们系统里同一时刻确实只能运行一个程序进程,只是由于CPU在各个程序之间切换的速度够快,我们感知不到,所有表面上看所有程序都是同时运行的。

 

Python线程

import threading
import time
 
def show(arg):
    time.sleep(1)
    print(‘thread‘ + str(arg))
 
# 创建10个线程 start表示创建成功 等待CPU调度运行
for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()
 
print(‘main thread stop‘)
 
# 输出:
main thread stop
thread3
thread2
thread1
thread0
...
 
# Thread      线程的其他方法
# start      线程准备就绪,等待CPU调度
# setName    为线程设置名称
# getName    获取线程名称
# setDaemon  设置为后台线程或前台线程(默认)
#            如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
#            如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
# join      逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
# run       线程被cpu调度后自动执行线程对象的run方法

自定义创建线程类

# 线程被创建准备好后 被cpu调度会去执行threading.Thread里面的 run 方法
 
class MyThread(threading.Thread):
 
    def __init__(self, func, args):
        self.func = func
        self.args = args
        super(MyThread, self).__init__()
 
    def run(self):
        self.func(self.args)
 
def f2(arg):
    print(arg)
 
obj = MyThread(f2, 123)
obj.start()

线程锁

由于CPU调用线程是随机性的,也就是线程被执行顺序不固定,在创建了很多线程同时去执行修改同一数据时就会发生操作顺序混乱,导致出现脏数据

import threading
import time
 
NUM = 10
 
def func():
    global NUM
    NUM -= 1
    # 程序等待1秒钟 10个线程基本都执行到此处
    time.sleep(1)
    # 此时打印出来的 NUM 值就是 0 了
    print(NUM)
  
for i in range(10):
    t = threading.Thread(target=func,)
    t.start()
 
# 输出
0
0
0
...

加线程锁以后

import threading
import time
 
NUM = 10
 
def func(l):
    global NUM
    l.acquire()
    NUM -= 1
    time.sleep(1)
    print(NUM)
    l.release()
 
# RLock 可以加多层线程锁
# lock = threading.Lock()
lock = threading.RLock()
 
for i in range(10):
    t = threading.Thread(target=func, args=(lock, ))
    t.start()
 
# 输出
9
8
7
6
...

信号量(Semaphore)

线程锁同时只允许一个线程更改数据,而semaphore可以允许同时一定数量的线程通过,后面的线程将等待

import threading, time
 
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s" % n)
    semaphore.release()
 
if __name__ == ‘__main__‘:
    num = 0
    # 最多允许5个线程同时运行
    semaphore = threading.BoundedSemaphore(5)
    for i in range(20):
        t = threading.Thread(target=run, args=(i,))
        t.start()

事件(event)

Python线程的事件用于主线程控制子线程的执行,事件主要提供3中方法:set、wait、clear

有点像红绿灯:clear红灯停 set绿灯行 wait黄灯等

import threading
 
def func(i, e):
    print(i)
    # 出现wait后 所有线程将在此阻塞 直到用户输入1设置event.set放行
    e.wait()
    print(i + 100)
 
event = threading.Event()
 
for i in range(3):
    t = threading.Thread(target=func, args=(i, event,))
    t.start()
# 默认为设置为clear阻塞
event.clear()
 
inp = input(‘>>>‘)
# 手动放行
if inp == ‘1‘:
    event.set()
 
# 输出:
1
2
3
>>> 1
100
200
300

Condition条件

condition使得在等待的线程 满足一定条件才会被放行设定数量的线程

import threading
 
def func(i, con):
    print(i)
    con.acquire()
    con.wait()
    print(i+100)
    con.release()
 
c = threading.Condition()
 
for i in range(10):
    t = threading.Thread(target=func, args=(i, c, ))
    t.start()
 
while True:
    inp = input(‘>>>‘)
    if inp == ‘q‘:
        break
    c.acquire()
    c.notify(int(inp))
    c.release()

Timer

指定n秒后 执行某操作

from threading import Timer
 
def hello():
    print("hello, world")
 
t = Timer(1, hello)
t.start()

 

Python自定义线程池

Python中在3之前没有线程池 3中已有的线程池也是功能非常少的低级线程池

自定义一个线程池

import queue
import threading
import contextlib
import time
 
# 空任务标识 终止进程
StopEvent = object()
 
class ThreadPool(object):
 
    def __init__(self, max_num, max_task_num=None):
        if max_task_num:
            # 装任务的队列q
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        # 允许的最大大线程数
        self.max_num = max_num
        # 终止线程
        self.cancel = False
        self.terminal = False
        # 当前已创建的线程数量
        self.generate_list = []
        # 当前空闲的线程数量
        self.free_list = []
 
    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        # 判断已经创建的线程数量是否小于最大线程数 表示所有的线程在忙 多余任务不再创建线程
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        # 任务元组放到队列
        w = (func, args, callback,)
        self.q.put(w)
 
    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()
 
    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread()
        # 将已经创建的线程加入线程列表
        self.generate_list.append(current_thread)
 
        # 取任务元组
        event = self.q.get()
        while event != StopEvent:
            # 得到任务元组的内容
            func, arguments, callback = event
            try:
                # 执行 action 函数
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None
 
            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass
 
            # 将线程标记为空闲
            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    # 如果取到的是空任务执行下面的else
                    event = self.q.get()
        else:
 
            self.generate_list.remove(current_thread)
 
    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        self.cancel = True
        # 创建了多少线程就传几个空任务
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1
 
    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True
        while self.generate_list:
            self.q.put(StopEvent)
        self.q.queue.clear()
 
    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)
 
# How to use
 
pool = ThreadPool(5)
 
def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass
 
def action(i):
    print(i)
 
# 30个任务
for i in range(30):
    ret = pool.run(action, (i,), callback)
 
time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
pool.close()
# pool.terminate()

 

Python进程

创建进程类似于创建线程

不同的是,创建进程需要消耗系统不小的开销 而且各进程间数据不共享

from multiprocessing import Process
import threading
import time
   
def foo(i):
    print ‘say hi‘,i
   
for i in range(10):
    p = Process(target=foo,args=(i,))
    p.start()

进程间数据共享方式

1)queues 队列方式

2)Array 数组方式

3)dict 字典方式

# 方式一 queues
 
from multiprocessing import Process
from multiprocessing import queues
import multiprocessing
 
def foo(i, arg):
    arg.put(i)
    print(‘say hi‘, i, arg.qsize())
 
if __name__ == ‘__main__‘:
    li = queues.Queue(20, ctx=multiprocessing)
    for i in range(10):
        p = Process(target=foo, args=(i, li, ))
        p.start()
 
# 方式二 Array数据
 
from multiprocessing import Process
from multiprocessing import Array
 
def foo(i, arg):
    arg[i] = i + 100
    for item in arg:
        print(item)
    print(‘=========‘)
 
if __name__ == ‘__main__‘:
    # 使用数据Array必须指定数据类型 和长度
    li = Array(‘i‘, 10)
    for i in range(10):
        p = Process(target=foo, args=(i, li, ))
        p.start()
        p.join()
 
# 方式三 dict
 
from multiprocessing import Process
from multiprocessing import Manager
import time
 
def foo(i, arg):
    arg[i] = 100 + i
    print(arg.values())
 
if __name__ == ‘__main__‘:
    obj = Manager()
    li = obj.dict()
    for i in range(10):
        p = Process(target=foo, args=(i, li, ))
        p.start()
        # 主进程结束后 子进程会被强行终止
        # 可以使用join等待所有子进程全部执行完成 或者 time.sleep
        p.join()

进程锁

from multiprocessing import Process
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import time
 
def foo(i, lis, lc):
    lc.acquire()
    # 将列表中的数字递减1
    lis[0] = lis[0] - 1
    time.sleep(1)
    print(‘say hi‘, lis[0])
    lc.release()
 
if __name__ == ‘__main__‘:
    li = Array(‘i‘, 1)
    li[0] = 10
    lock = RLock()
    for i in range(10):
        p = Process(target=foo, args=(i, li, lock))
        p.start()

进程池

进程池内部维护一个进程序列 但使用时 这去进程池中获取一个进程,如果进程中没有,则需要等待,知道进程池中有进程为止

import time
from multiprocessing import Pool
 
def f1(arg):
    time.sleep(1)
    print(arg)
 
if __name__ == ‘__main__‘:
    pool = Pool(5)
  
    for i in range(30):
        # pool.apply(func=f1, args=(i, ))
        # 异步执行 一部到位
        pool.apply_async(func=f1, args=(i, ))
 
    # close 表示所有的子进程任务执行完毕
    pool.close()
    # time.sleep(1)
    # terminate 表示立即终止所有子进程
    # pool.terminate()
    pool.join()

协程

进程和协程的操作是系统,而协程的创建和操作是程序的编写者

协程存在的意义:对于多协程的应用,CPU通过切片的方式来切换协程间的执行,线程切换时需要耗时(保存状态,下次继续)协程则只使用一个线程,在一个线程中规定某个代码块执行顺序

协程使用场景:当程序中存在大量不需要使用CPU的操作时,即多IO操作适用协程

技术分享

greelet

from greenlet import greenlet
 
def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()
 
def test2():
    print(56)
    gr1.switch()
    print(78)
 
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()

gevent

import gevent
  
def foo():
    print(‘Running in foo‘)
    gevent.sleep(0)
    print(‘Explicit context switch to foo again‘)
  
def bar():
    print(‘Explicit context to bar‘)
    gevent.sleep(0)
    print(‘Implicit context switch back to bar‘)
  
gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

通过IO自动切换来完成网站请求


from gevent import monkey; monkey.patch_all()
import gevent
import requests
 
def f(url):
    print(‘GET: %s‘ % url)
    resp = requests.get(url)
    data = resp.text
    print(‘%d bytes received from %s.‘ % (len(data), url))
 
gevent.joinall([
        gevent.spawn(f, ‘https://www.python.org/‘),
        gevent.spawn(f, ‘https://www.yahoo.com/‘),
        gevent.spawn(f, ‘https://github.com/‘),
])
 
# 输出
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
462078 bytes received from https://www.yahoo.com/.
25689 bytes received from https://github.com/.
47373 bytes received from https://www.python.org/.

 

队列queue

import queue
 
# queue.Queue 先进先出
# q = queue.LifoQueue() 后进先出
 
# 权重队列
# q = queue.PriorityQueue()
# q.put((1, "alex"))
# print(q.get())
 
# 双向队列
# q = queue.deque()
# q.append(123)
# q.append(456)
# q.appendleft(789)
# q.pop()
# q.popleft()
 
# 先进先出队列
# 参数10表示最多只接收10个数据
# q = queue.Queue(10)
# q.put(11)
# q.put(22)
# 超时时间timeout block是否阻塞
# q.put(33, timeout=2)
# q.put(33, block=False)
 
# print(q.qsize())
# get默认阻塞 有数据才能取
# print(q.get())
# print(q.get(block=False))
 
# join task_done 阻塞进程 当队列中的任务执行完毕后 不再阻塞
q = queue.Queue()
q.put(123)
q.put(456)
q.get()
q.task_done()
q.get()
q.task_done()
 
q.join()





本文出自 “改变从每一天开始” 博客,请务必保留此出处http://lilongzi.blog.51cto.com/5519072/1880974

以上是关于自动化运维Python系列之进程线程协程的主要内容,如果未能解决你的问题,请参考以下文章

Python自动化运维之高级函数

Python自动化运维之高级函数

python 复习—并发编程系统并发线程和进程协程GIL锁CPU/IO密集型计算

python之线程进程和协程

python----单线程实现并发之协程

Python之进程线程协程篇