多线程

Posted dissipate

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程相关的知识,希望对你有一定的参考价值。

  

from threading import Event,Thread
import logging,time

FORMAT=%(asctime)s %(threadName)s %(thread)d %(message)s
logging.basicConfig(format=FORMAT,level=logging.INFO)

def boss(event:Event):
    logging.info(boss is waitting)
    event.wait()
    logging.info(boss: work is done)

def worker(event:Event,count=10):
    logging.info(worker start)
    cups=[]
    while True:
        cups.append(1)
        time.sleep(0.5)
        logging.info(append 1)
        if len(cups)>=10:
            event.set()
            break
    logging.info(worker finished, cups={}.format(cups))

event=Event()
w=Thread(target=worker,args=(event,))
b=Thread(target=boss,args=(event,))
w.start()
b.start()

 

使用同一个Event对象的标记flag

event.wait()就是等到flag变为True,或等到超时返回False,不限制等待时间

 

from threading import Event,Thread
import logging,time
logging.basicConfig(level=logging.INFO)

def do(event:Event,interval:int):
    while not event.wait(interval):
        logging.info(do sth)

e=Event()
Thread(target=do,args=(e,3)).start()

# e.wait(10)
time.sleep(10)
e.set()
print(main exited)

wait优于sleep,wait会主动出让时间片,其它线程可以调度,而sleep会占用时间片,不出让

 

Timer类的实现

from threading import Thread,Event
import logging,datetime
logging.basicConfig(level=logging.INFO)

def add(x:int,y:int):
    logging.info(x+y)

class Timer():
    def __init__(self,interval,fn,*args,**kwargs):
        self.interval=interval
        self.fn=fn
        self.args=args
        self.kwargs=kwargs
        self.event=Event()

    def start(self):
        Thread(target=self.__run).start()

    def cancel(self):
        self.event.set()

    def __run(self):
        start=datetime.datetime.now()
        logging.info(waiting)

        self.event.wait(self.interval)
        if not self.event.is_set():
            self.fn(*self.args,**self.kwargs)

        delta=(datetime.datetime.now()-start).total_seconds()
        logging.info(finished {}.format(delta))

t=Timer(5,add,3,9)
t.start()
e=Event()
e.wait(3)
t.cancel()

 

Lock:

lock 存在共享资源争抢的地方都可以使用索,从而保证只有一个使用者可以完全使用这个资源

import logging,threading
logging.basicConfig(level=logging.INFO)
cups=[]
lock=threading.Lock()

def w(lock:threading.Lock,n=100):
    while True:
        lock.acquire()
        count=len(cups)
        logging.info(count)
        if count>=n:
            lock.release()
            break
        cups.append(1)
        lock.release()
        logging.info({} produce.format(threading.current_thread()))


for b in range(10):
    threading.Thread(target=w,args=(lock,20)).start()

threading.Event().wait(1)
print(len(cups))
print(threading.enumerate())

 

import threading,logging,time
FORMAT=%(name)s %(asctime)s %(threadName)s %(thread)d %(message)s
logging.basicConfig(format=FORMAT,level=logging.INFO)

cups=[]

def worker(count=10):
    logging.info(produce cups)
    flag=True
    while True:
        if len(cups)>=count:
            flag=False
        time.sleep(0.001)
        if not flag:
            break
        if flag:
            cups.append(1)
    logging.info({} done,cups: {}.format(threading.current_thread(),len(cups)))

for _ in range(10):
    threading.Thread(target=worker,args=(20,)).start()
threading.Event().wait(1)
print(len(cups))

 

Lock

一旦线程获得锁,其它试图获得锁的线程将被 阻塞,

acquire(blocking=True,timeout=-1) 默认阻塞,可以设置阻塞超时时间,非阻塞时,timeout禁止设置,成功获取锁,返回True,否则返回False
release() 释放锁,可以从任何线程调用release(),已上锁的Lock会被重置为unlocked,未上锁调用release(),抛出RuntimeError异常

 

 

Counter类

import threading,time
class C:
    def __init__(self):
        self.__val=0
    @property
    def value(self):
        return self.__val
    def inc(self):
        lock.acquire()
        self.__val+=1
        lock.release()
    def dec(self):
        lock.acquire()
        self.__val-=1
        lock.release()
lock=threading.Lock()
def run(c:C,count=100):
    for _ in range(count):
        for p in range(-50,50):
            if p<0:
                # lock.acquire()
                c.dec()
                # lock.release()
            else:
                # lock.acquire()
                c.inc()
                # lock.release()
c=C()
c1=10
c2=10000
for b in range(c1):
    threading.Thread(target=run,args=(c,c2)).start()
# time.sleep(0.001)
print(c.value)
while True:
    time.sleep(0.5)
    print(threading.enumerate())
    print(c.value)

类中加锁 或者 操作加锁

 

通过with 或 try finally确保锁释放

import threading,time

class C:
    def __init__(self):
        self.__val=0
        self.__lock=threading.Lock()
    @property
    def value(self):
        with self.__lock:
            return self.__val
    def inc(self):
        try:
            self.__lock.acquire()
            self.__val+=1
        finally:
            self.__lock.release()
    def dec(self):
        try:
            self.__lock.acquire()
            self.__val-=1
        finally:
            self.__lock.release()
    def dec(self):
        with self.__lock:
            self.__val-=1
def run(c:C,count=100):
    for _ in range(count):
        for p in range(-50,50):
            if p<0:
                c.inc()
            else:
                c.dec()
c=C()
c1=10
c2=1000
for _ in range(c1):
    threading.Thread(target=run,args=(c,c2)).start()

while True:
    if threading.active_count() == 1:
        print(threading.enumerate())
        print(c.value)
    else:
        print(threading.enumerate())
    time.sleep(0.5)

 

import threading,logging,time
FORMAT=%(name)s %(asctime)s %(threadName)s %(thread)d %(message)s
logging.basicConfig(format=FORMAT,level=logging.INFO)

cups=[]

def worker(lock:threading.Lock,task=100):
    while True:
        if lock.acquire(blocking=False):
            count=len(cups)
            logging.info(number of cups: {}.format(count))

            if count>=task: # 每个子线程必须执行此退出条件
                lock.release()
                break

            cups.append(1)
            lock.release()
            logging.info({} produce {} cup.format(threading.current_thread().name,len(cups)))
            time.sleep(0.5)
    logging.info(worker end,number of cups: {}.format(len(cups)))

lock=threading.Lock()
for _ in range(10):
    threading.Thread(target=worker,args=(lock,40)).start()

while True:
    if threading.active_count() == 1:
        print(threading.enumerate())
        print(len(cups))
    else:
        print(threading.enumerate())
    time.sleep(1)
import threading,logging,time
FORMAT=%(asctime)-15s	[%(threadName)s %(thread)8d] %(message)s
logging.basicConfig(level=logging.INFO,format=FORMAT)

def worker(tasks):
    for task in tasks:
        time.sleep(0.01) # 强行切换线程,使任务依次执行
        if task.lock.acquire(blocking=False):
            logging.info({} {} acquire success.format(threading.current_thread(),task.name))
        else:
            logging.info({} {} acquire failed.format(threading.current_thread(),task.name))

class Task:
    def __init__(self,name):
        self.name=name
        self.lock=threading.Lock()

tasks=[Task(task-{}.format(p)) for p in range(10)]

for b in range(5):
    threading.Thread(target=worker,name=worker-{}.format(b),args=(tasks,)).start()

 

可重入锁RLock          Reentrant Lock

线程相关锁

线程A获得可重入锁,并可以多次获取成功,不会阻塞,最后要在线程A中做和acquire()次数相同的release()

import threading
lock=threading.RLock()
ret=lock.acquire()
print(ret)
ret=lock.acquire(timeout=5)
print(ret)
print(lock.acquire(True))
print(lock.acquire(False))

技术图片

 

 

import threading
def sub(lock:threading.RLock):
    lock.release()

lock=threading.RLock()
print(lock.acquire())
print(lock.acquire(timeout=3))

threading.Thread(target=sub,args=(lock,)).start()

技术图片

 

 

import threading,time
lock=threading.RLock()
print(lock.acquire())
print(~*80)
print(lock.acquire(blocking=True))
print(lock.acquire(blocking=False))
print(lock.acquire(timeout=3))
# for _ in range(5): # RuntimeError
#     lock.release()
# print(lock.acquire(blocking=False,timeout=3)) # ValueError

for _ in range(4): # 使Timer线程不阻塞
    lock.release()
def sub(lock:threading.RLock):
    print({}: {}.format(threading.current_thread(),lock.acquire()))
    print({}: {}.format(threading.current_thread(),lock.acquire()))
    print(lock.release())
    print(lock.release())
    # print(lock.release())

threading.Timer(2,sub,args=(lock,)).start()

不同线程获取同一个RLock时会阻塞

 

threading.Condition:

import threading,logging,random
logging.basicConfig(level=logging.INFO,format=%(asctime)s %(threadName)s %(thread)d %(message)s)

class Dispatcher:
    def __init__(self):
        self.data=0
        self.event=threading.Event()
        self.cond=threading.Condition()

    def produce(self):
        for b in range(100):
            with self.cond:
                self.data=random.randint(1,100)
                logging.info(produce {}.format(self.data))
                # self.cond.notify_all()
                self.cond.notify(2)
            self.event.wait(1)

    def consume(self):
        while True:
            with self.cond:
                self.cond.wait()
                logging.info(thread {} consume {}.format(threading.current_thread(),self.data))
            # self.event.wait(0.5)

v=Dispatcher()
# threading.Thread(target=v.produce).start()
# threading.Event().wait(3)
# threading.Thread(target=v.consume).start()
# threading.Thread(target=v.consume).start()

for b in range(5):
    threading.Thread(target=v.consume,name=consumer-{}.format(b)).start()
threading.Thread(target=v.produce).start()

 

import threading,logging,random
FORMAT=%(asctime)s [%(threadName)s %(thread)d] %(message)s
logging.basicConfig(format=FORMAT,level=logging.CRITICAL)

class Dispatcher:
    def __init__(self):
        self.data=None
        self.event=threading.Event()
    def produce(self,total):
        for _ in range(total):
            data=random.randrange(1,100)
            logging.critical(data)
            self.data=data
            self.event.wait(1)
        self.event.set() # 结束consume的endless loop

    def consume(self):
        while not self.event.is_set():
            logging.critical(received {}.format(self.data))
            self.data=None
            self.event.wait(0.5)

v=Dispatcher()
threading.Thread(target=v.consume,name=consumer).start()
threading.Event().wait(2)
threading.Thread(target=v.produce,args=(10,),name=producer).start()

 

import threading,logging,time
FORMAT=%(name)s %(asctime)s %(threadName)s %(thread)d %(message)s
logging.basicConfig(format=FORMAT,level=logging.INFO)

cups=[]

def worker(lock:threading.Lock,task=100):
    while True:
        if lock.acquire(blocking=False):
            count=len(cups)
            logging.info(number of cups: {}.format(count))

            if count>=task: # 每个子线程必须执行此退出条件
                lock.release()
                break

            cups.append(1)
            lock.release()
            logging.info({} produce {} cup.format(threading.current_thread().name,len(cups)))
            time.sleep(0.5)
    logging.info(worker end,number of cups: {}.format(len(cups)))

lock=threading.Lock()
for _ in range(10):
    threading.Thread(target=worker,args=(lock,40)).start()

while True:
    if threading.active_count() == 1:
        print(threading.enumerate())
        print(len(cups))
    else:
        print(threading.enumerate())
    time.sleep(1)

 

import threading,logging,random
FORMAT=%(asctime)s [ %(threadName)s %(thread)d ] %(message)s
logging.basicConfig(format=FORMAT,level=logging.ERROR)

class Dispatcher:
    def __init__(self):
        self.data=None
        self.event=threading.Event()
        self.cond=threading.Condition()

    def produce(self,total):
        for _ in range(total):
            with self.cond:
                data=random.randint(1,100)
                logging.error(data)
                self.data=data
                self.cond.notify_all()
            self.event.wait(1)

        self.event.set()
        print(88888888888888888888888888)
        with self.cond: # notify最后一次使consumer结束循环
            self.cond.notify_all()

    def consumer(self):
        while not self.event.is_set():
            with self.cond:
                print(3333333333)
                self.cond.wait()
                print(444444444444444444444)
                logging.error(received {}.format(self.data))
                self.data=None
            self.event.wait(0.5)
            print(self.event.is_set())

v=Dispatcher()
threading.Thread(target=v.produce,args=(5 ,),name=producer).start()
threading.Event().wait(1)
threading.Thread(target=v.consumer,name=consumer).start()

 

以上是关于多线程的主要内容,如果未能解决你的问题,请参考以下文章

线程学习知识点总结

多个请求是多线程吗

python小白学习记录 多线程爬取ts片段

多线程编程

多线程编程

python多线程