Python并发编程之线程的玩法

Posted Aspirantlu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python并发编程之线程的玩法相关的知识,希望对你有一定的参考价值。

一、线程基础以及守护进程

线程是CPU调度的最小单位

全局解释器锁

全局解释器锁GIL(global interpreter lock)

全局解释器锁的出现主要是为了完成垃圾回收机制的回收机制,对不同线程的引用计数的变化记录的更加精准。

全局解释器锁导致了同一个进程中的多个线程只能有一个线程真正被CPU执行。

GIL锁每执行700条指令才会进行一次(轮转)切换(从一个线程切换到另外一个线程)

节省的是IO操作(不占用CPU)的时间,而不是CPU计算的时间,因为CPU的计算速度非常快,大多数情况下,我们没有办法把一条进程中所有的IO操作都规避掉。

threading模块

import time
from threading import Thread, current_thread, enumerate, active_count


def func(i):
    print('start%s' % i, current_thread().ident)  # 函数中获取当前线程id
    time.sleep(1)
    print('end%s' % i)


if __name__ == '__main__':
    t1 = []
    for i in range(3):
        t = Thread(target=func, args=(i,))
        t.start()
        print(t.ident)  # 查看当前线程id
        t1.append(t)
    print(enumerate(), active_count())
    for t in t1:
        t.join()
print('所有线程执行完毕')

线程是不能从外部强制终止(terminate),所有的子线程只能是自己执行完代码之后就关闭。

current_thread 获取当前的线程对象

current_thread().ident 或者 线程对象.ident 获取当前线程id。

enumerate返回一个列表,存储了所有活着的线程对象,包括主线程

active_count返回一个数字,存储了所有活着的线程个数。

【注意】enumerate导入之后,会和内置函数enumerate重名,需要做特殊的处理

  • from threading import enumerate as en

  • import threading

    threading.enumerate()

面向对象方式开启一个线程

from threading import Thread


class MyThread(Thread):
    def __init__(self, a, b):
        super(MyThread, self).__init__()
        self.a = a
        self.b = b

    def run(self):
        print(self.ident)


t = MyThread(1, 3)
t.start()  # 开启线程,才在线程中执行run方法
print(t.ident)

线程之间的数据是共享的

from threading import Thread

n = 100


def func():
    global n
    n -= 1


t_li = []
for i in range(100):
    t = Thread(target=func)
    t.start()
    t_li.append(t)
for t in t_li:
    t.join()
print(n)

结果是:0

守护线程

  • 主线程会等待子线程结束之后才结束,为什么?

因为主线程结束,进程就会结束。

  • 守护线程随着主线程的结束而结束
  • 守护进程会随着主进程的代码结束而结束,如果主进程代码之后还有其他子进程在运行,守护进程不守护。
  • 守护线程会随着主线程的结束而结束,如果主线程代码结束之后还有其他子线程在运行,守护线程也守护。
import time
from threading import Thread


def son():
    while True:
        print('in son')
        time.sleep(1)


def son2():
    for i in range(3):
        print('in son2...')
        time.sleep(1)


# flag a
t = Thread(target=son)
t.daemon = True
t.start()
# flag b a-->b用时0s
Thread(target=son2).start()

为什么守护线程会在主线程的代码结束之后继续守护其他子线程?

答:因为守护进程和守护线程的结束原理不同。守护进程需要主进程来回收资源,守护线程是随着主线程的结束而结束,其他子线程–>主线程结束–>主进程结束–>整个进程中所有的资源都被回收,守护线程也会被回收。

二、线程锁(互斥锁)

线程之间也存在数据不安全

import dis

a = 0


def func():
    global a
    a += 1


dis.dis(func)  # 得到func方法中的代码翻译成CPU指令
"""
结果
0 LOAD_GLOBAL              0 (a)
2 LOAD_CONST               1 (1)
4 INPLACE_ADD
6 STORE_GLOBAL             0 (a)
8 LOAD_CONST               0 (None)
10 RETURN_VALUE
"""

+=、-=、*=、/=、while、if、带返回值的方法(都是先计算后赋值,前提要涉及到全局变量或静态变量) 等都是数据不安全的,append、pop、queue、logging模块等都是数据安全的。

列表中的方法或者字典中的方法去操作全局变量的时候,数据是安全的。

只有一个线程,永远不会出现线程不安全现象。

采用加锁的方式来保证数据安全。

from threading import Thread, Lock

n = 0


def add(lock):
    for i in range(500000):
        global n
        with lock:
            n += 1


def sub(lock):
    for i in range(500000):
        global n
        with lock:
            n -= 1


t_li = []
lock = Lock()
for i in range(2):
    t1 = Thread(target=add, args=(lock,))
    t1.start()
    t2 = Thread(target=sub, args=(lock,))
    t2.start()
    t_li.append(t1)
    t_li.append(t2)
for t in t_li:
    t.join()
print(n)

线程安全的单例模式

import time
from threading import Thread, Lock


class A:
    __instance = None
    lock = Lock()

    def __new__(cls, *args, **kwargs):
        with cls.lock:
            if not cls.__instance:
                time.sleep(0.00001)
                cls.__instance = super().__new__(cls)
        return cls.__instance


def func():
    a = A()
    print(a)


for i in range(10):
    Thread(target=func).start()

不用考虑加锁的小技巧

  • 不要操作全局变量
  • 不要在类中操作静态变量

因为多个线程同时操作全局变量/静态变量,会产生数据不安全现象。

三、线程锁(递归锁)

from threading import Lock, RLock

# Lock 互斥锁
# RLock 递归(recursion)锁

l = Lock()
l.acquire()
print('希望被锁住的代码')
l.release()

rl = RLock()  # 在同一个线程中可以被acquire多次
rl.acquire()
rl.acquire()
rl.acquire()
print('希望被锁住的代码')
rl.release()

from threading import Thread, RLock


def func(i, lock):
    lock.acquire()
    lock.acquire()
    print(i, ':start')
    lock.release()
    lock.release()
    print(i, ':end')


lock = RLock()
for i in range(5):
    Thread(target=func, args=(i, lock)).start()

互斥锁与递归锁

递归锁在同一个线程中可以被acquire多次,而互斥锁不行

互斥锁效率高,递归锁效率相对低

多把互斥锁容易产生死锁现象,递归锁可以快速解决死锁

四、死锁

死锁:指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象。

死锁现象是怎么产生的?

答:有多把锁,并且在多个线程中交叉使用。与互斥锁、递归锁无关,都会发生死锁。如果是互斥锁,出现了死锁现象,最快速的解决方案是把所有的互斥锁都改成一把递归锁(noodle_lock = fork_lock = RLock()),程序的效率会降低。

from threading import Thread, Lock
import time
noodle_lock = Lock()
fork_lock = Lock()


def eat1(name):
    noodle_lock.acquire()
    print(name, '抢到面了')
    fork_lock.acquire()
    print(name, '抢到叉子了')
    print(name, '吃面')
    time.sleep(0.0001)
    fork_lock.release()
    print(name, '放下叉子了')
    noodle_lock.release()
    print(name, '放下面了')


def eat2(name):
    fork_lock.acquire()
    print(name, '抢到叉子了')
    noodle_lock.acquire()
    print(name, '抢到面了')
    print(name, '吃面')
    noodle_lock.release()
    print(name, '放下面了')
    fork_lock.release()
    print(name, '放下叉子了')


Thread(target=eat1, args=('lucy',)).start()
Thread(target=eat2, args=('jack',)).start()
Thread(target=eat1, args=('rose',)).start()
Thread(target=eat2, args=('disen',)).start()

五、队列

队列:线程之间数据安全的容器

线程队列:数据安全,先进先出

原理:加锁 + 链表

Queue

fifo 先进先出的队列

get和put
import queue


q = queue.Queue(3)  # fifo 先进先出的队列

q.put(1)
q.put(2)
print(q.get())
print(q.get())

1
2
get_nowait
import queue

# from queue import Empty  # 不是内置的错误类型,而是queue模块中的错误
q = queue.Queue()  # fifo 先进先出的队列
try:
    q.get_nowait()
except queue.Empty:
    pass
print('队列为空,继续执行其他代码')

put_nowait

用的很少,因为队列满时,抛异常,数据放不进去,丢失了。

LifoQueue

后进先出的队列,也就是栈。last in first out

from queue import LifoQueue
lq = LifoQueue()
lq.put(1)
lq.put(2)
print(lq.get())
print(lq.get())

2
1

PriorityQueue

优先级队列,按照放入数据的第一位数值从小到大输出

from queue import PriorityQueue

priq = PriorityQueue()
priq.put((2, 'lucy'))
priq.put((0, 'rose'))
priq.put((1, 'jack'))
print(priq.get())
print(priq.get())
print(priq.get())

(0, 'rose')
(1, 'jack')
(2, 'lucy')

三种队列使用场景

先进先出:用于处理服务类任务(买票任务)

后进先出:算法中用的比较多

优先级队列:比如,VIP制度,VIP用户优先;

六、相关面试题

请聊聊进程队列的特点和实现原理

特点:实现进程之间的通信;数据安全;先进先出。

实现原理:基于管道 + 锁 实现的,管道是基于文件级别的socket + pickle 实现的。

你了解生产者消费者模型吗,如何实现

了解

为什么了解?工作经验

	采集图片/爬取音乐:由于要爬取大量的数据,想提高爬取效率

	有用过一个生产者消费者模型,这个模型是我自己写的,消息中间件,用的是xxx(redis),我获取网页的过程作为生产者,分析网页,获取所有歌曲歌曲链接的过程作为消费者。

	自己写监控,或者是自己写邮件报警系统,监控程序作为生产者,一旦发现有问题的程序,就需要把要发送的邮件信息交给消息中间件redis,消费者就从中间件中取值,然后来处理发邮件的逻辑。

什么时候用过?

	项目 或者 例子,结合上面一起

在python中实现生产者消费者模型可以用哪些机制

	消息中间件

	celery(分布式框架):定时发短信的任务

从你的角度说说进程在计算机中扮演什么角色

进程用来管理一个运行中的程序的资源,是资源分配的最小单位

进程与进程之间内存是隔离的

进程是由操作系统负责调度的,并且多个进程之间是一种竞争关系,所以我们应该对进程的三状态时刻关注,尽量减少进程中的IO操作,或者在进程里面开线程来规避IO,让我们写的程序在运行的时候能够更多的占用CPU资源。

为什么线程之间的数据不安全

线程之间数据共享

多线程的情况下,

	如果在计算某一个变量的时候,还要进行赋值操作,这个过程不是由一条完整的CPU指令完成的;

	如果在判断某个bool表达式之后,再做某些操作,这个过程也不是由一条完整的CPU指令完成的;

	在中间发生了GIL锁的切换(时间片的轮转),可能会导致数据不安全。

读程序,请确认执行到最后number的长度是否一定为 1

import threading
import time

# loop = 1E7  # 10000000.
loop = int(1E7)  # 10000000


def _add(loop: int = 1):
    global numbers
    for _ in range(loop):
        numbers.append(0)


def _sub(loop: int = 1):
    global numbers
    for _ in range(loop):
        while not numbers:
            time.sleep(1E-8)
        numbers.pop()


numbers = [0]
ta = threading.Thread(target=_add, args=(loop,))
ts = threading.Thread(target=_sub, args=(loop,))
# ts1 = threading.Thread(target=_sub, args=(loop,))

ta.start()
ts.start()
# ts1.start()

ta.join()
ts.join()
# ts1.join()

因为只开启了一个进行pop操作的线程,如果开启多个pop操作的线程,必须在while前面加锁,因为可能有两个线程,一个执行了while not numbers,发生了GIL的切换,另外一个线程执行完了代码,numbers刚好没有了数据,导致结果一个pop成功,一个pop不成功。

所以number长度一定为1,如果把注释去了,不一定为1

读程序,请确认执行到最后number的长度是否一定为 1

import threading
import time

loop = int(1E7)


def _add(loop: int = 1):
    global numbers
    for _ in range(loop):
        numbers.append(0)


def _sub(loop: int = 1):
    global numbers
    for _ in range(loop):
        while not numbers:
            time.sleep(1E-8)
        numbers.pop()


numbers = [0]
ta = threading.Thread(target=_add, args=(loop,))
ts = threading.Thread(target=_sub, args=(loop,))

ta.start()
ta.join()
ts.start()
ts.join()

一定为1,因为是同步的。

读程序,请确认执行到最后number是否一定为 0

import threading

loop = int(1E7)


def _add(loop: int = 1):
    global numbers
    for _ in range(loop):
        numbers += 1


def _sub(loop: int = 1):
    global numbers
    for _ in range(loop):
        numbers -= 1


numbers = 0
ta = threading.Thread(target=_add, args=(loop,))
ts = threading.Thread(target=_sub, args=(loop,))

ta.start()
ta.join()
ts.start()
ts.join()

一定等于0,因为是同步的。

读程序,请确认执行到最后number是否一定为 0

import threading

loop = int(1E7)


def _add(loop: int = 1):
    global numbers
    for _ in range(loop):
        numbers += 1


def _sub(loop: int = 1):
    global numbers
    for _ in range(loop):
        numbers -= 1


numbers = 0
ta = threading.Thread(target=_add, args=(loop,))
ts = threading.Thread(target=_sub, args=(loop,))

ta.start()
ts.start()
ta.join()
ts.join()

不一定为0,因为是异步的且存在 += 操作

七、判断数据是否安全

是否数据共享,是同步还是异步(数据共享并且异步的情况下)

  • +=、-=、*=、/=、a = 计算之后赋值给变量
  • if、while 条件,这两个判断是由多个线程完成的

这两种情况下,数据不安全。

八、进程池 & 线程池

以前,有多少个任务就开多少个进程或线程。

什么是池

要在程序开始的时候,还没有提交任务,先创建几个线程或者进程,放在一个池子里,这就是池

为什么要用池

如果先开好进程/线程,那么有任务之后就可以直接使用这个池中的数据了;并且开好的进程/线程会一直存在在池中,可以被多个任务反复利用,这样极大的减少了开启/关闭/调度进程/调度线程的时间开销。

池中的线程/进程个数控制了操作系统需要调用的任务个数,控制池中的单位,有利于提高操作系统的效率,减轻操作系统的负担。

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# threading模块 没有提供池
# multiprocessing模块 仿照threading增加了Pool(逐渐被淘汰)
# concurrent.futures模块 线程池,进程池都能够用相似的方式开启/使用
ThreadPoolExecutor()  # 参数代表开启多少个线程,线程的个数一般起cpu个数*4(或者*5)
ProcessPoolExecutor()  # 参数代表开启多少个进程,进程的个数一般起cpu个数+1

创建线程池并提交任务

from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time


def func(a, b):
    print(current_thread().ident, a, b)
    time.sleep(1)


tp = ThreadPoolExecutor(4)  # 创建线程池对象
for i in range(20):
    # tp.submit(func, i, i + 1)
    # 向池中提交任务
    tp.submit(func, a=i, b=i + 1)  # 位置传参,关键字传参都可以

创建进程池并提交任务

from concurrent.futures import ProcessPoolExecutor
import os
import time


def func(a, b):
    print(os.getpid(), 'start', a, b)
    time.sleep(1)
    print(os.getpid(), 'end', a, b)


if __name__ == '__main__':
    tp = ProcessPoolExecutor(4)  Python并发编程之线程的玩法

Python并发编程之进程的玩法

Python并发编程之进程的玩法

Python并发编程之进程的玩法

python并发编程之进程池,线程池concurrent.futures

python并发编程之多线程编程