What is a thread?
A thread is the basic unit of execution that the CPU schedules and runs.
Differences between threads and processes
1. A process is a collection of resources (the unit of resource allocation), while a thread is the unit of execution on the CPU. Creating a process is therefore far more expensive than creating a thread.
2. Each process gets its own separate memory space, while all threads inside one process share that process's memory space.
from threading import Thread
from multiprocessing import Process

a = 100

def task():
    global a
    a = 0

if __name__ == '__main__':
    # p = Process(target=task)
    # p.start()
    # p.join()
    # print(a)  # 100 -- a child process modifies its own copy of a
    t = Thread(target=task)
    t.start()
    t.join()
    print(a)  # 0 -- the thread shares the main thread's memory, so a is modified
3. Multiple processes each have a different pid, whereas threads started inside the main process all report the same pid as the main process (see the sketch below).
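A minimal sketch to verify this, using only the standard library (the two-thread / two-process counts are arbitrary):

from threading import Thread
from multiprocessing import Process
import os

def task():
    print("pid inside worker:", os.getpid())

if __name__ == '__main__':
    print("main pid:", os.getpid())
    # threads print the same pid as the main process
    for _ in range(2):
        Thread(target=task).start()
    # each child process prints a different pid
    for _ in range(2):
        Process(target=task).start()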
Other uses of the Thread object
from threading import Thread, current_thread, active_count, enumerate
import time

def task():
    # current_thread().getName() is equivalent to current_thread().name
    print("child thread", current_thread().getName(), current_thread().name)
    time.sleep(2)

if __name__ == '__main__':
    t = Thread(target=task, name="sb")
    t.start()
    print("child thread", t.getName())
    t.setName("child-thread")      # set the thread name (getName/setName are legacy aliases for t.name)
    print("child thread", t.name)  # t.name == t.getName(): read the thread name
    # t.join()
    print(t.is_alive())            # check whether the thread is still alive
    # t.is_alive() == t.isAlive()  (isAlive is the deprecated alias)
    print(active_count())          # number of Thread objects currently alive
    print(len(enumerate()))        # same result as len(threading.enumerate())
Exercise
from threading import Thread
import time

def foo():
    print("123")
    time.sleep(1)
    print("end123")

def bar():
    print(456)
    time.sleep(3)
    print("end 456")

if __name__ == '__main__':
    t1 = Thread(target=foo)
    t1.daemon = True   # t1 is a daemon thread: it is killed once no non-daemon thread is left running
    t2 = Thread(target=bar)
    t1.start()
    t2.start()
    print("zhu")
"""
123
456
zhu
end123
end 456
"""
Thread mutex (Lock)
Without a lock
from threading import Thread
import time

n = 100

def task():
    global n
    temp = n
    time.sleep(0.1)   # every thread reads n == 100 before any of them writes back
    n = temp - 1

if __name__ == '__main__':
    t_l = []
    for i in range(10):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print("zhu", n)  # 99 -- the race condition loses 9 of the 10 decrements
With a lock
from threading import Thread, Lock
import time

n = 100

def task():
    global n
    mutex.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    t_l = []
    for i in range(10):
        t = Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    print("zhu", n)  # 90 -- the lock serializes the read-modify-write, so all 10 decrements land
The GIL (global interpreter lock)
Note:
The GIL is not a feature of the Python language itself; Python does not have to depend on it. It is a concept introduced by the reference implementation of the interpreter, CPython.
What is the GIL?
The global interpreter lock. The GIL is essentially a mutex, but one that lives at the interpreter level.
Difference between the GIL and a mutex (Lock)
The GIL and Lock are two different locks protecting different data: the former is interpreter-level (it protects interpreter-level data, such as the data used by garbage collection), while the latter protects the data of the user's own application.
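Because of the GIL, only one thread executes Python bytecode at a time, so CPU-bound work does not get faster with threads. A rough timing sketch to illustrate this (the counting workload and iteration count are arbitrary choices):

from threading import Thread
import time

def count_down(n):
    while n > 0:
        n -= 1

if __name__ == '__main__':
    N = 10_000_000

    start = time.time()
    count_down(N)
    count_down(N)
    print("serial:", time.time() - start)

    start = time.time()
    t1 = Thread(target=count_down, args=(N,))
    t2 = Thread(target=count_down, args=(N,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # on CPython the threaded version is usually no faster than the serial one
    print("two threads:", time.time() - start)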
Verify that running python test.py produces only one process
# contents of test.py
import os, time
print(os.getpid())
time.sleep(1000)
# run in a terminal
python3 test.py
# check on Windows
tasklist | findstr python
# check on Linux
ps aux | grep python
https://www.luffycity.com/python-book/73-duo-jin-cheng-shi-xian/746-gilquan-ju-jie-shi-qi-suo.html
Semaphore
A mutex allows only one thread at a time to modify the data, whereas a semaphore (Semaphore) allows a fixed number of threads to do so at the same time: a task acquires the semaphore and then runs.
from threading import Thread, Semaphore
import threading
import time

def func():
    sm.acquire()
    print('%s get sm' % threading.current_thread().getName())
    time.sleep(3)
    sm.release()

if __name__ == '__main__':
    sm = Semaphore(5)   # at most 5 threads may hold the semaphore at once
    for i in range(23):
        t = Thread(target=func)
        t.start()
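Like Lock, a Semaphore also supports the with statement; a minimal sketch of the same worker in that form (sm and the imports are the ones from the example above):

def func():
    with sm:   # acquire on entry, release on exit, even if an exception is raised
        print('%s get sm' % threading.current_thread().getName())
        time.sleep(3)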
Event
An Event object holds a signal flag that can be set by threads; it lets other threads wait until some event has happened.
from threading import Event
event.is_set(): return the current value of the event's flag;
event.wait(): block the calling thread if event.is_set() == False;
event.set(): set the flag to True; all threads blocked in wait() are woken up and become ready, waiting for the OS scheduler;
event.clear(): reset the flag to False.
Code example
from threading import Thread, Event
import threading
import time, random

def conn_mysql():
    count = 1
    while not event.is_set():
        if count > 3:
            raise TimeoutError('connection timed out')
        print('<%s> connection attempt %s' % (threading.current_thread().getName(), count))
        event.wait(0.5)   # wait at most 0.5s for the flag to be set
        count += 1
    print('<%s> connected' % threading.current_thread().getName())

def check_mysql():
    print('\033[45m[%s] checking mysql\033[0m' % threading.current_thread().getName())
    time.sleep(random.randint(2, 4))
    event.set()

if __name__ == '__main__':
    event = Event()
    conn1 = Thread(target=conn_mysql)
    conn2 = Thread(target=conn_mysql)
    check = Thread(target=check_mysql)
    conn1.start()
    conn2.start()
    check.start()
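Note that event.wait(timeout) returns the flag's value, so the retry loop above could also check that return value directly. A minimal variation, reusing the same event object:

def conn_mysql():
    for attempt in range(1, 4):
        print('connection attempt %s' % attempt)
        if event.wait(0.5):   # returns True as soon as event.set() is called
            print('connected')
            return
    raise TimeoutError('connection timed out')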
Timer
A timer runs an operation after the specified number of seconds.
from threading import Timer

def hello():
    print("hello, world")

t = Timer(1, hello)
t.start()   # hello() runs 1 second later
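A Timer can also be cancelled before it fires; a minimal sketch (the 5-second delay is arbitrary):

from threading import Timer

def hello():
    print("hello, world")

t = Timer(5, hello)
t.start()
t.cancel()   # stop the timer; hello() never runs because it had not fired yet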
Multithreaded FTP
Server side
from threading import Thread
import socket

def communicate(conn):
    while True:
        try:
            data = conn.recv(1024).decode()
            if not data:
                break
            print(data)
            conn.send(data.upper().encode())
        except ConnectionResetError as e:   # the client disconnected abruptly
            print(e)
            break
    conn.close()

def server(ip, port):
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        # one thread per client connection
        t = Thread(target=communicate, args=(conn,))
        t.start()
    server.close()

if __name__ == '__main__':
    server('127.0.0.1', 9999)
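Only the server side is shown here; a minimal client sketch to exercise it (the address 127.0.0.1:9999 matches the server above, the messages are arbitrary):

import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 9999))

for msg in ('hello', 'world'):
    client.send(msg.encode())
    print(client.recv(1024).decode())   # the server echoes the message in upper case

client.close()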
Thread pool & process pool
Official documentation: https://docs.python.org/dev/library/concurrent.futures.html
The concurrent.futures module provides a high-level interface for asynchronously executing callables.
ThreadPoolExecutor: a thread pool with asynchronous calls
ProcessPoolExecutor: a process pool with asynchronous calls
Basic methods
1. submit(fn, *args, **kwargs)
Submit a task asynchronously.
2. map(func, *iterables, timeout=None, chunksize=1)
Replaces a for loop of submit calls (see the sketch after the usage example below).
3. shutdown(wait=True)
Equivalent to the pool.close() + pool.join() of multiprocessing's Pool.
wait=True: block until every task in the pool has finished and its resources have been reclaimed.
wait=False: return immediately, without waiting for the tasks in the pool to finish.
Either way, the program itself still waits for all tasks to complete before it exits.
submit and map must be called before shutdown.
4. result(timeout=None)
Get the result of a future.
5. add_done_callback(fn)
Attach a callback that runs when the future completes.
Usage
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os, time, random

def task(n):
    print('%s is running' % os.getpid())
    time.sleep(random.randint(1, 3))
    return n ** 2

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)
    futures = []
    for i in range(11):
        future = executor.submit(task, i)   # submit returns a Future immediately
        futures.append(future)
    executor.shutdown(True)                 # wait for all 11 tasks to finish
    print('+++>')
    for future in futures:
        print(future.result())
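The map method listed under the basic methods replaces this submit-in-a-loop pattern; a minimal sketch reusing the task function from the usage example above (the worker count of 3 is kept from that example):

if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=3)
    # map submits task(0), task(1), ... and yields the results in input order
    for result in executor.map(task, range(11)):
        print(result)
    executor.shutdown()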
Small crawler exercise
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def get(url):
    print('GET %s' % url)
    response = requests.get(url)
    time.sleep(3)
    return {'url': url, 'content': response.text}

def parse(res):
    res = res.result()   # the callback receives the Future, not the return value
    print('%s parse res is %s' % (res['url'], len(res['content'])))

if __name__ == '__main__':
    urls = [
        'http://www.cnblogs.com/linhaifeng',
        'https://www.python.org',
        'https://www.openstack.org',
    ]
    pool = ThreadPoolExecutor(2)
    for url in urls:
        pool.submit(get, url).add_done_callback(parse)
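An alternative to callbacks is concurrent.futures.as_completed, which yields each future as soon as it finishes; a minimal sketch of the same crawler rewritten with it (same get function and urls list as above):

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(2) as pool:
    futures = [pool.submit(get, url) for url in urls]
    for future in as_completed(futures):   # yields futures in completion order
        res = future.result()
        print('%s parse res is %s' % (res['url'], len(res['content'])))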
FTP based on a thread pool
Server side
from socket import *
from concurrent.futures import ThreadPoolExecutor

def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data:
                break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()

def server(ip, port):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        # a pool of 2 worker threads: at most 2 clients are served at a time,
        # further connections wait in the pool's internal queue
        pool.submit(communicate, conn)
    server.close()

if __name__ == '__main__':
    pool = ThreadPoolExecutor(2)
    server('127.0.0.1', 8081)
Synchronous vs asynchronous calls
Synchronous call: after submitting a task, wait in place until it finishes and the result is available before executing the next line of code, which makes the program run serially.
from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print('%s is laing' % name)
    time.sleep(random.randint(3, 5))
    res = random.randint(7, 13) * '#'
    return {'name': name, 'res': res}

def weigh(shit):
    name = shit['name']
    size = len(shit['res'])
    print('%s produced %s kg' % (name, size))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)
    # result() blocks, so each task runs to completion before the next is submitted
    shit1 = pool.submit(la, 'alex').result()
    weigh(shit1)
    shit2 = pool.submit(la, 'wupeiqi').result()
    weigh(shit2)
    shit3 = pool.submit(la, 'yuanhao').result()
    weigh(shit3)
Asynchronous call: after submitting a task, do not wait in place for it to finish; handle the result via a callback instead.
from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print('%s is laing' % name)
    time.sleep(random.randint(3, 5))
    res = random.randint(7, 13) * '#'
    return {'name': name, 'res': res}

def weigh(shit):
    shit = shit.result()   # the callback receives the Future object
    name = shit['name']
    size = len(shit['res'])
    print('%s produced %s kg' % (name, size))

if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)
    # submit returns immediately; weigh runs as a callback when each task finishes
    pool.submit(la, 'alex').add_done_callback(weigh)
    pool.submit(la, 'wupeiqi').add_done_callback(weigh)
    pool.submit(la, 'yuanhao').add_done_callback(weigh)