并发编程之多线程

Posted 休耕

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程之多线程相关的知识,希望对你有一定的参考价值。

一、threading模块介绍

  multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍

  官网链接:点击进入

二、开启线程的两种方式

  multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性

import time, random
# from multiprocessing import Process
from threading import Thread


def piao(name):
    print(\'%s piaoing\' % name)
    time.sleep(random.randrange(1, 5))
    print(\'%s piao end\' % name)

if __name__ == \'__main__\':
    t1 = Thread(target=piao, args=(\'egon\', ))
    t1.start()  # 主线程向操作系统发信号,又开了一个线程
    print("主线程")   # 执行角度看是主线程,从资源角度看是主进程
# 这个程序总体是一个进程、两个线程
"""
egon piaoing
主进程
egon piao end
"""
开启线程方式一
import time, random
# from multiprocessing import Process
from threading import Thread

class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print("%s piaoing" % self.name)
        time.sleep(random.randrange(1, 5))
        print("%s piao end" % self.name)

if __name__ == \'__main__\':
    t1 = MyThread(\'egon\')
    t1.start()  # 主线程向操作系统发信号,又开了一个线程
    print("主线程")
"""
egon piaoing
主线程
egon piao end
"""
方式二:定制线程

三、在一个进程下开启线程与在一个进程下开启多个子进程的区别

import time
from multiprocessing import Process
from threading import Thread


def piao(name):
    print(\'%s piaoing\' % name)
    time.sleep(2)
    print(\'%s piao end\' % name)

if __name__ == \'__main__\':
    # p1 = Process(target=piao, args=(\'进程\', ))
    # p1.start()
    """
    主线程
    进程 piaoing
    进程 piao end
    """


    t1 = Thread(target=piao, args=(\'线程\', ))
    t1.start()
    """
    线程 piaoing
    主线程
    线程 piao end
    """
    print("主线程")
# 对比可知,线程开销远小于进程,因为进程需要申请内存空间。
1、进程开销远大于线程
from threading import Thread
from multiprocessing import Process

n = 100
def task():
    global n
    n = 0

if __name__ == \'__main__\':
    """进程验证:
    p1 = Process(target=task,)
    p1.start()   # 会把子进程的n改为了0,看是否影响主进程
    p1.join()
    print("主进程", n)   # 主进程 100
    # 由此可见进程间是隔离的,子进程变量修改不影响主进程
    """

    """线程验证"""
    t1 = Thread(target=task, )
    t1.start()
    t1.join()
    print("主线程", n)   # 主线程 0
2、同一进程内的多个线程共享进程地址空间
from threading import Thread
from multiprocessing import Process, current_process  # current_process查看进程ID号
import os   # os.getpid()也可以查看进程ID

n = 100
def task():
    # print(current_process().pid)
    print(\'子进程PID:%s   父进程的PID:%s\' % (os.getpid(), os.getppid()))

if __name__ == \'__main__\':
    p1 = Process(target=task,)
    p1.start()

    # print("主线程", current_process().pid)
    print("主线程", os.getpid())
"""
主线程 6455
子进程PID:6456   父进程的PID:6455
"""
3、pid观察
from threading import Thread
import os   # os.getpid()也可以查看进程ID

n = 100
def task():
    # print(current_process().pid)
    print(\'线程的进程 PID:%s\' % os.getpid())

if __name__ == \'__main__\':
    t1 = Thread(target=task,)
    t1.start()

    # print("主线程", current_process().pid)
    print("主线程", os.getpid())
"""说明两个线程是同一个进程:
线程的进程 PID:6493
主线程 6493
"""
4、研究线程:线程都属于同一个进程

四、练习

1、基于多线程实现并发的套接字通信

# -*- coding:utf-8 -*-
__author__ = \'Qiushi Huang\'

from socket import *
from threading import Thread

# 通讯和建立链接分开,启动不同的线程,大家是并发执行。
def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()


def server(ip, port):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)

    while True:
        conn, addr = server.accept()   # 建链接
        t = Thread(target=communicate, args=(conn,))  # 建一个链接创一个线程
        t.start()
        # communicate(conn)

    server.close()


if __name__ == \'__main__\':
    server(\'127.0.0.1\', 8091)   # 主线程

"""
这种解决方案的问题是:当客户端越来越多后,线程也会越来越多,会带来服务崩溃的问题。
"""
多线程并发服务端
# -*- coding:utf-8 -*-
__author__ = \'Qiushi Huang\'

# 使用时,可以一个程序运行多次,这是多个不同的in
from socket import *

client = socket(AF_INET, SOCK_STREAM)
client.connect(("127.0.0.1", 8091))

while True:
    msg = input(">>").strip()
    if not msg:continue
    client.send(msg.encode("utf-8"))
    data = client.recv(1024)
    print(data.decode("utf-8"))

client.close()
客户端

2、编写一个简单的文本处理工具,具备三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input(\'>>: \').strip()
        if not msg:continue
        msg_l.append(msg)

def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()
            format_l.append(res.upper())

def save():
    while True:
        if format_l:
            with open(\'db.txt\',\'a\',encoding=\'utf-8\') as f:
                res=format_l.pop()
                f.write(\'%s\\n\' %res)

if __name__ == \'__main__\':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()
多线程并发执行多项任务

五、线程对象的属性和方法

1、Thread实例对象的方法

isAlive(): 返回线程是否活动的。
getName(): 返回线程名。
setName(): 设置线程名。

2、threading模块提供的一些方法

threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

3、属性和方法的应用与验证

from threading import Thread, currentThread   # 得到线程对象的方法
from threading import active_count    # 得到活跃进程数
from threading import enumerate   # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
import time

# 需要注意的是线程没有子线程的概念,线程都是属于进程的
def task():
    print("%s is running" % currentThread().getName())   # 对象下有一个getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == \'__main__\':
    getName()方法返回线程名
    t = Thread(target=task, name=\'子线程1\')
    t.start()
    print("主进程", currentThread().getName())
    """
    子线程1 is running
    主进程 MainThread
    子线程1 is done
    """
getName方法得到线程名
from threading import Thread, currentThread   # 得到线程对象的方法
from threading import active_count    # 得到活跃进程数
from threading import enumerate   # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
import time

def task():
    print("%s is running" % currentThread().getName())   # 对象下有一个getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == \'__main__\':
    setName()方法设置线程名
    t = Thread(target=task, name=\'子线程1\')
    t.start()
    t.setName(\'儿子线程1\')   # 修改进程名称
    currentThread().setName("主线程")   # 设置主线程名称(默认是MainThread)
    print(t.isAlive())    # 判断线程是否存活
    print("主进程", currentThread().getName())
    """
    子线程1 is running
    True
    主进程 主线程
    儿子线程1 is done
    """
setName方法和isAlive方法
from threading import Thread, currentThread   # 得到线程对象的方法
from threading import active_count    # 得到活跃进程数
from threading import enumerate   # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
import time

def task():
    print("%s is running" % currentThread().getName())   # 对象下有一个getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == \'__main__\':
    t = Thread(target=task, name=\'子线程1\')
    t.start()
    t.setName(\'儿子线程1\')  # 修改进程名称
    t.join()  # 主线程等子进程运行完毕再执行
    currentThread().setName("主线程")  # 设置主线程名称(默认是MainThread)
    print(t.isAlive())  # 判断线程是否存活
    print("主进程", currentThread().getName())
    """
    子线程1 is running
    儿子线程1 is done
    False
    主进程 主线程
    """
join方法主线程等子线程运行完执行
from threading import Thread, currentThread   # 得到线程对象的方法
from threading import active_count    # 得到活跃进程数
from threading import enumerate   # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
import time

def task():
    print("%s is running" % currentThread().getName())   # 对象下有一个getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == \'__main__\':
    # 测试threading.active_count方法
    t = Thread(target=task, name=\'子线程1\')
    t.start()
    print(active_count())
    """
    子线程1 is running
    2
    子线程1 is done
    """
threading.active_count方法
from threading import Thread, currentThread   # 得到线程对象的方法
from threading import active_count    # 得到活跃进程数
from threading import enumerate   # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
import time

def task():
    print("%s is running" % currentThread().getName())   # 对象下有一个getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == \'__main__\':
    # 对上面改写添加一个join()
    t = Thread(target=task, name=\'子线程1\')
    t.start()
    t.join()   # 运行完才执行主线程,因此后面打印的活跃线程数是一个
    print(active_count())
    """
    子线程1 is running
    子线程1 is done
    1
    """
对线程添加join方法,执行active_count
from threading import Thread, currentThread   # 得到线程对象的方法
from threading import active_count    # 得到活跃进程数
from threading import enumerate   # 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
import time

def task():
    print("%s is running" % currentThread().getName())   # 对象下有一个getName()方法
    time.sleep(2)
    print("%s is done" % currentThread().getName())

if __name__ == \'__main__\':
    # threading.enumerate()方法:返回一个包含正在运行的线程的list
    t = Thread(target=task, name=\'子线程1\')
    t.start()
    print(enumerate())
    """
    子线程1 is running
    [<_MainThread(MainThread, started 4320744256)>, <Thread(子线程1, started 123145383735296)>]
    子线程1 is done
    """
threading.enumerate()方法:返回一个包含正在运行的线程的list

六、守护线程

  一个进程内,如果不开线程,默认就是一个主线程,主线程代码运行完毕,进程被销毁。

  一个进程内,开多个线程的情况下,主线程在代码运行完毕后,还要等其他线程工作完才死掉,进程销毁。

  守护线程守护主线程,等到主线程死了才会被销毁。在有其他线程的情况下,主线程代码运行完后,等其他非守护线程结束,守护线程才会死掉。

  无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁。

  需要强调的是:运行完毕并非终止运行。运行完毕的真正含义:

  1、对主进程来说,运行完毕指的是主进程代码运行完毕。

  2、对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才能运行完毕。

  详细解释

  1、主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束

  2、主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

from threading import Thread
import time

def sayhi(name):
    time.sleep(2)
    print("%s say hello" % name)

if __name__ == \'__main__\':
    t = Thread(target=sayhi, args=(\'egon\',))

    # 守护线程必须在t.start()前设置
    # 守护线程设置方式一:
    t.daemon=True
    # 守护线程设置方式二:
    # t.setDaemon(True)

    t.start()   # 立马创建子线程,但需要等待两秒,因此程序会先执行下面的代码

    print("主线程")
    print(t.is_alive())
# 这一行代码执行完后,主线程执行完毕,由于主线程之外,只有一个守护线程,主线程不需要等守护线程执行结束,因此主线程和守护进程终止,进程结束。
"""
主线程
True
"""

  练习:思考下述代码的执行结果有可能是哪些情况?为什么?

from threading import Thread
import time

def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end456")

if __name__ == \'__main__\':
    t1=Thread(target=foo)
    t2=Thread(target=bar)

    t1.daemon=True   # t1是守护线程
    t1.start()
    t2.start()
    print("main-------")   # 主线程结束后,会等待非守护线程结束
# 由于非守护线程需要等待的时间比守护线程长,因此线程都会得到执行
"""
123
456
main------
end123
end456
"""
由于非守护线程需要等待的时间比守护线程长,因此线程都会得到执行

七、GIL全局解释锁(Global Interpreter Lock)

链接:http://www.cnblogs.com/linhaifeng/articles/7449853.html

  后期需要详细分析这个部分的内容。

八、同步锁

1、三个需要注意的点

  1、线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来。

  2、join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高。

  3、一定要主要本小节最后GIL和互斥锁的经典分析。

2、GIL和Lock的对比

  Python已经有了一个GIL来保证同一时间只能有一个线程来执行,为什么还需要lock?

    锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据。

    保护不同的数据就应该加不同的锁。

  GIL 与Lock是两把锁,保护的数据不一样,GIL是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),Lock是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock。

过程分析:所有线程抢的是GIL锁,或者说所有线程抢的是执行权限

  线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock。。。这就导致了串行运行的效果

  既然是串行,那我们执行

  t1.start()

  t1.join

  t2.start()

  t2.join()

  这也是串行执行啊。

  需知join是等待t1所有的代码执行完,相当于锁住了t1的所有代码,而Lock只是锁住一部分操作共享数据的代码

  因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序 里的线程和 py解释器自己的线程是并发运行的,假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动,这样就解决了上述的问题, 这可以说是Python早期版本的遗留问题。

  锁通常被用来实现对共享资源的同步访问。为每一个共享资源创建一个Lock对象,当你需要访问该资源时,调用acquire方法来获取锁对象(如果

以上是关于并发编程之多线程的主要内容,如果未能解决你的问题,请参考以下文章

python并发编程之多线程

并发编程之多线程之间通讯

python并发编程之多线程基础知识点

并发编程之多线程基础-守护线程与非守护线程

并发编程之多线程(Java)

Python并发编程之多线程