并发编程

Posted liuhongshuai

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程相关的知识,希望对你有一定的参考价值。

1,简单例子

技术分享图片
# import os
# import time
# print(os.getpid())#当前进程
# print(os.getppid())#父进程


# import os
# import time
# from multiprocessing import Process
# def func(money):
#     time.sleep(1)
#     print(‘取钱{}元‘.format(money))
#
# if __name__=="__main__":
#     p=Process(target=func,args=(10,))#创建一个进程对象
#     p.start()#进程开启
#     print(‘===============‘)
#     p.join()#阻塞
#     print(‘****************‘)

#主进程与子进程是异步执行
#如果在主进程结束了 子进程未结束 主进程会等待着子进程
View Code

2,开启多个子进程

技术分享图片
#开启多个子进程
# import os
# import time
# from multiprocessing import Process
# def func(i):
#     time.sleep(1)
#     print(‘{}:子进程{},父进程{}‘.format(i,os.getpid(),os.getppid()))
#
# if __name__==‘__main__‘:
#     for i in range(10):
#         p=Process(target=func,args=(i,))
#         p.start()
#         # p.join()#阻塞  让主进程等待子进程结束后执行
#     print(‘=======主进程================‘)
View Code

3,自定义进程

技术分享图片
#自定义进程 基于继承 必须实现run方法
# import os
# from multiprocessing import Process
# class MyProcess(Process):
#     def __init__(self,arg1,arg2):
#         super().__init__()
#         self.arg1=arg1
#         self.arg2=arg2
#     def run(self):
#         print(‘子进程{},{}-{}‘.format(os.getpid(),self.arg1,self.arg2))
#         self.walk()#在子进程中调用
#     def walk(self):
#         print(‘子进程{}‘.format(os.getpid()))
#
# if __name__==‘__main__‘:
#     p=MyProcess(1,2)
#     p.start()#run方法
#     p.join()
#     p.walk()#直接在主进程中调用,并没有在子进程中执行
#     print(‘主进程‘,os.getpid())
View Code

4,数据隔离

技术分享图片
#数据隔离 进程与进程之间的数据是共享的
# from multiprocessing import Process
# n=100
# def func():
#     global n
#     n=n-1
#     print(n)
#
# if __name__==‘__main__‘:
#     for i in range(10):
#         p=Process(target=func)
#         p.start()#结果均为99
#         p.join()
#     print(‘主进程:‘,n)#100
View Code

5,守护进程

技术分享图片
#守护进程 p.daemon=True
#守护进程会随着主进程的结束而结束
#守护进程要在start前设置 守护进程中不要再开子进程
# import time
# from multiprocessing import Process
#
# def func1():
#     print(‘func1 start‘)
#     time.sleep(5)
#     print(‘func1 end‘)
# def func2():
#     print(‘func2 start‘)
#     time.sleep(5)
#     print(‘func2 end‘)
#
# if __name__=="__main__":
#     p1=Process(target=func1)
#     p1.daemon=True
#     p1.start()#p1随主进程结束而结束
#     p2=Process(target=func2)
#     p2.start()
#     time.sleep(2)
#     print(‘=======主进程==========‘)
View Code

6,进程的其他方法和属性

技术分享图片
#进程的其他属性和方法 pid name is_alive() terminate
# import time
# from multiprocessing import Process
# def func():
#     time.sleep(2)
#     print(‘Hello World‘)
#
# if __name__=="__main__":
#     p=Process(target=func)
#     p.start()
#     print(p.pid)#进程id
#     print(p.name)#进程名字
#     p.name=‘进程1‘#修改进程名字
#     print(p.name)
#
#     print(p.is_alive())#进程是否活跃
#     p.terminate()#结束进程 但不会立刻被杀死
#     print(p.is_alive())
#     time.sleep(1)
#     print(p.is_alive())
View Code

7,锁

技术分享图片
#锁 在并发编程中 保证数据安全
# from multiprocessing import Lock
# lock=Lock()
# lock.acquire()
# lock.release()

#抢票实例
# import json
# import time
# import random
# from multiprocessing import Process
# from multiprocessing import Lock
#
# def search(i):
#     with open(‘ticket‘,‘r‘,encoding=‘utf-8‘) as f:
#         print(i,json.load(f)[‘count‘])
#
# def get(i):
#     with open(‘ticket‘,‘r‘,encoding=‘utf-8‘) as f:
#         ticket_num=json.load(f)[‘count‘]
#     time.sleep(random.random())
#     if ticket_num>0:
#         with open(‘ticket‘,‘w‘,encoding=‘utf-8‘) as f:
#             json.dump({‘count‘:ticket_num-1},f)
#         print(‘{}号抢到票了‘.format(i))
#     else:
#         print(‘{}号没票了‘.format(i))
#
# def task(i,lock):
#     search(i)#查看票
#     lock.acquire()
#     get(i)#抢票
#     lock.release()
#
# if __name__==‘__main__‘:
#     lock=Lock()
#     for i in range(20):#20个人同时抢票
#         p=Process(target=task,args=(i,lock))
#         p.start()
View Code

8,信号量

技术分享图片
#信号量 多把钥匙公用一把锁
# from multiprocessing import Semaphore
# sem=Semaphore(4)
# sem.acquire()
# sem.release()

#歌厅实例
# import time
# import random
# from multiprocessing import Process
# from multiprocessing import Semaphore
# def sing(i,sem):
#     sem.acquire()
#     print(‘{}进入ktv‘.format(i))
#     time.sleep(random.random())
#     print(‘{}出了ktv‘.format(i))
#     sem.release()
# #迷你歌厅 同一时间只能有4个人进入
# if __name__=="__main__":
#     sem=Semaphore(4)
#     for i in range(20):
#         p=Process(target=sing,args=(i,sem))
#         p.start()
View Code

9,事件

技术分享图片
# 事件 标志 同时 是所有的进程 都陷入阻塞
# from multiprocessing import Event
# e=Event()#实例化一个事件
# e.set()#将标志变为非阻塞
# e.wait()#等待
# e.clear()#将标志变为阻塞
# e.is_set()#判断是否阻塞

#红绿灯实例
# import time
# import random
# from multiprocessing import Process
# from multiprocessing import Event
#
# def traffic_light(e):
#     while True:
#         if e.is_set():
#             time.sleep(3)
#             print(‘红灯亮‘)
#             e.clear()#红变绿
#         else:
#             time.sleep(3)
#             print(‘绿灯亮‘)
#             e.set()#绿变红
#
# def car(i,e):
#     e.wait()
#     print(‘{}车通过‘.format(i))
#
# if __name__=="__main__":
#     e=Event()#立一个红灯
#     tra=Process(target=traffic_light,args=(e,))
#     tra.start()#启动一个进程来控制红绿灯
#     for i in range(20):
#         if i%6==0:
#             time.sleep(random.randint(1,3))
#         car_pro=Process(target=car,args=(i,e))
#         car_pro.start()
View Code

10,队列

技术分享图片
#队列
# from multiprocessing import Queue
# q=Queue(3)#有长度限制
# # q=Queue()#没有长度限制
# q.put(1)
# q.put(2)
# print(q.qsize())#有多少值
# q.put(3)
# # q.put(4)#阻塞
# print(q.get())
# print(q.get())
# print(q.get())
# # print(q.get())#阻塞

#利用队列实现了 主进程与子进程的通信 子进程之间的通信
# from multiprocessing import Process
# from multiprocessing import Queue
# def q_put(q):
#     q.put(‘Hello World‘)
# def q_get(q):
#     print(q.get())
#
# if __name__=="__main__":
#     q=Queue()
#     p=Process(target=q_put,args=(q,))
#     p.start()
#     p1=Process(target=q_get,args=(q,))
#     p1.start()
View Code

11,生产者消费者模型

技术分享图片
#生产者消费者模型
# from multiprocessing import Process
# from multiprocessing import Queue
# import random
# import time
#
# def producer(q,food):
#     for i in range(5):
#         q.put(‘{}-{}‘.format(food,i))
#         print(‘生产了{}-{}‘.format(food,i))
#         time.sleep(random.random())
#     q.put(None)
#     q.put(None)
#     q.put(None)
#
# def consumer(q,name):
#     while True:
#         food=q.get()
#         if food==None:break
#         print(‘{}吃了{}‘.format(name,food))
#
# if __name__==‘__main__‘:
#     q=Queue()
#     p1=Process(target=producer,args=(q,‘包子‘))
#     p1.start()
#     p2=Process(target=producer,args=(q,‘骨头‘))
#     p2.start()
#     c1=Process(target=consumer,args=(q,‘alex‘))
#     c1.start()
#     c2=Process(target=consumer,args=(q,‘wusir‘))
#     c2.start()
队列版
技术分享图片
# JoinableQueue版  (可以感知数据的处理 task_done)
# from multiprocessing import Process
# from multiprocessing import JoinableQueue
# import random
# import time
#
# def producer(q,food):
#     for i in range(5):
#         q.put(‘{}-{}‘.format(food,i))
#         print(‘生产了{}-{}‘.format(food,i))
#         time.sleep(random.random())
#     q.join()#等待消费者把所有数据处理完
#
# def consumer(q,name):
#     while True:
#         food=q.get()
#         print(‘{}吃了{}‘.format(name,food))
#         q.task_done()#消费完了
#
#
# if __name__==‘__main__‘:
#     q=JoinableQueue()
#     p1=Process(target=producer,args=(q,‘包子‘))
#     p1.start()
#     p2=Process(target=producer,args=(q,‘骨头‘))
#     p2.start()
#     c1=Process(target=consumer,args=(q,‘alex‘))
#     c1.daemon=True
#     c1.start()
#     c2=Process(target=consumer,args=(q,‘wusir‘))
#     c2.daemon=True
#     c2.start()
#
#     p1.join()#等待p1执行完毕
#     p2.join()#等待p2执行完毕
#生产者生产的数据全部被消费 —— 生产者进程结束 —— 主进程代码执行结束 —— 消费者守护进程结束
JoinableQueue版
技术分享图片
# IPC机制 队列Quere
# 管道 Pipe 双向通信
# from multiprocessing import Pipe
# p1,p2=Pipe()#支持双向通信
# p1.send(‘hello‘)
# print(p2.recv())
# p2.send(‘hi‘)
# print(p1.recv())
# p1.close()
# p2.close()

#EOFerror错误:双向通信的一端关闭
#队列=管道(没有锁的机制,数据不安全)+锁
#队列 在同一台机器上的多个进程之间通信

#利用管道实现生产者消费者模型
# from multiprocessing import Lock
# from multiprocessing import Process
# from multiprocessing import Pipe
#
# def producer(p,n):
#     produce,consume=p
#     consume.close()
#     for i in range(n):
#         produce.send(i)
#     produce.send(None)
#     produce.send(None)
#     produce.close()
#
# def consumer(p,name,lock):
#     produce,consume=p
#     produce.close()
#     while True:
#         lock.acquire()
#         food=consume.recv()
#         lock.release()
#         if food:
#             print(‘{}收到包子:{}‘.format(name,food))
#         else:
#             consume.close()
#             break
#
# if __name__==‘__main__‘:
#     produce,consume=Pipe()
#     lock=Lock()
#     p1=Process(target=producer,args=((produce,consume),10))
#     c1=Process(target=consumer,args=((produce,consume),‘c1‘,lock))
#     c2=Process(target=consumer,args=((produce,consume),‘c2‘,lock))
#
#     c1.start()
#     c2.start()
#     p1.start()
#
#     produce.close()
#     consume.close()
#
#     c1.join()
#     c2.join()
#     p1.join()
管道版

12,Manager

技术分享图片
# IPC-Manager
# import time
# from multiprocessing import Manager
# from multiprocessing import Process
#
# def func(dic):
#     print(dic)
#
# if __name__==‘__main__‘:
#     m=Manager()
#     d=m.dict({‘count‘:0})
#     print(d)
#     p=Process(target=func,args=(d,))
#     p.start()

# Manager : dict list pipe ,并不提供数据安全的支持
# from multiprocessing import Manager,Process,Lock
# def work(d,lock):
#     lock.acquire()
#     d[‘count‘]-=1
#     lock.release()
#
# if __name__==‘__main__‘:
#     lock=Lock()
#     m=Manager()
#     dic=m.dict({‘count‘:100})#共享的数据
#     l=[]
#     for i in range(10):
#         p=Process(target=work,args=(dic,lock))
#         p.start()
#         l.append(p)
#     [p.join() for p in l]
#     print(dic)
View Code

13,进程池

技术分享图片
#进程池
# import os
# import random
# from multiprocessing import Process
# from multiprocessing import Pool
# import time

# def func(i):
#     i+=1
#     print(i)
#
# if __name__==‘__main__‘:
    # p=Pool(5)#创建了五个进程
    # start_time=time.time()
    # p.map(func,range(100))#target=func,args=next(iterable)
    # p.close()#不允许再向进程池中添加任务
    # p.join()
    # end_time=time.time()
    # print(end_time-start_time)

#进程池2
# import time
# from multiprocessing import Pool
#
# def func(i):
#     time.sleep(1)
#     i+=1
#     # print(i)
#     return i+1
#
# if __name__==‘__main__‘:
#     p=Pool(5)
#     for i in range(20):
#         res = p.apply(func, args=(i,))  # 同步提交结果 顺序执行代码 直接调用之后得到返回值
#         print(res)

    # res_l=[]
#     for i in range(20):
#         res=p.apply_async(func,args=(i,))#异步提交任务的机制
#         res_l.append(res)
#         # print(res.get())#阻塞:等待任务结果
#     p.close()#close必须加在join前,不允许再添加新的任务了
#     p.join()#等待子进程结束在向下执行
#     [print(i.get()) for i in res_l]#异步调用获取函数的返回值
View Code

14,回调函数

技术分享图片
# callback 回调函数 :主进程执行 参数是子进程执行的函数的返回值

# import os
# import time
# from multiprocessing import Pool
#
# def func(i):
#     print(‘子进程{}:{}‘.format(i,os.getpid()))
#     return i*"*"
#
# def call(arg):#回调函数是在主进程中完成的 不能传参数 只能接受多进程函数的返回值
#     print(‘回调:‘,os.getpid())
#     print(arg)
#
# if __name__==‘__main__‘:
#     print(‘---------->‘,os.getpid())
#     p=Pool(5)
#     for i in range(10):
#         p.apply_async(func,args=(i,),callback=call)
#     p.close()
#     p.join()


#回调函数应用
# from urllib.request import urlopen
# import requests
# from multiprocessing import Pool
# def get_url(url):
#     ret = requests.get(url)
#     return {‘url‘:url,
#             ‘status_code‘:ret.status_code,
#             ‘content‘:ret.text}
#
# def parser(dic):
#     print(dic[‘url‘],dic[‘status_code‘],len(dic[‘content‘]))
#     # 把分析结果写到文件里
# if __name__ == ‘__main__‘:
#     url_l = [
#         ‘http://www.baidu.com‘,
#         ‘http://www.sogou.com‘,
#         ‘http://www.hao123.com‘,
#         ‘http://www.yangxiaoer.cc‘,
#         ‘http://www.python.org‘
#     ]
#     p = Pool(4)
#     for url in url_l:
#         p.apply_async(get_url,args=(url,),callback=parser)
#     p.close()
#     p.join()
View Code

15,线程与协程

技术分享图片
# -----------------------------------------------------
# 线程是CPU调度的最小单位
# 进程是资源分配的最小单位
# 与进程相比,开启线程的时空开销小 cpu在线程之间切换快
# 一个程序中 可以同时有多进程和线程

# ---------------------------------------------------------------
#线程
# import os
# import time
# from threading import Thread
#
# def func():
#     time.sleep(1)
#     print(‘子线程:{}‘.format(os.getpid()))
# for i in range(10):
#     t=Thread(target=func)
#     t.start()
#     t.join()
# print(‘主线程‘,os.getpid())
#主线程和子线程的进程id相同,属于同一个进程


#自定义线程
# import os
# import time
# from threading import Thread
#
# class MyThread(Thread):
#     count=0#静态属性
#     def __init__(self,arg1,arg2):
#         super().__init__()
#         self.arg1=arg1
#         self.arg2=arg2
#     def run(self):
#         MyThread.count+=1
#         time.sleep(1)
#         print(‘{}-{}-{}-{}‘.format(self.name,os.getpid(),self.arg1,self.arg2))
#
# for i in range(10):
#     t=MyThread(i,i*"*")
#     t.start()
# print(t.count)

# import time
# import threading
#
# def func(i):
#     time.sleep(0.5)
#     print(i,threading.current_thread().name,threading.current_thread().ident)
#     #线程名字 线程id
# for i in range(10):
#     t=threading.Thread(target=func,args=(i,))
#     t.start()
#
# print(threading.enumerate())#返回正在运行着的线程列表
# print(len(threading.enumerate()))#11 #主线程+10个子线程
# print(threading.activeCount())#活跃线程数
# -------------------------------------------------------

#守护线程
# import time
# from threading import Thread
# def func():
#     print(‘子线程开始执行‘)
#     time.sleep(3)
#     print(‘子线程执行完毕‘)
#
# t=Thread(target=func)
# t.setDaemon(True)
# t.start()
# print(‘==========‘)
# -----------------------------------------------------

#
# import time
# from threading import Thread
# from threading import Lock
#
# n=100
# lock=Lock()
# def func():
#     global n
#     time.sleep(2)
#     lock.acquire()
#     temp=n#从进程中获取n
#     time.sleep(0.01)
#     n=temp-1#得到结果,在存储回进程
#     lock.release()
#
#
# for i in range(5):
#     t=Thread(target=func)
#     t.start()
#     t.join()
# print(n)

# -----------------------------------------
#死锁

# 科学家就餐实例 有问题
# import time
# from threading import RLock#递归锁
# from threading import Lock#互斥锁
# from threading import Thread
# m=RLock()
# kz=RLock()
#
# def eat(name):
#     kz.acquire()
#     print(‘{}拿到筷子了‘.format(name))
#     m.acquire()
#     print(‘{}拿到面了‘.format(name))
#     print(‘{}吃面‘.format(name))
#     m.release()
#     kz.release()
#
# def eat2(name):
#     m.acquire()
#     print(‘{}拿到面了‘.format(name))
#     kz.acquire()
#     print(‘{}拿到筷子了‘.format(name))
#     print(‘{}吃面‘.format(name))
#     kz.release()
#     m.release()
#
# Thread(target=eat,args=(‘alex‘,)).start()
# Thread(target=eat2,args=(‘wusir‘,)).start()
# Thread(target=eat,args=(‘yuan‘,)).start()
# Thread(target=eat2,args=(‘haifeng‘,)).start()

# ---------------------------------------------------
#信号量
# import time
# import random
# from threading import Thread
# from threading import Semaphore
#
# def func(n,sem):
#     sem.acquire()
#     print(‘thread-{} start‘.format(n))
#     time.sleep(random.random())
#     print(‘thread-{} end‘.format(n))
#     sem.release()
# sem=Semaphore(5)#一把锁有五把钥匙
# for i in range(20):
#     Thread(target=func,args=(i,sem)).start()

# -----------------------------------------------------
#事件
# 刚刚创建的时候 flag=False
# wait 阻塞 flag=False
# set False-->True
# clear True-->False

#连接数据库实例
# import time
# import random
# from threading import Event
# from threading import Thread

# e=Event()
# def conn_mysql():#连接数据库
#     count=1
#     while not e.is_set():#当事件的flag为False时才执行循环内的语句
#         if count>3:
#             raise TimeoutError
#         print(‘尝试连接第{}次‘.format(count))
#         count+=1
#         e.wait(0.5)#一直阻塞变成了只阻塞0.5
#     print(‘连接成功‘)# 收到check_conn函数内的set指令,让flag变为True跳出while循环,执行本句代码
#
# def check_conn():
#     ‘‘‘
#     检测数据库服务器的连接是否正常
#     ‘‘‘
#     time.sleep(random.randint(1,2))# 模拟连接检测的时间
#     e.set()# 告诉事件的标志数据库可以连接
#
# check=Thread(target=check_conn)
# check.start()
# conn=Thread(target=conn_mysql)
# conn.start()

# -----------------------------------------
#条件
# import threading
# def run(n):
#     con.acquire()
#     con.wait()
#     print(‘run the thread:{}‘.format(n))
#     con.release()
#
# if __name__==‘__main__‘:
#     con=threading.Condition()#条件=锁+wait的功能
#     for i in range(10):
#         t=threading.Thread(target=run,args=(i,))
#         t.start()
#     while True:
#         inp=input(‘>>>‘)
#         if inp==‘q‘:
#             break
#         con.acquire()# condition中的锁 是 递归锁
#         if inp==‘all‘:
#             con.notify_all()
#         else:
#             con.notify(int(inp)) # 传递信号 notify(1) --> 可以放行一个线程
#         con.release()

# -------------------------------------------
#定时器
# from threading import Timer
# def hello():
#     print(‘Hello World‘)
#
# while True:#每隔一段时间 开启一个线程
#     t=Timer(10,hello)#定时开启一个线程 执行一个任务
#     t.start()
# ---------------------------------------
#队列 安全的
# import queue
# q=queue.Queue()#先进先出
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.qsize())
# print(q.get())
# print(q.get())
# print(q.get())
#
#
# obj=queue.LifoQueue()#后进先出:栈
# obj.put(1)
# obj.put(2)
# obj.put(3)
# obj.put(4)
# print(obj.get())
# print(obj.get())
# print(obj.get())
# print(obj.get())

# import queue
# pq=queue.PriorityQueue()#值越小越优先 值相同就ascii码小的先出
# pq.put(‘x‘)
# pq.put(‘a‘)
# pq.put(‘z‘)
# print(pq.get())
# print(pq.get())

# --------------------------------------------
# concurrent
# import time
# import random
# from concurrent import futures
#
# def funcname(n):
#     print(n)
#     time.sleep(random.randint(1,3))
#     return n*"*"
# def call(args):
#     print(args.result())
#
# thread_pool=futures.ThreadPoolExecutor(5)
# # thread_pool.map(funcname,range(10))# map,天生异步,接收可迭代对象的数据,不支持返回值
# f_list=[]
# for i in range(10):
#     f=thread_pool.submit(funcname,i)#submit合并了创建线程对象和start的功能
#     f_list.append(f)
# # thread_pool.shutdown()#close() join()
# for f in f_list:# 一定是按照顺序出结果
#     print(f.result())#f.result()阻塞 等f执行完得到结果
#
# # 回调函数 add_done_callback(回调函数的名字)
# thread_pool.submit(funcname,1).add_done_callback(call)
# # 统一了入口和方法 简化了操作 降低了学习的时间成本

# ---------------------------------------------
#协程介绍
# def func1():
#     print(1)
#     yield
#     print(3)
#     yield
#
# def func2():
#     g = func1()
#     next(g)
#     print(2)
#     next(g)
#     print(4)
#
# func2()

# def consumer():
#     while True:
#         n = yield
#         print(‘消费了一个包子%s‘%n)
#
# def producer():
#     g = consumer()
#     next(g)
#     for i in range(10):
#         print(‘生产了包子%s‘%i)
#         g.send(i)
#
# producer()
# import time
# from greenlet import greenlet   # 在单线程中切换状态的模块
# def eat1():
#     print(‘吃鸡腿1‘)
#     g2.switch()
#     time.sleep(5)
#     print(‘吃鸡翅2‘)
#     g2.switch()
#
# def eat2():
#     print(‘吃饺子1‘)
#     g1.switch()
#     time.sleep(3)
#     print(‘白切鸡‘)
#
# g1 = greenlet(eat1)
# g2 = greenlet(eat2)
# g1.switch()
# gevent内部封装了greenlet模块



# #串行执行
# import time
# def consumer(res):
#     ‘‘‘任务1:接收数据,处理数据‘‘‘
#     pass
#
# def producer():
#     ‘‘‘任务2:生产数据‘‘‘
#     res=[]
#     for i in range(100000000):
#         res.append(i)
#     return res
#
# start=time.time()
# #串行执行
# res=producer()
# consumer(res) #写成consumer(producer())会降低执行效率
# stop=time.time()
# print(stop-start) #1.5536692142486572
#
#
#
# #基于yield并发执行
# import time
# def consumer():
#     ‘‘‘任务1:接收数据,处理数据‘‘‘
#     while True:
#         x=yield
#
# def producer():
#     ‘‘‘任务2:生产数据‘‘‘
#     g=consumer()
#     next(g)
#     for i in range(100000000):
#         g.send(i)
#
# start=time.time()
# #基于yield保存状态,实现两个任务直接来回切换,即并发的效果
# #PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
# producer()
#
# stop=time.time()
# print(stop-start)

# 在代码之间切换执行 反而会降低效率
# 切换 不能规避IO时间

# 如果 在同一个程序中 有IO的情况下 才切换  会让效率提高很多
# yield greenlet 都不能在切换的时候 规避IO时间


# gevent
from gevent import monkey;monkey.patch_all()
import time     # time socket urllib requests
import gevent   # greenlet gevent在切换程序的基础上又实现了规避IO
from threading import current_thread
def func1():
    print(current_thread().name)
    print(123)
    time.sleep(1)
    print(456)

def func2():
    print(current_thread().name)   # dummythread
    print(hahaha)
    time.sleep(1)
    print(10jq)

g1 = gevent.spawn(func1)  # 遇见他认识的io会自动切换的模块
g2 = gevent.spawn(func2)
# g1.join()
# g2.join()
gevent.joinall([g1,g2])

#效率对比
from gevent import monkey;monkey.patch_all()
import time     # time socket urllib requests
import gevent   # greenlet gevent在切换程序的基础上又实现了规避IO

def task(args):
    time.sleep(1)
    print(args)

def sync_func():   # 同步
    for i in range(10):
        task(i)

def async_func(): # 异步
    g_l = []
    for i in range(10):
        g_l.append(gevent.spawn(task,i))   # 给写成任务传参数
    gevent.joinall(g_l)

start = time.time()
sync_func()
print(time.time() - start)

start = time.time()
async_func()
print(time.time() - start)

#爬取网页信息的例子
from gevent import monkey;monkey.patch_all()
import time
import gevent
import requests

# 爬取网页
# 10个网页
# 协程函数去发起10个网页的爬取任务
def get_url(url):
    res = requests.get(url)
    print(url,res.status_code,len(res.text))

url_lst =[
    http://www.sohu.com,
    http://www.baidu.com,
    http://www.qq.com,
    http://www.python.org,
    http://www.cnblogs.com,
    http://www.mi.com,
    http://www.apache.org,
    https://www.taobao.com,
    http://www.360.com,
    http://www.7daysinn.cn/
]

start = time.time()
for url in url_lst:
    get_url(url)
print(time.time() - start)

#爬取网页效率
from gevent import monkey;monkey.patch_all()
import time
import gevent
import requests

# 爬取网页
# 10个网页
# 协程函数去发起10个网页的爬取任务
def get_url(url):
    res = requests.get(url)
    print(url,res.status_code,len(res.text))

url_lst =[
    http://www.sohu.com,
    http://www.baidu.com,
    http://www.qq.com,
    http://www.python.org,
    http://www.cnblogs.com,
    http://www.mi.com,
    http://www.apache.org,
    https://www.taobao.com,
    http://www.360.com,
    http://www.7daysinn.cn/
]

g_lst = []
start = time.time()
for url in url_lst:
    g = gevent.spawn(get_url,url)
    g_lst.append(g)
gevent.joinall(g_lst)
print(time.time() - start)

#IO模型
# 概念
# 阻塞 非阻塞 同步 异步
# 阻塞 time.sleep(1)
# 异步 同时执行几个事儿
# 同步 两个事儿 一个一个的执行

# 网络IO模型
# 1.阻塞IO
# 2.非阻塞IO
# 3.IO多路复用
# 4.信号驱动IO
# 5.异步IO

# 网络IO
# recv recvfrom accept requests.get()
# send connect sendto

# IO的两个阶段
# 数据准备阶段
# 数据copy阶段

# 阻塞IO
#主进程的阻塞问题 ,多进程 多线程 分离了阻塞
# 真的解决了这些阻塞么?
# 多进程和多线程来说
# 来几个人请求 就要开几个线程
# 进程线程不能无限开
# 池  —— > 4
# 以后用进程都用进程池 单纯的进程池不能满足用户的需求,只适合小并发的问题

# 真正需要我们解决的是I/O问题

# 非阻塞IO
View Code

16,其他

技术分享图片
#server 

#多进程应用
# import socket
# from multiprocessing import Process
#
# def talk(conn):
#     conn.send(b‘connected‘)
#     res=conn.recv(1024)
#     print(res)
#
# if __name__==‘__main__‘:
#     sk=socket.socket()
#     sk.bind((‘127.0.0.1‘,8080))
#     sk.listen(5)
#     while True:
#         conn,addr=sk.accept()
#         p=Process(target=talk,args=(conn,))
#         p.start()
#     conn.close()
#     sk.close()


#多线程应用
# import socket
# from threading import Thread
#
# def talk(conn):
#     conn.send(b‘connected‘)
#     res=conn.recv(1024)
#     print(res)
#     conn.close()
#
#
# sk=socket.socket()
# sk.bind((‘127.0.0.1‘,8080))
# sk.listen(5)
# while True:
#     conn,addr=sk.accept()
#     p=Thread(target=talk,args=(conn,))
#     p.start()
# sk.close()

#聊天
# from gevent import monkey
# monkey.patch_all()
# import gevent
# import socket
# def talk(conn):
#     while True:
#         ret = conn.recv(1024).decode(‘utf-8‘)
#         print(ret)
#         conn.send(ret.upper().encode(‘utf-8‘))
#     conn.close()
#
# sk = socket.socket()
# sk.bind((‘127.0.0.1‘,8080))
# sk.listen()
# while True:
#     conn,addr = sk.accept()
#     gevent.spawn(talk,conn)
# sk.close()

#非阻塞IO
# import socket
# sk = socket.socket()
# sk.bind((‘127.0.0.1‘,8080))
# sk.listen()
# sk.setblocking(False)
# conn_lst = []
#
# while True:
#     try:
#         conn,addr = sk.accept()   #非阻塞  有链接来
#         conn_lst.append(conn)
#     except BlockingIOError:
#         del_lst = []
#         for c in conn_lst:    # 才能执行这一句
#             try:
#                 msg = c.recv(10).decode(‘utf-8‘)  # recv不会阻塞
#                 if not msg:
#                     c.close()
#                     del_lst.append(c)
#                 else:
#                     print(msg)
#                     c.send(msg.upper().encode(‘utf-8‘))
#             except BlockingIOError:
#                 pass
#         if del_lst:
#             for del_item in del_lst:
#                 conn_lst.remove(del_item)

#IO多路复用
import socket
import select
sk = socket.socket()
sk.bind((127.0.0.1,8099))
sk.listen()

read_lst = [sk]
while True:
    rl,wl,xl = select.select(read_lst,[],[])   # select阻塞,rl可以读的 wl可以写的 xl可以改的  [sk,conn]
    for item in rl:
        if item == sk:
            conn,addr = item.accept()  # 有数据等待着它接收
            read_lst.append(conn)
        else:
            ret = item.recv(1024).decode(utf-8)
            if not ret:
                item.close()
                read_lst.remove(item)
            else:
                print(ret)
                item.send((received %s%ret).encode(utf-8))

# readlst [sk,conn,conn2,conn3]  100 问一百次

# select poll 随着要检测的数据增加 效率会下降

# select  有数目的限制
# poll    能处理的对象更多
# epoll   能处理多对象 不是使用轮询  回调函数 —— linux


#client

# import socket
# sk = socket.socket()
# sk.connect((‘127.0.0.1‘,8080))
# ret = sk.recv(1024)
# print(ret)
# msg = input(‘>>>‘)
# sk.send(msg.encode(‘utf-8‘))
# sk.close

#聊天
# import time
# import socket
# import threading
# def my_client():
#     sk = socket.socket()
#     sk.connect((‘127.0.0.1‘,8080))
#     while True:
#         sk.send(b‘hi‘)
#         ret = sk.recv(1024).decode(‘utf-8‘)
#         print(ret)
#         time.sleep(1)
#     sk.close()
# for i in range(500):
#     threading.Thread(target=my_client).start()

#非阻塞IO
#非阻塞IO
# import time
# import socket
# import threading
# def func():
#     sk = socket.socket()
#     sk.connect((‘127.0.0.1‘,8080))
#     time.sleep(1)
#     sk.send(b‘hi‘)
#     print(sk.recv(10))
#     sk.close()
#
# for i in range(10):
#     threading.Thread(target=func,).start()

#IO多路复用
# import time
# import socket
# import threading
# def client_async(args):
#     sk = socket.socket()
#     sk.connect((‘127.0.0.1‘,8099))
#     for i in range(10):
#         time.sleep(2)
#         sk.send((‘%s[%s] :hello‘%(args,i)).encode(‘utf-8‘))
#         print(sk.recv(1024))
#     sk.close()
#
# for i in range(10):
#     threading.Thread(target=client_async,args=(‘*‘*i,)).start()

# selector_demo
#服务端
# from socket import *
# import selectors
#
# sel=selectors.DefaultSelector()   # 创建一个默认的多路复用模型
# def accept(sk):
#     conn,addr=sk.accept()
#     sel.register(conn,selectors.EVENT_READ,read)
#
# def read(conn):
#     try:
#         data=conn.recv(1024)
#         if not data:   #win8 win10
#             print(‘closing‘,conn)
#             sel.unregister(conn)
#             conn.close()
#             return
#         conn.send(data.upper()+b‘_SB‘)
#     except Exception:    # linux操作系统
#         print(‘closing‘, conn)
#         sel.unregister(conn)
#         conn.close()
#
# sk=socket(AF_INET,SOCK_STREAM)
# sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
# sk.bind((‘127.0.0.1‘,8088))
# sk.listen(5)
# sk.setblocking(False) #设置socket的接口为非阻塞
# sel.register(sk,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept
#
# while True:
#     events=sel.select() #检测所有的fileobj,是否有完成wait data的   #[sk,conn]
#     for sel_obj,mask in events:   # 有人触动了你在sel当中注册的对象
#         callback=sel_obj.data #callback=accpet   # sel_obj.data就能拿到当初注册的时候写的accept/read方法
#         callback(sel_obj.fileobj) #accpet(sk)/read(conn)
View Code
技术分享图片
操作系统的发展
    # 没有操作系统
    # 批处理系统 
    # 多道程序系统
    # 分时系统
    # 实时系统
    # 通用操作系统
# 操作系统的功能
# 进程
    # 进程是操作系统中资源分配的最小单位
    # 进程调度算法
        # 先来先服务算法 FCFS
        # 短作业优先算法
        # 时间片轮转算法
        # 多级反馈算法
    # 异步
    # 同步  

    # 阻塞
    # 非阻塞

# 进程的状态 :就绪 运行 阻塞
# 进程的创建与结束
#父进程 子进程

start开启一个进程
join 让主进程等待子进程结束

事件 异步阻塞
# 事件 标志 同时 是所有的进程都陷入阻塞

# 通过队列实现了 主进程与子进程的通信   子进程与子进程之间的通信

# 协程
# IO模型

# Lock锁 —— 降低了程序的执行效率 但是保证了数据的安全性
# 信号量 —— 多把钥匙公用一把锁  sem


# 进程包含着线程
# GIL  全局解释器锁 Cpython解释器
# 去掉GIL 保证数据安全 细粒度的锁 效率更低

# 高计算 多进程
# 高IO   多线程  —— 爬虫 网络

# 线程 更加的轻量级
# 线程是CPU调度的最小单位
# 进程是资源分配的最小单位

# 开启线程的时空开销 都比 开启进程要小
# 且 cpu在线程之间切换 比 在进程之间切换快
# 一个程序中 可以同时有多进程和线程


setblocking = False
recv()

同步  干完一件事 再干一件事儿
异步  同时处理多个任务

recv 数据

阻塞IO    : 工作效率低
非阻塞IO  : 工作效率高,CPU的负担
IO多路复用: 在有多个对象需要IO阻塞的时候,能够有效的减少阻塞带来的时间损耗,
             且能够在一定程度上减少CPU的负担
异步IO : asyncio  异步IO  工作效率高 CPU的负担少
View Code

 

以上是关于并发编程的主要内容,如果未能解决你的问题,请参考以下文章

golang代码片段(摘抄)

《java并发编程实战》

Java并发编程实战 04死锁了怎么办?

Java并发编程实战 04死锁了怎么办?

Java编程思想之二十 并发

golang goroutine例子[golang并发代码片段]