python3之threading模块(下)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python3之threading模块(下)相关的知识,希望对你有一定的参考价值。

同步线程

threading.Condition(),Condition使用了一个Lock,所以可以绑定一个共享资源,使多个线程等待这个资源的更新再启动。

当然Condition也可以显示地使用acquire()和release()方法。

一个简单的示例

  1: import logging
  2: import threading
  3: import time
  4: def consumer(cond):
  5:     """
  6:     等待condition设置然后再使用资源
  7:     :param cond:
  8:     :return:
  9:     """
 10:     logging.debug("开启consumer线程")
 11:     with cond:
 12:         cond.wait()
 13:         logging.debug("对consumer线程资源可用")
 14: def producer(cond):
 15:     """
 16:     配置资源
 17:     :param cond:
 18:     :return:
 19:     """
 20:     logging.debug("开始producer线程")
 21:     with cond:
 22:         logging.debug("使资源可用")
 23:         # 唤醒所有等待的线程,老的写法叫notifyAll()
 24:         cond.notify_all()
 25: logging.basicConfig(
 26:     level=logging.DEBUG,
 27:     format="%(asctime)s %(threadName)-2s %(message)s"
 28: )
 29: condition = threading.Condition()
 30: c1 = threading.Thread(name="c1", target=consumer,
 31:                       args=(condition,))
 32: c2 = threading.Thread(name="c2", target=consumer,
 33:                       args=(condition,))
 34: p = threading.Thread(name="p", target=producer,
 35:                      args=(condition, ))
 36: c1.start()
 37: time.sleep(0.2)
 38: c2.start()
 39: time.sleep(0.2)
 40: p.start()

结果:

  1: 2019-01-26 11:56:06,025 c1 开启consumer线程
  2: 2019-01-26 11:56:06,226 c2 开启consumer线程
  3: 2019-01-26 11:56:06,426 p  开始producer线程
  4: 2019-01-26 11:56:06,426 p  使资源可用
  5: 2019-01-26 11:56:06,426 c2 对consumer线程资源可用
  6: 2019-01-26 11:56:06,427 c1 对consumer线程资源可用

屏障barrier是另一种线程同步机制。Barrier建立一个控制点,阻塞所有的参与的线程,直到所有的线程都到达这一点,然后同时释放阻塞的线程。

  1: import threading
  2: import time
  3: 
  4: def worker(barrier):
  5:     print(threading.current_thread().name,
  6:           "waiting for barrier with {} others.".format(barrier.n_waiting))
  7:     # 所有等待的线程都在等待时,所有的线程都被同时释放了。
  8:     worker_id = barrier.wait()
  9:     print(threading.current_thread().name, ‘after barrier‘, worker_id)
 10: NUM_THREAD = 3
 11: barrier = threading.Barrier(NUM_THREAD)
 12: # 推倒式
 13: threads = [
 14:     threading.Thread(
 15:         name="worker - %s" % i,
 16:         target=worker,
 17:         args=(barrier, )
 18:     )
 19:     for i in range(NUM_THREAD)
 20: ]
 21: for t in threads:
 22:     print(t.name, "starting")
 23:     t.start()
 24:     time.sleep(0.1)
 25: for t in threads:
 26:     t.join()

结果:

  1: worker - 0 starting
  2: worker - 0 waiting for barrier with 0 others.
  3: worker - 1 starting
  4: worker - 1 waiting for barrier with 1 others.
  5: worker - 2 starting
  6: worker - 2 waiting for barrier with 2 others.
  7: worker - 2 after barrier 2
  8: worker - 1 after barrier 1
  9: worker - 0 after barrier 0

abort()方法会使所有等待线程接收一个BrokenBarrierError。直到reset方法恢复,重新开始拦截。

限制资源的并发访问

如果多个线程同时访问一个资源,但要限制总数。这个可以使用Semaphore来管理。

使用方法:

  1: s = threading.Semaphore(2)
  2: t = threading.Thread(
  3:     target=worker,
  4:     name="t1",
  5:     args=(s, )
  6: )

线程特定的数据

对于一些需要保护的资源,需要对这些并非资源所有者的线程隐藏。 threading.local()函数会创建一个对象,它能隐藏值,除非在某个线程中设置了这个属性,这个线程才能看到它。

  1: import random
  2: import threading
  3: import logging
  4: def show_value(data):
  5:     try:
  6:         val = data.value
  7:     except AttributeError:
  8:         logging.debug("No value yet")
  9:     else:
 10:         logging.debug("value=%s" % val)
 11: def worker(data):
 12:     show_value(data)
 13:     data.value = random.randint(1, 100)
 14:     show_value(data)
 15: logging.basicConfig(
 16:     level=logging.DEBUG,
 17:     format="(%(threadName)-10s %(message)s)",
 18: )
 19: local_data = threading.local()
 20: show_value(local_data)
 21: local_data.value = 1000
 22: show_value(local_data)
 23: 
 24: # 这个worker是看不到local_data的
 25: for i in range(2):
 26:     t = threading.Thread(target=worker, args=(local_data, ))
 27:     t.start()
 28:     t.join()
 29: # 使用子类,来初始化所有的线程开始时都有相同的值
 30: class MyLocal(threading.local):
 31:     def __init__(self, value):
 32:         super().__init__()
 33:         logging.debug("Initializing %s" % self)
 34:         self.value = value
 35: local_data = MyLocal(1000)
 36: # 同样的worker调用__init__(),每调用一次以设置默认值
 37: for i in range(2):
 38:     t = threading.Thread(target=worker, args=(local_data, ))
 39:     t.start()
 40:     t.join()

结果:

  1: (MainThread No value yet)
  2: (MainThread value=1000)
  3: (Thread-1   No value yet)
  4: (Thread-1   value=76)
  5: (Thread-2   No value yet)
  6: (Thread-2   value=88)
  7: (MainThread Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
  8: (Thread-3   Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
  9: (Thread-3   value=1000)
 10: (Thread-3   value=31)
 11: (Thread-4   Initializing <__main__.MyLocal object at 0x0000022A8026E168>)
 12: (Thread-4   value=1000)
 13: (Thread-4   value=7)

以上是关于python3之threading模块(下)的主要内容,如果未能解决你的问题,请参考以下文章

python3之threading模块(上)

python3之threading模块(中)

(转)Python3入门之线程threading常用方法

python3 正则表达式 re模块之辣眼睛 计算器

Python3 多线程编程(threadthreading模块)

Python3多线程_thread模块的应用