锁 队列 和池

Posted usherwang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了锁 队列 和池相关的知识,希望对你有一定的参考价值。

1.锁

#线程中是不是会产生数据不安全
#   共享内存
a = 0
def add_f():
   global a
   for i in range(200000):
       a += 1
?
def sub_f():
   global a
   for i in range(200000):
       a -= 1
?
from threading import Thread
?
t1 = Thread(target=add_f)
t1.start()
t2 = Thread(target=sub_f)
t2.start()
t1.join()
t2.join()
print(a)
a = 0
def func():
   global a
   a -= 1
import dis
dis.dis(func)
?
#即便是线程 即便有GIL 也会出现数据不安全的问题
#   1.操作的是全局变量
#   2.做一下操作
#       += -= *= /+ 先计算再赋值才容易出现数据不安全的问题
#       包括 lst[0] += 1 dic[‘key‘]-=1
?
a = 0
def func():
   global a
   a += 1
?
import dis
dis.dis(func)
?
?
a = 0
def add_f(lock):
   global a
   for i in range(200000):
       with lock:
           a += 1
?
def sub_f(lock):
   global a
   for i in range(200000):
       with lock:
           a -= 1
?
from threading import Thread,Lock
lock = Lock()
t1 = Thread(target=add_f,args=(lock,))
t1.start()
t2 = Thread(target=sub_f,args=(lock,))
t2.start()
t1.join()
t2.join()
print(a)
#加锁会影响程序的执行效率,但是保证了数据的安全
?
#互斥锁是锁中的一种:在同一个线程中,不能连续acquire多次
from threading import Lock
lock = Lock()
lock.acquire()
print(‘*‘*20)
lock.release()
lock.acquire()
print(‘-‘*20)
lock.release()

2.单例模式

import time
from threading import Lock
class A:
   __instance = None
   lock = Lock()
   def __new__(cls, *args, **kwargs):
       with cls.lock:
           if not cls.__instance:
               time.sleep(0.1)
               cls.__instance = super().__new__(cls)
       return cls.__instance
   def __init__(self,name,age):
       self.name = name
       self.age = age
?
def func():
   a = A(‘alex‘, 84)
   print(a)
?
from threading import Thread
for i in range(10):
   t = Thread(target=func)
   t.start()

3.死锁现象

import time
from threading import Thread,Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name,noodle_lock,fork_lock):
   noodle_lock.acquire()
   print(‘%s抢到面了‘%name)
   fork_lock.acquire()
   print(‘%s抢到叉子了‘ % name)
   print(‘%s吃了一口面‘%name)
   time.sleep(0.1)
   fork_lock.release()
   print(‘%s放下叉子了‘ % name)
   noodle_lock.release()
   print(‘%s放下面了‘ % name)
?
def eat2(name,noodle_lock,fork_lock):
   fork_lock.acquire()
   print(‘%s抢到叉子了‘ % name)
   noodle_lock.acquire()
   print(‘%s抢到面了‘%name)
   print(‘%s吃了一口面‘%name)
   time.sleep(0.1)
   noodle_lock.release()
   print(‘%s放下面了‘ % name)
   fork_lock.release()
   print(‘%s放下叉子了‘ % name)
?
lst = [‘alex‘,‘wusir‘,‘taibai‘,‘yuan‘]
Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()
?
?
?
?
?
# 锁
   # 互斥锁
       # 在一个线程中连续多次acquire会死锁
   # 递归锁
       # 在一个线程中连续多次acquire不会死锁
   # 死锁现象
       # 死锁现象是怎么发生的?
           # 1.有多把锁,一把以上
           # 2.多把锁交替使用
   # 怎么解决
       # 递归锁 —— 将多把互斥锁变成了一把递归锁
           # 快速解决问题
           # 效率差
       # ***递归锁也会发生死锁现象,多把锁交替使用的时候
       # 优化代码逻辑
           # 可以使用互斥锁 解决问题
           # 效率相对好
           # 解决问题的效率相对低

4.递归锁

from threading import RLock
# rlock = RLock()
# rlock.acquire()
# print(‘*‘*20)
# rlock.acquire()
# print(‘-‘*20)
# rlock.acquire()
# print(‘*‘*20)
?
# 在同一个线程中,可以连续acuqire多次不会被锁住
?
# 递归锁:
   # 好 :在同一个进程中多次acquire也不会发生阻塞
   # 不好 :占用了更多资源
import time
from threading import RLock,Thread
# noodle_lock = RLock()
# fork_lock = RLock()
noodle_lock = fork_lock = RLock()
print(noodle_lock,fork_lock)
def eat1(name,noodle_lock,fork_lock):
   noodle_lock.acquire()
   print(‘%s抢到面了‘%name)
   fork_lock.acquire()
   print(‘%s抢到叉子了‘ % name)
   print(‘%s吃了一口面‘%name)
   time.sleep(0.1)
   fork_lock.release()
   print(‘%s放下叉子了‘ % name)
   noodle_lock.release()
   print(‘%s放下面了‘ % name)
?
def eat2(name,noodle_lock,fork_lock):
   fork_lock.acquire()
   print(‘%s抢到叉子了‘ % name)
   noodle_lock.acquire()
   print(‘%s抢到面了‘%name)
   print(‘%s吃了一口面‘%name)
   time.sleep(0.1)
   noodle_lock.release()
   print(‘%s放下面了‘ % name)
   fork_lock.release()
   print(‘%s放下叉子了‘ % name)
?
lst = [‘alex‘,‘wusir‘,‘taibai‘,‘yuan‘]
Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()

5.互斥锁解决死锁问题

import time
from threading import Lock,Thread
lock = Lock()
def eat1(name,noodle_lock,fork_lock):
   lock.acquire()
   print(‘%s抢到面了‘%name)
   print(‘%s抢到叉子了‘ % name)
   print(‘%s吃了一口面‘%name)
   time.sleep(0.1)
   print(‘%s放下叉子了‘ % name)
   print(‘%s放下面了‘ % name)
   lock.release()
?
def eat2(name,noodle_lock,fork_lock):
   lock.acquire()
   print(‘%s抢到叉子了‘ % name)
   print(‘%s抢到面了‘%name)
   print(‘%s吃了一口面‘%name)
   time.sleep(0.1)
   print(‘%s放下面了‘ % name)
   print(‘%s放下叉子了‘ % name)
   lock.release()
?
lst = [‘alex‘,‘wusir‘,‘taibai‘,‘yuan‘]
Thread(target=eat1,args=(lst[0],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[1],noodle_lock,fork_lock)).start()
Thread(target=eat1,args=(lst[2],noodle_lock,fork_lock)).start()
Thread(target=eat2,args=(lst[3],noodle_lock,fork_lock)).start()

6.队列

import queue
# 线程之间的通信 线程安全
from queue import Queue  # 先进先出队列
q = Queue(5)
q.put(0)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
print(‘444444‘)
?
?
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
?
# 使用多线程 实现一个请求网页 并且把网页写到文件中
# 生产者消费者模型来完成
?
# 5个线程负责请求网页 把结果放在队列里
# 2个线程 负责从队列中获取网页代码 写入文件
?
from queue import LifoQueue  # 后进先出队列
last in first out
lfq = LifoQueue(4)
lfq.put(1)
lfq.put(3)
lfq.put(2)
print(lfq.get())
print(lfq.get())
print(lfq.get())
?
# 先进先出
   # 写一个server,所有的用户的请求放在队列里
       # 先来先服务的思想
# 后进先出
   # 算法
# 优先级队列
   # 自动的排序
   # 抢票的用户级别 100000 100001
   # 告警级别
from queue import PriorityQueue
pq = PriorityQueue()
pq.put((10,‘alex‘))
pq.put((6,‘wusir‘))
pq.put((20,‘yuan‘))
print(pq.get())
print(pq.get())
print(pq.get())

7.池

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
?
# 池
   # 进程池
   # 线程池
# 为什么要有池?
# 10000
# 池
# 预先的开启固定个数的进程数,当任务来临的时候,直接提交给已经开好的进程
# 让这个进程去执行就可以了
# 节省了进程,线程的开启 关闭 切换都需要时间
# 并且减轻了操作系统调度的负担
?
# concurrent.futures
import os
import time
import random
from concurrent.futures import ProcessPoolExecutor
submit + shutdown
def func():
   print(‘start‘,os.getpid())
   time.sleep(random.randint(1,3))
   print(‘end‘, os.getpid())
if __name__ == ‘__main__‘:
   p = ProcessPoolExecutor(5)
   for i in range(10):
       p.submit(func)
   p.shutdown()   # 关闭池之后就不能继续提交任务,并且会阻塞,直到已经提交的任务完成
   print(‘main‘,os.getpid())
?
# 任务的参数 + 返回值
def func(i,name):
   print(‘start‘,os.getpid())
   time.sleep(random.randint(1,3))
   print(‘end‘, os.getpid())
   return ‘%s * %s‘%(i,os.getpid())
if __name__ == ‘__main__‘:
   p = ProcessPoolExecutor(5)
   ret_l = []
   for i in range(10):
       ret = p.submit(func,i,‘alex‘)
       ret_l.append(ret)
   for ret in ret_l:
       print(‘ret-->‘,ret.result())  # ret.result() 同步阻塞
   print(‘main‘,os.getpid())
?
# 开销大
# 一个池中的任务个数限制了我们程序的并发个数
?
# 线程池
from concurrent.futures import ThreadPoolExecutor
def func(i):
   print(‘start‘, os.getpid())
   time.sleep(random.randint(1,3))
   print(‘end‘, os.getpid())
   return ‘%s * %s‘%(i,os.getpid())
tp = ThreadPoolExecutor(20)
ret_l = []
for i in range(10):
   ret = tp.submit(func,i)
   ret_l.append(ret)
tp.shutdown()
print(‘main‘)
for ret in ret_l:
   print(‘------>‘,ret.result())
?
?
?
from concurrent.futures import ThreadPoolExecutor
def func(i):
   print(‘start‘, os.getpid())
   time.sleep(random.randint(1,3))
   print(‘end‘, os.getpid())
   return ‘%s * %s‘%(i,os.getpid())
tp = ThreadPoolExecutor(20)
ret = tp.map(func,range(20))
for i in ret:
   print(i)
ret_l = []
for i in range(10):
   ret = tp.submit(func,i)
   ret_l.append(ret)
tp.shutdown()
print(‘main‘)
?
# 回调函数
import requests
from concurrent.futures import ThreadPoolExecutor
def get_page(url):
   res = requests.get(url)
   return {‘url‘:url,‘content‘:res.text}
?
def parserpage(ret):
   dic = ret.result()
   print(dic[‘url‘])
tp = ThreadPoolExecutor(5)
url_lst = [
   ‘http://www.baidu.com‘,   # 3
   ‘http://www.cnblogs.com‘, # 1
   ‘http://www.douban.com‘,  # 1
   ‘http://www.tencent.com‘,
   ‘http://www.cnblogs.com/Eva-J/articles/8306047.html‘,
   ‘http://www.cnblogs.com/Eva-J/articles/7206498.html‘,
]
ret_l = []
for url in url_lst:
   ret = tp.submit(get_page,url)
   ret_l.append(ret)
   ret.add_done_callback(parserpage)
?
?
# ThreadPoolExcutor
# ProcessPoolExcutor
?
# 创建一个池子
# tp = ThreadPoolExcutor(池中线程/进程的个数)
# 异步提交任务
# ret = tp.submit(函数,参数1,参数2....)
# 获取返回值
# ret.result()
# 在异步的执行完所有任务之后,主线程/主进程才开始执行的代码
# tp.shutdown() 阻塞 直到所有的任务都执行完毕
# map方法
# ret = tp.map(func,iterable) 迭代获取iterable中的内容,作为func的参数,让子线程来执行对应的任务
# for i in ret: 每一个都是任务的返回值
# 回调函数
# ret.add_done_callback(函数名)
# 要在ret对应的任务执行完毕之后,直接继续执行add_done_callback绑定的函数中的内容,并且ret的结果会作为参数返回给绑定的函数
?
# 5个进程
# 20个线程
# 5*20 = 100个并发

 

以上是关于锁 队列 和池的主要内容,如果未能解决你的问题,请参考以下文章

# Java 常用代码片段

# Java 常用代码片段

多线程编程之无锁队列

无锁队列的C代码

CAS无锁队列的实现

高性能无锁队列,代码注释