Python并发和线程

Posted xpc199151

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python并发和线程相关的知识,希望对你有一定的参考价值。

基本概念

并发和并行的区别

并行,parallel。

同时做某些事,可以互不干扰的同一时刻做几件事。

并发,concurrency

也是同时做某些事,但是强调的是,一个时间段内有事情要处理。

  • 并发,假同时,一段时间内同时处理多个任务,单核都可以。
  • 并行,真同时,同时处理多个任务。必须多核。

举例

乡村公路一条车道,半幅路面出现了坑,交警指挥交通。众多车辆在这一时刻要通过路面的事件,这就是并发。交警指挥,车辆排队通过另外半幅路面,一个方向放行3分钟,停止该方向通行,换另一个方向放行。

而高速公路的车道,是双向四车道,所有车辆(数据)可以互不干扰的在自己的车道上奔跑(传输),在同一个时刻,每条车道上可能同时有车辆在跑,是同时发生的概念,这是并行。

主流操作系统上完成并发的手段有进程和线程,主流的编程语言提供了用户空间的调度:协程。Python 也不例外。由于现在的操作系统上的进程越来越轻量,导致进程和线程之间的区别越来越少。事实上,Linux 并没有原生的线程,线程是通过进程实现的。python 中每一个进程会启动一个解释器,而线程会共享一个解释器。

并发的解决

“食堂打饭模型”

中午12点,开饭啦,大家都涌向食堂,这就是并发,如果人很多,这就是高并发。

1队列、缓冲区

假设只有一个窗口,陆续涌入食堂的人,排队打菜是比较好的方式。所以,排队(队列)是一种天然解决并发的办法。

排队就是把人排成队列,先进先出,解决了资源使用的问题。而排成的队列,其实就是一个缓冲地带,就是缓冲区。

而假设女生优先,那么这个窗口就有两队,只要有女生来就是可以先打饭,男生队列等着,女生队伍就是一个优先队列。

例如queue模块的类Queue,LifoQueue,PriorityQueue。

2争抢

只开一个窗口,有可能没有秩序,也就是谁挤进去就给谁打饭,挤到窗口的人占据窗口,直到打到饭菜离开,其他人继续争抢,会有一个人占据着窗口,可以视为锁定窗口,窗口就不能为其他人服务了,这是一种锁机制。

谁争抢到资源就上锁,排他性的锁,其他人只能等候。争抢也是一种高并发解决方案,但是,这样不好,因为有可能有人很长时间都抢不到。

3预处理

如果排长队的原因,是由于每个人打菜等候时间长,因为要吃的菜没有,需要现做,没打着饭不走开,锁定着窗口。这个时候食堂可以提前统计大多数人最爱吃的菜品,将最爱吃的80%的热门菜,提前做好,保证供应,20%的冷门菜,现做。

这样大多数人,就算锁定窗口,也很快就释放窗口了。

这是一种提前加载用户需要的数据的思路,预处理思想,缓存常用。

4并行

成百上千人同时来吃饭,一个队伍搞不定的,多开打饭窗口形成多个队列,如同开多个车道一样,并行打菜。开窗口就得扩大食堂,得多雇人在每一个窗口提供服务,造成成本上升。

日常可以通过购买更多服务器,或多开进程、线程实现并行处理,来解决并发问题。注意这些都是水平扩展的思想。

5提速

提高单个窗口的打饭速度,也是解决并发的方式。打饭人员提高工作技能,或为单个窗口配备更多的服务人员,都是提速的办法。提高单个cpu性能,或单个服务器安装更多的cpu.这是一种垂直扩展思想

6消息中间件(可以理解为外部的队列)

上地、西二旗地铁站外的九曲回肠的走廊,缓冲人流,进去之后再多个安检进站。

常见的消息中间件有RabbitMQ、ActiveMQ(Apache)、RocketMQ(阿里Apache)、Kafka(Apache)等。

当然还有其他手段解决并发问题,但是已经列举了最常用的解决方案,一般来说不同的并发场景用不同的策略,而策略可能是多种方式的优化组合。

例如多开食堂(多地),也可以把食堂建在宿舍生活区(就近),所以说,技术来源于生活。

进程和线程

在实现了线程的操作系统中,线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程的实际运作单位。一个程序的执行实例就是一个进程。

进程(process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。

进程和程序的关系

程序是源代码编译后的文件,而这些文件存放在磁盘上。当程序被操作系统加载到内存中,就是进程,进程中存放着指令和数据(资源),它就是线程的容器。

Linux进程有父进程,子进程,windows的进程是平等关系。

线程,有时候被称为轻量级进程(lightweight process,LWP),是程序执行流的最小单位。一个标准的线程是由线程ID,当前指令指针(PC),寄存器集合和堆栈组成。在许多系统中,创建一个线程比创建一个进程快10——100倍。

进程、线程的理解

现代操作系统中提出进程的概念,每一个进程都认为自己独占所有的计算机硬件资源。进程就是独立的王国,进程间不可以随便的共享数据。而线程就是省份,同一个进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。

线程的状态

就绪(ready):线程能够运行,但在等待被调度,可能线程刚刚创建启动,或刚刚从阻塞中恢复,或者被其他线程抢占。

运行(running):线程正在运行

阻塞(blocked):线程等待外部事件发生而无法运行,如I/O操作。

终止(terminated):线程完成,或退出,或被取消。

 

 Python中进程和线程

进程会启动一个解释器进程,线程共享一个解释器进程。

Python的线程开发

Python的线程开发,使用标准库threading。

Thread类

#签名
def __init__(self,group = None,target = None,name = None,args = (),kwargs = None,*,daemon = None)

target:线程调用的对象,就是目标函数。

name:为线程起一个名字,可以重名。

args:为目标函数传递实参,元祖。

kwargs:为目标函数关键字传参,字典。

线程启动

import threading

#最简单的线程程序

def worker():
    print("i\'m working")
    print("fineshed")
    
t = threading.Thread(target = worker,name = "worker")#线程对象,创建了一个线程对象,target 参数是一个函数,即线程要执行的逻辑
t.start()# start 启动一个线程,执行完毕后,自动退出,Python 没有提供主动退出线程的方法
结果为:
i\'m working
fineshed

通过threading.Thread创建一个线程对象,target是目标函数,name可以指定名称。

线程是一种假并行,它是用来解决并发问题的。

但是线程没有启动,需要调用start方法。线程之所以执行函数,是因为线程中就是执行代码的,而最简单的封装就是函数,所以还是函数调用。函数执行完毕,线程也就退出了。

由于 python 没有提供退出线程的方法,因此我们一定不能在逻辑中定义死循环,不然线程无法退出。

那么,如果不让线程退出,或者让线程一直工作怎么办呢?

为了保证不退出,通常都会有一个 while True 的死循环。

import threading
import time

def worker():
    while True:
        time.sleep(3)
        print("i\'m working")
    print("fineshed")
t = threading.Thread(target = worker,name = "worker")#线程对象
t.start()#启动

上面的程序写成了死循环,线程就会一直工作。上面的程序一直答应I\'m working,因为一直循环,没有退出机制,所以不会打印fineshed。

import threading
import time

#最简单的线程程序

def worker():
    for _ in range(5):
        time.sleep(1)
        print("i\'m working")
    print("fineshed")

def worker1():
    for _ in range(5):
        time.sleep(1)
        print("i\'m working11111")
    print("fineshed11111111")
    
t = threading.Thread(target = worker,name = "worker")#线程对象
t.start()

t = threading.Thread(target = worker1,name = "worker")#线程对象
t.start()

结果为:
i\'m workingi\'m working11111

i\'m working
i\'m working11111
i\'m working
i\'m working11111
i\'m working
i\'m working11111
i\'m working
fineshed
i\'m working11111
fineshed11111111

 上面程序启动了两个线程,两个线程并发的执行,然后直到执行结束,最后的结果那样,是因为print函数并不是线程安全的。被打断了。

import time
import threading
import logging

def worker(num):
    time.sleep(1)
    print(\'work-{}\'.format(num))

for i in range(5):
    t = threading.Thread(target=worker, args=(i, )) # 启动了五个线程,要启动几个就循环几次
    t.start()

结果为:
work-1work-0work-4

work-2work-3

结果是在等待一秒之后,所有线程同时输出了,并且在一个线程的换行符还没有打印出来的时候,下一个线程就输出了,这就涉及到线程安全的问题了。很显然,print 并不是线程安全的。

线程退出

Python没有提供线程退出的办法,线程在下面情况时会退出。

  1. 线程函数内语句块执行完毕。
  2. 线程函数中抛出未处理的异常。
import threading
import time

def worker():
    count = 0
    while True:
        if(count>5):
            #raise RuntimeError(count)
            #return
            break    
        time.sleep(1)
        print("i\'m working")
        count+=1

t = threading.Thread(target = worker,name = "worker")#线程对象
t.start()#启动
print("fineshed")

结果为:
fineshed
i\'m working
i\'m working
i\'m working
i\'m working
i\'m working
i\'m working

Python的线程没有优先级,没有线程组的概念,也不能被销毁,停止,挂起, 那也就没有恢复、中断了。

import threading
import time

#最简单的线程程序

def worker():
    for _ in range(10):
        time.sleep(1)
        print("i\'m working")
    print("fineshed")

def worker1():
    count = 0
    while True:
        time.sleep(1)
        print("i\'m working11111")
        count+=1
        if count>3:
            raise RuntimeError("anc")
    print("fineshed11111111")
    
t = threading.Thread(target = worker,name = "worker")#线程对象
t.start()

t = threading.Thread(target = worker1,name = "worker")#线程对象
t.start()

上面的例子中,后面这个线程循环了4次,抛出了异常, 这个线程停止执行,上面的线程会继续执行,总共执行10次。

线程的传参

import threading
import time

def add(x,y):
    print("{} + {} = {}".format(x,y,x+y,threading.current_thread().ident))

thread1 = threading.Thread(target = add,name = "add",args = (4,5))#线程对象
thread1.start()#启动
time.sleep(2)

thread2 = threading.Thread(target = add,name = "add",args = (5,),kwargs = {"y":4})#线程对象
thread2.start()#启动
time.sleep(2)

thread3 = threading.Thread(target = add,name = "add",kwargs = {"x":5,"y":4})#线程对象
thread3.start()#启动

结果为:
4 + 5 = 9
5 + 4 = 9
5 + 4 = 9

线程传参和函数传参没有什么区别,本质上就是函数传参。

threading的属性和方法

current_thread():返回当前线程对象

main_thread():返回主线程对象

active_thread():当前处于alive状态的线程个数

enumerate():返回所有活着的线程的列表,不包括已经终止的线程和未开始的线程。

get_ident():返回当前线程的ID,非零整数。

import threading
import time

def worker(n = 5):
    print(threading.current_thread())
    print(threading.main_thread())
    for _ in range(n):
        time.sleep(1)
        print("i\'m working")
    print("fineshed")
    
print(threading.current_thread())

t = threading.Thread(target = worker,name = "abc")
t.start()

结果为:
<_MainThread(MainThread, started 4072)>
<Thread(abc, started 4260)>
<_MainThread(MainThread, started 4072)>
i\'m working
i\'m working
i\'m working
i\'m working
i\'m working
fineshed

 

import threading
import time

def showthreadinfo():
    print("currentthread = {}".format(threading.current_thread()))
    print("main thread = {}".format(threading.main_thread()))
    print("active count = {}".format(threading.active_count()))

def worker():
    count = 0
    showthreadinfo()
    while True:
        if(count>5):
            #raise RuntimeError(count)
            #return
            break    
        time.sleep(1)
        count+=1
        print("i\'m working")

t = threading.Thread(target = worker,name = "worker")#线程对象
t.start()#启动

print("fineshed")

结果为:
currentthread = <Thread(worker, started 2436)>fineshed
main thread = <_MainThread(MainThread, started 844)>
active count = 6

i\'m working
i\'m working
i\'m working
i\'m working
i\'m working
i\'m working
import threading
import time

def worker(n = 5):
    print(threading.current_thread())
    print(threading.main_thread())
    print(threading.active_count())
    print(threading.enumerate())
    for _ in range(n):
        time.sleep(1)
        print("i\'m working")
    print("fineshed")
    
print(threading.current_thread())

t = threading.Thread(target = worker,name = "abc")
t.start()


print(threading.enumerate())

结果为:
<_MainThread(MainThread, started 4072)>
<Thread(abc, started 5036)>[<_MainThread(MainThread, started 4072)>, <Thread(Thread-4, started daemon 3852)>, <Heartbeat(Thread-5, started daemon 5508)>, <HistorySavingThread(IPythonHistorySavingThread, started 4560)>, <ParentPollerWindows(Thread-3, started daemon 5004)>, <Thread(abc, started 5036)>]

<_MainThread(MainThread, started 4072)>
6
[<_MainThread(MainThread, started 4072)>, <Thread(Thread-4, started daemon 3852)>, <Heartbeat(Thread-5, started daemon 5508)>, <HistorySavingThread(IPythonHistorySavingThread, started 4560)>, <ParentPollerWindows(Thread-3, started daemon 5004)>, <Thread(abc, started 5036)>]
i\'m working
i\'m working
i\'m working
i\'m working
i\'m working
fineshed

Thread实例的属性和方法

name:只是一个名字,只是一个标识,名称可以重名,getName(),setName()获取、设置这个名词。

ident:线程ID,它是非0整数,线程启动后才会有ID,否则为None,线程退出,此id依旧可以访问,此ID可以重复使用。

is_alive():返回线程是否活着。

注意:线程的name这是一个名词,可以重复,但是ID必须唯一。但可以在线程退出后再利用。

import threading
import time

def worker():
    count = 0
    while True:
        if(count>5):
            #raise RuntimeError(count)
            #return
            break    
        time.sleep(1)
        count+=1
        print(threading.current_thread().name)

t = threading.Thread(target = worker,name = "worker")#线程对象
print(t.ident)
t.start()#启动

while True:
    time.sleep(1)
    if t.is_alive():
        print("{} {} alive".format(t.name,t.ident))
    else:
        print("{} {} dead".format(t.name,t.ident))
        #t.start()#可以吗?不可以,threads can only be started once

start():启动线程,每一个线程必须且只能执行该方法一次。

run():运行线程函数。

为了演示,派生一个Thread的子类。

start方法

import threading
import time

def worker():
    count = 0
    while True:
        if(count>5):
            #raise RuntimeError(count)
            #return
            break    
        time.sleep(1)
        count+=1
        print("working running")

class MyThread(threading.Thread):
    def start(self):
        print("start______________________")
        super().start()
        
    def run(self):
        print("run________________________")
        super().run()
        
    
t = MyThread(target = worker,name = "worker")#线程对象
t.start()#启动

结果为:
start______________________
run________________________
working running
working running
working running
working running
working running
working running

run方法

import threading
import time

def worker():
    count = 0
    while True:
        if(count>5):
            #raise RuntimeError(count)
            #return
            break    
        time.sleep(1)
        count+=1
        print("working running")

class MyThread(threading.Thread):
    def start(self):
        print("start______________________")
        super().start()
        
    def run(self):
        print("run________________________")
        super().run()
        
    
t = MyThread(target = worker,name = "worker")#线程对象
#t.start()#启动
t.run()

结果为:
run________________________
working running
working running
working running
working running
working running
working running

start()方法会调用run()方法,而run()方法可以运行函数。

这两个方法看似功能重复了,这么来看是不是留一个方法就可以了?

start和run的区别?

在线程函数中,增加打印线程的名字的语句,看看能看到什么信息。

import threading
import time

def worker():
    count = 0
    while True:
        if(count>5):
            #raise RuntimeError(count)
            #return
            break    
        time.sleep(1)
        count+=1
        print("working running")
        print(threading.current_thread().name)

class MyThread(threading.Thread):
    def start(self):
        print("start______________________")
        super().start()
        
    def run(self):
        print("run________________________")
        super().run()
        
    
t = MyThread(target = worker,name = "worker")#线程对象
#t.start()#分别执行start和run
t.run()

结果为:
run________________________
working running
MainThread
working running
MainThread
working running
MainThread
working running
MainThread
working running
MainThread
working running
MainThread
import threading
import time

def worker():
    count = 0
    while True:
        if(count>5):
            #raise RuntimeError(count)
            #return
            break    
        time.sleep(1)
        count+=1
        print("working running")
        print(threading.current_thread().name)

class MyThread(threading.Thread):
    def start(self):
        print("start______________________")
        super().start()
        
    def run(self):
        print("run________________________")
        super().run()
        
    
t = MyThread(target = worker,name = "worker")#线程对象
t.start()#分别执行start和run
#t.run()

结果为:
start______________________
run________________________
working running
worker
working running
worker
working running
worker
working running
worker
working running
worker
working running
worker

使用start方法启动线程,启动了一个新的线程,名字叫做worker运行,但是使用run方法的。并没有启动新的线程,就是在主线程中调用了一个普通的函数而已。

因此,启动线程请使用start方法,才能启动多个线程。

多线程

顾名思义,多个线程,一个进程中如果有多个线程,就是多线程,实现一种并发。

import threading
import time

def worker():
    count = 0
    while True:
        if(count>5):
            #raise RuntimeError(count)
            #return
            break    
        time.sleep(1)
        count+=1
        print("working running")
        print(threading.current_thread().name,threading.current_thread().ident)

class MyThread(threading.Thread):
    def start(self):
        print("start______________________")
        super().start()
        
    def run(self):
        print("run________________________")
        super().run()
        
    
t1 = MyThread(target = worker,name = "worker1")
t2 = MyThread(target = worker,name = "worker2")
t1.start()
t2.start()

结果为:
start______________________
run________________________start______________________

run________________________
working running
worker1 8124
working running
worker2 1212
working running
worker1 8124
working running
worker2 1212
working running
worker1 8124
working running
worker2 1212
working running
worker1 8124
working running
worker2 1212
working running
worker1 8124
working running
worker2 1212
working running
worker1 8124
working running
worker2 1212

可以看到worker1和worker2交替执行,改成run方法试一试?

import threading
import time

def worker():
    count = 0
    while True:
        if(count>5):
            #raise RuntimeError(count)
            #return
            break    
        time.sleep(1)
        count+=1
        print("working running")
        print(threading.current_thread().name,threading.current_thread().ident)

class MyThread(threading.Thread):
    def start(self):
        print("start______________________")
        super().start()
        
    def run(self):
        print("run________________________")
        super().run()
        
    
t1 = MyThread(target = worker,name = "worker1")
t2 = MyThread(target = worker,name = "worker2")
#t1.start()
#t2.start()
t1.run()
t2.run()

结果为:
run________________________
working running
MainThread 844
working running
MainThread 844
working running
MainThread 844
working running
MainThread 844
working running
MainThread 844
working running
MainThread 844
run________________________
working running
MainThread 844
working running
MainThread 844
working running
MainThread 844
working running
MainThread 844
working running
MainThread 844
working running
MainThread 844

没有开新的线程,这就是普通函数调用,所以执行完t1.run(),然后执行t2.run(),这里就不是多线程。

当时用start方法启动线程后,进程内多个活动的线程并行的工作,就是多线程。

一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程,一个进程至少有一个主线程。其他线程称为工作线程。

线程安全

这个只能在ipython中演示,Python命令行,pycharm都不能演示出效果。

import threading

def worker():
    for x in range(100):
        print("{} is running".format(threading.current_thread().name))
        
for x in range(1,5):
    name = "worker{}".format(x)
    t = threading.Thread(target = worker,name =name)
    t.start()

 

 看代码,应该是一行行打印,但是很多字符串打印在了一起,为什么?

说明,print函数被打断了,被线程切换打断了,print函数分为两步,第一步打印字符串,第二部换行,就在这之间,发生了线程的切换。

这说明print函数是线程不安全的。

线程安全:线程执行一段代码,不会产生不确定的结果,那这段代码就是线程安全的。

上例中,本以为print应该是打印文本之后紧跟着一个换行的,但是有时候确实好几个文本在一起,后面跟上换上。而且发生这种情况的时机不确定,所以,print函数不是线程安全函数。

如果是这样,多线程编程的时候,print输出日志,不能保证一个输出一定后面立即换行了。怎么办?

1.不让print打印换行。

import threading

def worker():
    for x in range(100):
        print("{} is running.\\n".format(threading.current_thread().name),end = "")
        
for x in range(1,5):
    nam

以上是关于Python并发和线程的主要内容,如果未能解决你的问题,请参考以下文章

多个用户访问同一段代码

JUC并发编程 共享模式之工具 JUC CountdownLatch(倒计时锁) -- CountdownLatch应用(等待多个线程准备完毕( 可以覆盖上次的打印内)等待多个远程调用结束)(代码片段

线程学习知识点总结

Python中级精华-并发之启动和停止线程

并发技术12线程锁技术的使用

java并发线程锁技术的使用