同步异步 + 回调函数

Posted lvweihe

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了同步异步 + 回调函数相关的知识,希望对你有一定的参考价值。

重点记忆 异步回调函数

如果进程池+回调: 回调函数由主进程去执行.
如果线程池+回调: 回到函数由空闲的线程去执行.(比如有4个线程,10个任务,第一轮完成4个任务,交由主线程处理结果,第二轮同样如此,但是第三轮将会空闲出2个子进程,则这2个子进程将会和主进程一同处理结果,以此类推,当所有的任务完成时,所有的子进程和主进程一起处理结果,增加效率)

回调函数不管有没有返回数据,返回值都是None,回调函数内部加函数名是调用此函数,obj隐形传参

1.概念

1. 从执行的角度

  1. 阻塞: 程序运行时,遇到了io,程序挂起,操作系统强行将CPU切走
  2. 非阻塞: 程序员没有遇到io,或者程序员遇到io但是我通过某种手段,让CPU强行运行我的程序

2. 提交任务的角度

  1. 同步: 提交一个任务,子任务开始运行直到此任务结束(可能遇到 io ),并返回一个返回值,我在提交下一个任务
  2. 异步: 一次提交所有任务,然后我就直接执行下一行代码,效率高

3. 什么叫爬虫?

  1. 利用代码模拟一个浏览器,进行浏览器的工作流程得到一堆源代码.
  2. 对源代码进行数据清洗得到我想要数据.

2. 同步调用

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os

def task(i):
    print(f'os.getpid()开始执行')
    time.sleep(random.randint(1,3))
    print(f'\\033[1;35;0mos.getpid()任务结束\\033[0m')
    return i  #就是obj.result()的返回值

if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)
    for i in range(10):
        obj = pool.submit(task,i) #默认接受
        # obj是一个动态对象,返回的当前的对象的状态,有可能运行中,可能(就绪阻塞),还可能是结束了
        print(f'任务结果是:obj')
        print(f'任务结果是:obj.result()')
        #obj.result() #阻塞,必须等到这个任务完成并返回了结果之后,再执行下一个任务
    pool.shutdown(wait=True)
    # shutdown: 让我的主进程等待进程池中所有的子进程都结束任务之后, 再执行.有点类似与join
    print('==主')
obj是一个动态对象,返回的当前的对象的状态,有可能运行中,可能(就绪阻塞),还可能      是结束了.
obj.result() 必须等到这个任务完成后,返回了结果之后,在执行下一个任务.
shutdown: 让我的主进程等待进程池中所有的子进程都结束任务之后,再执行. 有点类    似与join.
shutdown: 在上一个进程池没有完成所有的任务之前,不允许添加新的任务.
一个任务是通过一个函数实现的,任务完成了他的返回值就是函数的返回值.

3.异步调用

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time
import random
import os

def task(i):
    print(f'os.getpid()开始任务')
    time.sleep(random.randint(1,3))
    print(f'\\033[1;35;0mos.getpid()任务结束\\033[0m')
    return i

if __name__ == '__main__':
    #异步调用:基于发布任务的角度
    pool = ProcessPoolExecutor(4) #设置进程数,pid一共就四个
    for i in range(10): #一次发布10个任务
        pool.submit(task,i)
    pool.shutdown(wait=True)

4.爬虫简解

1.安装第三方模块 requests (安装方法见博客园)

import requests
ret = requests.get('http://www.taobao.com')
if ret.status_code == 200:  #固定写法,验证请求
    print(ret.text) #获取源码
    print(len(ret.text))  #有时候直接打印ret.text报错,是浏览器编码问题,代码可用

2.版本一

    1. 异步发出10个任务,并发的执行,但是统一的接收所有的任务的返回值.(效率低,不能实时的获取结果)
    2. 分析结果流程是串行,影响效率.
         for res in obj_list:
            print(parse(res.result()))
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests

def task(url):
    """模拟的就是多个源代码,一定有io操作(爬取时网络延迟)"""
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text  #源码

def parse(content):
    """模拟对数据进行分析 一般没有IO"""
    return len(content)

if __name__ == '__main__':
    url_list = ['http://www.taobao.com',
                'http://www.baidu.com',
                'http://www.aqiyi.com',
                'http://www.youku.com',
                'https://gitee.com/',
                'https://www.cnblogs.com/jin-xin/articles/10067177.html']
    pool = ProcessPoolExecutor(1)
    obj_list = []
    for url in url_list:
        obj = pool.submit(task,url) #注意返回值不是ret.text
        obj_list.append(obj)
    print(obj_list)
    pool.shutdown(wait=True)
    for res in obj_list:
        print(parse(res.result()))  #res.result = ret.text  #分析结果串行
    print('==主')

3.版本二

  线程池设置4个线程, 异步发起10个任务,每个任务是通过网页获取源码+数据分析, 并发执行,最后将所有的结果展示出来.
  耦合性增强了.
  并发执行任务,此任务最好是IO阻塞,才能发挥最大的效果
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import requests

def task(url):
      """模拟的就是多个源代码,一定有io操作(爬取时网络延迟)"""
    ret = requests.get(url)
    if ret.status_code == 200:
        return parse(ret.text)

def parse(content):
    """模拟对数据进行分析 一般没有IO"""
    return len(content)

if __name__ == '__main__':
    url_list = [
            'http://www.baidu.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.taobao.com',
            'https://www.cnblogs.com/jin-xin/articles/7459977.html',
            'https://www.luffycity.com/',
            'https://www.cnblogs.com/jin-xin/articles/9811379.html',
            'https://www.cnblogs.com/jin-xin/articles/11245654.html',
            'https://www.sina.com.cn/',
        ]
    pool = ProcessPoolExecutor(4)
    obj_list = []
    for url in url_list:
        obj = pool.submit(task,url)  #obj接收的是submit的返回值,一个动态对象
        obj_list.append(obj)
    pool.shutdown(wait=True)
    for res in obj_list:
        print(res.result())  #相当于obj.result()接受的是task的返回值

4.版本3 异步调用 + 回调函数

  1. 基于 异步调用回收所有任务的结果我要做到实时回收结果,# 并发执行任务每个任务只是处理IO阻塞的,不能增加新得功能.

  2. 重点

  3. 如果进程池+回调: 回调函数由主进程去执行.
    如果线程池+回调: 回到函数由空闲的线程去执行.
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import requests

def task(url):
    '''模拟的就是爬取多个源代码 一定有IO操作'''
    ret = requests.get(url)
    if ret.status_code == 200:
        return ret.text

def parse(self): #隐形传参,默认接受obj
    '''模拟对数据进行分析 一般没有IO'''
    print(len(self.result()))

if __name__ == '__main__':
    url_list = [
            'http://www.baidu.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.taobao.com',
            'https://www.cnblogs.com/jin-xin/articles/7459977.html',
            'https://www.luffycity.com/',
            'https://www.cnblogs.com/jin-xin/articles/9811379.html',
            'https://www.cnblogs.com/jin-xin/articles/11245654.html',
            'https://www.sina.com.cn/',
        ]
    pool = ThreadPoolExecutor(4)

    for url in url_list:
        obj = pool.submit(task,url)
        obj.add_done_callback(parse)
        #回调函数不管有没有返回值,都是None,回调函数内部加函数名是调用此函数,obj隐形传参

以上是关于同步异步 + 回调函数的主要内容,如果未能解决你的问题,请参考以下文章

同步异步 + 回调函数

回调函数+同步异步函数

简单理解函数回调——同步回调与异步回调

同步回调函数和异步回调函数

异步回调函数

JavaScript 异步操作之回调函数