并发编程---线程queue---进程池线程池---异部调用(回调机制)

Posted Mr。yang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程---线程queue---进程池线程池---异部调用(回调机制)相关的知识,希望对你有一定的参考价值。

线程

  • 队列:先进先出
  • 堆栈:后进先出
  • 优先级:数字越小优先级越大,越先输出
import queue

q = queue.Queue(3) # 先进先出-->队列

q.put(\'first\')
q.put(2)
# q.put(\'third\')
# q.put(4) #由于没有人取走,就会卡主
q.put(4,block=False)  #等同于q.get_nowait(), Ture 阻塞,Flase不阻塞,报异常满了
# # q.put(4,block=True,timeout=3)

print(q.get())
print(q.get())
print(q.get())
print(q.get(block=True,timeout=3)) # 阻塞等待3秒 没有取走数据就报异常
# print(q.get(block=False)) #等同于q.get_nowait()
# print(q.get_nowait())


q = queue.LifoQueue(3) #后进先出-->堆栈
q.put(\'first\')
q.put(2)
q.put(\'third\')

print(q.get())
print(q.get())
print(q.get())
\'\'\'
打印结果:
third
2
first
\'\'\'

q = queue.PriorityQueue(3) #优先级队列

q.put((10,\'one\'))
q.put((40,\'two\'))
q.put((30,\'three\'))

print(q.get())
print(q.get())
print(q.get())
\'\'\'
数字越小优先级越高
打印结果
(10, \'one\')
(30, \'three\')
(40, \'two\')
\'\'\'
线程queue

进程池线程池

  • 池:是用来对进程(线程)的数量加以限制
  • 进程池:计算密集型,用多进程
  • 线程池:IO密集型,用多线程,例如:sockect网络通信就应该用多线程
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random

\'\'\'
sockect网络通信是IO操作,所以用多线程
计算密集型:用多进程
\'\'\'

def task(name):
    print(\'name:%s pid:%s run\' %(name,os.getpid()))
    time.sleep(random.randint(1,3))

if __name__ == \'__main__\':
    # pool = ProcessPoolExecutor(4) # 进程池最多装4个进程,不指定的话默认是cpu的核数
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task,\'yang%s\' %i) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行

    pool.shutdown(wait=True) # 类似join  代表往池子里面丢任务的入口关掉 计数器-1
    print(\'\')
\'\'\'
打印结果:
name:yang0 pid:11120 run
name:yang1 pid:11120 run
name:yang2 pid:11120 run
name:yang3 pid:11120 run
name:yang4 pid:11120 run

name:yang5 pid:11120 run
name:yang6 pid:11120 run
name:yang7 pid:11120 run

name:yang8 pid:11120 run
name:yang9 pid:11120 run
主
\'\'\'

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import currentThread
import os,time,random

def task():
    print(\'name:%s pid:%s run\' %(currentThread().getName(),os.getpid()))
    time.sleep(random.randint(1,3))

if __name__ == \'__main__\':
    # pool = ProcessPoolExecutor(4) # 进程池最多装4个进程,不指定的话默认是cpu的核数
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task) # 异步调用池子收了10个任务,但同一时间只有4个任务在进行

    pool.shutdown(wait=True) # 类似join  代表往池子里面丢任务的入口关掉 计数器-1
    print(\'\')
\'\'\'
打印结果:
name:ThreadPoolExecutor-0_0 pid:14052 run
name:ThreadPoolExecutor-0_1 pid:14052 run
name:ThreadPoolExecutor-0_2 pid:14052 run
name:ThreadPoolExecutor-0_3 pid:14052 run
name:ThreadPoolExecutor-0_4 pid:14052 run
name:ThreadPoolExecutor-0_2 pid:14052 run
name:ThreadPoolExecutor-0_1 pid:14052 run
name:ThreadPoolExecutor-0_3 pid:14052 run
name:ThreadPoolExecutor-0_4 pid:14052 run
name:ThreadPoolExecutor-0_0 pid:14052 run
主
\'\'\'
进程池|线程池

同步调用和异步调用

提交任务的两种方式:

  • 同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行
  • 异步调用:提交完任务后,不在原地等待任务执行完。回调机制:自动触发
#1.同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print(\'%s is laing\' %name)
    time.sleep(random.randint(3,5))
    res = random.randint(7,13)*\'#\'
    return {\'name\':name,\'res\':res}

def weigh(shit):
    name = shit[\'name\']
    size = len(shit[\'res\'])
    print(\'%s 拉了 <%s>kg\' %(name,size))

if __name__ == \'__main__\':
    pool = ThreadPoolExecutor(10)

    shit1 = pool.submit(la,\'alex\').result()
    weigh(shit1)

    shit2 = pool.submit(la,\'yang\').result()
    weigh(shit2)

    shit3 = pool.submit(la,\'hang\').result()
    weigh(shit3)
\'\'\'
打印结果:
alex is laing
alex 拉了 <8>kg
yang is laing
yang 拉了 <8>kg
hang is laing
hang 拉了 <7>kg
\'\'\'
同步调用
#2.异步调用:提交完任务后,不在原地等待任务执行完
from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print(\'%s is laing\' %name)
    time.sleep(random.randint(3,5))
    res = random.randint(7,13)*\'#\'
    return {\'name\':name,\'res\':res}
    # weigh({\'name\':name,\'res\':res})  # 这样写,所有功能 不能体现出解耦合

def weigh(shit):
    shit = shit.result() # 拿到是一个对象,需要进行result()
    name = shit[\'name\']
    size = len(shit[\'res\'])
    print(\'%s 拉了 <%s>kg\' %(name,size))

if __name__ == \'__main__\':
    pool = ThreadPoolExecutor(10)

    shit1 = pool.submit(la,\'alex\').add_done_callback(weigh)

    shit2 = pool.submit(la,\'yang\').add_done_callback(weigh)

    shit3 = pool.submit(la,\'hang\').add_done_callback(weigh)
\'\'\'
打印结果:
alex is laing
yang is laing
hang is laing
hang 拉了 <10>kg
alex 拉了 <7>kg
yang 拉了 <12>kg
\'\'\'
异步调用

异步调用的应用

from concurrent.futures import ThreadPoolExecutor
import requests
import time

def get(url):
    print(\'GET %s\'%url)
    response = requests.get(url)
    time.sleep(3)
    return {\'url\':url,\'content\':response.text}

def parse(res):
    res = res.result()
    print(\'%s parse res is %s\' %(res[\'url\'],len(res[\'content\'])))

if __name__ == \'__main__\':
    urls = [
        \'http://www.cnblogs.com/linhaifeng\',
        \'https://www.python.org\',
        \'https://www.openstack.org\',
    ]

    pool = ThreadPoolExecutor(2)
    for url in urls:
        pool.submit(get,url).add_done_callback(parse)
\'\'\'
打印结果:
GET http://www.cnblogs.com/linhaifeng
GET https://www.python.org
http://www.cnblogs.com/linhaifeng parse res is 16320
GET https://www.openstack.org
https://www.python.org parse res is 49273
https://www.openstack.org parse res is 64040
\'\'\'
应用

以上是关于并发编程---线程queue---进程池线程池---异部调用(回调机制)的主要内容,如果未能解决你的问题,请参考以下文章

并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制

并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制

并发编程---线程queue---进程池线程池---异部调用(回调机制)

python并发编程之进程池,线程池

并发编程 之 线程的队列, 线程池; 以及协程

并发编程目录