如何可靠地重现此 python 代码中的竞争条件?

Posted

技术标签:

【中文标题】如何可靠地重现此 python 代码中的竞争条件?【英文标题】:How can I reproduce the race conditions in this python code reliably? 【发布时间】:2013-10-26 03:40:33 【问题描述】:

上下文

我最近发布了timer class for review on Code Review。我有一种直觉,因为我曾经看到 1 个单元测试失败,但无法重现该失败。因此,我发布了代码审查。

我得到了一些很好的反馈,突出了代码中的各种竞争条件。 (我想)我理解了问题和解决方案,但是在进行任何修复之前,我想通过单元测试来暴露错误。当我尝试时,我意识到这很困难。各种堆栈交换答案表明我必须控制线程的执行以暴露错误,并且任何人为的时间不一定可以移植到不同的机器上。这似乎是我试图解决的问题之外的许多意外复杂性。

相反,我尝试使用the best static analysis (SA) tool for python,PyLint,看看它是否能找出任何错误,但它不能。为什么人类可以通过代码审查(本质上是 SA)找到错误,而 SA 工具却不能?

害怕尝试get Valgrind working with python(这听起来像牦牛剃须),我决定在不先复制它们的情况下修复错误。现在我陷入了困境。

现在是代码。

from threading import Timer, Lock
from time import time

class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass


class KitchenTimer(object):
    '''
    Loosely models a clockwork kitchen timer with the following differences:
        You can start the timer with arbitrary duration (e.g. 1.2 seconds).
        The timer calls back a given function when time's up.
        Querying the time remaining has 0.1 second accuracy.
    '''

    PRECISION_NUM_DECIMAL_PLACES = 1
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    TIMEUP  = "TIMEUP"

    def __init__(self):
        self._stateLock = Lock()
        with self._stateLock:
            self._state = self.STOPPED
            self._timeRemaining = 0

    def start(self, duration=1, whenTimeup=None):
        '''
        Starts the timer to count down from the given duration and call whenTimeup when time's up.
        '''
        with self._stateLock:
            if self.isRunning():
                raise AlreadyRunningError
            else:
                self._state = self.RUNNING
                self.duration = duration
                self._userWhenTimeup = whenTimeup
                self._startTime = time()
                self._timer = Timer(duration, self._whenTimeup)
                self._timer.start()

    def stop(self):
        '''
        Stops the timer, preventing whenTimeup callback.
        '''
        with self._stateLock:
            if self.isRunning():
                self._timer.cancel()
                self._state = self.STOPPED
                self._timeRemaining = self.duration - self._elapsedTime()
            else:
                raise NotRunningError()

    def isRunning(self):
        return self._state == self.RUNNING

    def isStopped(self):
        return self._state == self.STOPPED

    def isTimeup(self):
        return self._state == self.TIMEUP

    @property
    def timeRemaining(self):
        if self.isRunning():
            self._timeRemaining = self.duration - self._elapsedTime()
        return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)

    def _whenTimeup(self):
        with self._stateLock:
            self._state = self.TIMEUP
            self._timeRemaining = 0
            if callable(self._userWhenTimeup):
                self._userWhenTimeup()

    def _elapsedTime(self):
        return time() - self._startTime

问题

在此代码示例的上下文中,我如何公开竞争条件、修复它们并证明它们已修复?

加分

适用于其他实现和问题而不是专门针对此代码的测试框架的加分项。

外卖

我的结论是,重现已识别竞态条件的技术解决方案是控制两个线程的同步性,以确保它们按照会暴露错误的顺序执行。这里的重点是它们是已经确定的竞争条件。我发现识别竞争条件的最佳方法是将您的代码提交代码审查并鼓励更多专家对其进行分析。

【问题讨论】:

PyLint 对线程一无所知 - 这就是它没有帮助的原因。一般来说,您在这里解决非常困难的问题。 Follow the references here 你会发现它们没有多大帮助:-( 一般来说很难,但这并不意味着不可能,对吧?我正在寻找特定于此示例的答案。到目前为止,我设法检测竞争条件的唯一方法是通过代码审查。但它会可靠地检测到它们吗?是否有更快的方法来确定我是否已修复(或引入)并发错误? 更好的方法是进行防御性编码,在这种情况下也同步 timeRemaining 方法,正如@perreal 建议的那样。就测试而言,蛮力是一个相当可靠的选择。 通常最好仔细考虑并确保不存在竞争条件,因为这些条件很难检测到(运行程序可能只有百万分之一)。 也许before_after可以帮忙:oreills.co.uk/2015/03/01/testing-race-conditions-in-python.html 【参考方案1】:

传统上,在多线程代码中强制竞争条件是通过信号量完成的,因此您可以强制一个线程等到另一个线程达到某个边缘条件后再继续。

例如,您的对象有一些代码来检查start 是否在对象已经运行时未被调用。您可以通过执行以下操作强制此条件以确保其行为符合预期:

开始KitchenTimer 在处于运行状态时在信号量上设置计时器块 在另一个线程中启动相同的计时器 捕捉AlreadyRunningError

要做到这一点,您可能需要扩展 KitchenTimer 类。正式的单元测试通常会使用定义为在关键时刻阻塞的模拟对象。模拟对象是一个比我在这里可以解决的更大的话题,但是谷歌搜索“python 模拟对象”会发现很多文档和许多实现可供选择。

这是一种强制代码抛出 AlreadyRunningError 的方法:

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def start(self, duration=1, whenTimeUp=None):
        KitchenTimer.start(self, duration, whenTimeUp)
        with self._runningLock:
            print "waiting on _runningLock"
            self._runningLock.wait()

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

timer = TestKitchenTimer()

# Start the timer in a subthread. This thread will block as soon as
# it is started.
thread_1 = threading.Thread(target = timer.start, args = (10, None))
thread_1.start()

# Attempt to start the timer in a second thread, causing it to throw
# an AlreadyRunningError.
try:
    thread_2 = threading.Thread(target = timer.start, args = (10, None))
    thread_2.start()
except AlreadyRunningError:
    print "AlreadyRunningError"
    timer.resume()
    timer.stop()

通读代码,确定您要测试的一些边界条件,然后考虑您需要在哪里暂停计时器以强制该条件出现,并添加条件、信号量、事件等来实现它发生。例如如果在计时器运行 whenTimeUp 回调时,另一个线程试图停止它会发生什么?您可以通过让计时器在输入_whenTimeUp 后立即等待来强制该条件:

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def _whenTimeup(self):
        with self._runningLock:
            self._runningLock.wait()
        KitchenTimer._whenTimeup(self)

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

def TimeupCallback():
    print "TimeupCallback was called"

timer = TestKitchenTimer()

# The timer thread will block when the timer expires, but before the callback
# is invoked.
thread_1 = threading.Thread(target = timer.start, args = (1, TimeupCallback))
thread_1.start()
sleep(2)

# The timer is now blocked. In the parent thread, we stop it.
timer.stop()
print "timer is stopped: %r" % timer.isStopped()

# Now allow the countdown thread to resume.
timer.resume()

对您要测试的类进行子类化并不是一种很好的测试方法:您基本上必须重写所有方法才能测试每个方法的竞争条件,此时有一个很好的方法要提出的论点是您并没有真正测试原始代码。相反,您可能会发现将信号量直接放在 KitchenTimer 对象中但默认初始化为 None 更简洁,并让您的方法在获取或等待锁之前检查 if testRunningLock is not None:。然后,您可以强制对您提交的实际代码进行竞争。

一些关于 Python 模拟框架的阅读可能会有所帮助。事实上,我不确定 mock 是否有助于测试这段代码:它几乎完全是自包含的,不依赖于许多外部对象。但是模拟教程有时会涉及到这些问题。我没有使用过任何这些,但是这些文档是一个很好的开始:

Getting Started with Mock Using Fudge Python Mock Testing Techniques and Tools

【讨论】:

我可以通过快速连续两次调用 start 来强制代码抛出 AlreadyRunningError。 你能举个例子来重现“如果在计时器运行 whenTimeUp 回调时,另一个线程试图停止它会发生什么?”这是最初在代码审查中确定的竞争条件之一,我正在努力重现。 “快速连续”方法与“运行多个线程”的蛮力方法没有什么不同。因为它依赖于两个 close-by 语句在其间很短的延迟内执行的机会。我同意一般来说你不能真正产生一般的比赛条件。如果您通过分析发现它可能会遇到竞争条件,那么它会遇到。无需测试(因为您已经在脑海中进行了测试)。 @justhalf 对“快速继任”造成的混乱表示歉意。我的意思是,如果我开始()一个持续时间为 10 秒的 KitchenTimer,并且我尝试在该持续时间内再次开始()它,我应该得到 AlreadyRunningError。这是定时器的外部行为。无论 start() 是在同一个线程还是不同线程中调用,它都应该发生。 当然,刚刚编辑了答案以包含一个示例,用于公开 _whenTimeup() 和 stop() 之间的竞争条件。还修复了我原始示例中的一个错误,哎呀。【参考方案2】:

测试线程(非)安全代码的最常见解决方案是启动大量线程并希望获得最好的结果。我和我可以想象的其他人遇到的问题是它依赖于机会并且它使测试变得“繁重”。

当我不久前遇到这个问题时,我想追求精确而不是蛮力。结果是一段测试代码通过让线程相互竞争而导致竞争条件。

简单的代码

spam = []

def set_spam():
    spam[:] = foo()
    use(spam)

如果从多个线程调用set_spam,则在修改和使用spam 之间存在竞争条件。让我们尝试一致地重现它。

如何导致竞争条件

class TriggeredThread(threading.Thread):
    def __init__(self, sequence=None, *args, **kwargs):
        self.sequence = sequence
        self.lock = threading.Condition()
        self.event = threading.Event()
        threading.Thread.__init__(self, *args, **kwargs)

    def __enter__(self):
        self.lock.acquire()
        while not self.event.is_set():
            self.lock.wait()
        self.event.clear()

    def __exit__(self, *args):
        self.lock.release()
        if self.sequence:
            next(self.sequence).trigger()

    def trigger(self):
        with self.lock:
            self.event.set()
            self.lock.notify()

然后来演示这个线程的使用:

spam = []  # Use a list to share values across threads.
results = []  # Register the results.

def set_spam():
    thread = threading.current_thread()
    with thread:  # Acquires the lock.
        # Set 'spam' to thread name
        spam[:] = [thread.name]
    # Thread 'releases' the lock upon exiting the context.
    # The next thread is triggered and this thread waits for a trigger.
    with thread:
        # Since each thread overwrites the content of the 'spam'
        # list, this should only result in True for the last thread.
        results.append(spam == [thread.name])

threads = [
    TriggeredThread(name='a', target=set_spam),
    TriggeredThread(name='b', target=set_spam),
    TriggeredThread(name='c', target=set_spam)]

# Create a shifted sequence of threads and share it among the threads.
thread_sequence = itertools.cycle(threads[1:] + threads[:1])
for thread in threads:
    thread.sequence = thread_sequence

# Start each thread
[thread.start() for thread in threads]
# Trigger first thread.
# That thread will trigger the next thread, and so on.
threads[0].trigger()
# Wait for each thread to finish.
[thread.join() for thread in threads]
# The last thread 'has won the race' overwriting the value
# for 'spam', thus [False, False, True].
# If set_spam were thread-safe, all results would be true.
assert results == [False, False, True], "race condition triggered"
assert results == [True, True, True], "code is thread-safe"

我想我已经对这种结构进行了足够的解释,因此您可以根据自己的情况实施它。我认为这非常适合“加分”部分:

适用于其他实现和问题而不是专门针对此代码的测试框架的加分项。

解决竞争条件

共享变量

每个线程问题都以自己特定的方式解决。在上面的示例中,我通过跨线程共享一个值引起了竞争条件。使用全局变量(例如模块属性)时可能会出现类似问题。解决此类问题的关键可能是使用线程本地存储:

# The thread local storage is a global.
# This may seem weird at first, but it isn't actually shared among threads.
data = threading.local()
data.spam = []  # This list only exists in this thread.
results = []  # Results *are* shared though.

def set_spam():
    thread = threading.current_thread()
    # 'get' or set the 'spam' list. This actually creates a new list.
    # If the list was shared among threads this would cause a race-condition.
    data.spam = getattr(data, 'spam', [])
    with thread:
        data.spam[:] = [thread.name]
    with thread:
        results.append(data.spam == [thread.name])

# Start the threads as in the example above.

assert all(results)  # All results should be True.

并发读/写

一个常见的线程问题是多个线程同时读取和/或写入数据持有者的问题。这个问题是通过实现读写锁来解决的。读写锁的实际实现可能会有所不同。您可以选择读优先锁、写优先锁或随机选择。

我确信有一些例子描述了这种锁定技术。我稍后可能会写一个例子,因为这已经是一个很长的答案了。 ;-)

注意事项

看看the threading module documentation 并尝试一下。由于每个线程问题不同,因此适用不同的解决方案。

关于线程的主题,请看一下 Python GIL(全局解释器锁)。需要注意的是,线程实际上可能不是优化性能的最佳方法(但这不是您的目标)。我觉得这个演示很不错:https://www.youtube.com/watch?v=zEaosS1U5qY

【讨论】:

我喜欢你的 hack 导致特定的线程操作顺序......让我在本地测试一下,看看我是否能用它! 但是,运行您的示例大约 2K 次后,我没有遇到任何故障...也许您的答案中的代码同步得有点太好了? @qarma 如果你说的是assert results == [False, False, True];这是对导致失败的断言。如果代码是线程安全的,它应该只包含True。问题在于共享变量spam。线程名称存储在其中并由另一个线程更改。当您将代码运行一百万次时,它将总是产生相同的结果,从而有效且可靠地导致“竞争条件”。 啊!好的,我接受这一点,我希望你从一个偶尔出现的竞争示例开始,然后展示如何可靠地触发竞争。不过,这无助于识别一般意义上的比赛,因为您必须在比赛的两点之间手动插入另一个with。有时很难做到,例如表达式foo[x] += 1 很激烈,但你不能在不重写的情况下将with 注入其中。 @qarma 是的,这是我的代码的主要问题。需要with 语句,以便线程可以触发另一个线程。我已经研究了如何逐行运行线程(类似于调试器“步骤”),但无法理解它。 with 语句确实为开发人员提供了使线程在给定点启动/停止的一些灵活性。【参考方案3】:

你可以使用很多线程来测试它:

import sys, random, thread
def timeup():
    sys.stdout.write("Timer:: Up %f" % time())

def trdfunc(kt, tid):
    while True :
        sleep(1)
        if not kt.isRunning():
            if kt.start(1, timeup):
                sys.stdout.write("[%d]: started\n" % tid)
        else:
            if random.random() < 0.1:
                kt.stop()
                sys.stdout.write("[%d]: stopped\n" % tid)
        sys.stdout.write("[%d] remains %f\n" % ( tid, kt.timeRemaining))

kt = KitchenTimer()
kt.start(1, timeup)
for i in range(1, 100):
    thread.start_new_thread ( trdfunc, (kt, i) )
trdfunc(kt, 0)

我看到的几个问题:

当线程发现计时器未运行并尝试启动它时, 由于两者之间的上下文切换,代码通常会引发异常 测试并开始。我认为提出一个例外太多了。或者你可以 有一个原子的 testAndStart 函数

stop 也会出现类似的问题。你可以实现一个 testAndStop 功能。

即使是 timeRemaining 函数中的这段代码:

if self.isRunning():
   self._timeRemaining = self.duration - self._elapsedTime()

需要某种原子性,也许您需要先获取锁 测试正在运行。

如果您打算在线程之间共享此类,则需要解决这些问题。

【讨论】:

您的代码似乎依赖于蛮力和随机性,这如何可靠地重现任何问题或这个特定问题? @qarma,我认为在这种情况下,确定性是计算概率并相应地设置线程数和计时器间隔的问题。 我已尝试运行您的测试代码,但无法理解结果。预期的结果是什么?如何将它们与实际结果进行比较?对于奖励积分,您能否更新您的示例以总结我们的预期/实际结果? 向一个对象抛出大量线程是对系统进行压力测试的好方法,但它绝对不能替代以确定性方式故意测试已知的边缘条件。这绝对不是解决手头问题的办法。【参考方案4】:

一般来说 - 这不是可行的解决方案。您可以通过使用调试器重现这种竞争条件(在代码中的某些位置设置断点,然后,当它遇到一个断点时 - 冻结线程并运行代码直到它遇到另一个断点,然后冻结这个线程并解冻第一个线程,您可以使用这种技术以任何方式交错线程执行)。

问题是 - 您拥有的线程和代码越多,交叉副作用的方法就越多。实际上 - 它会成倍增长。一般来说,没有可行的解决方案来测试它。只有在一些简单的情况下才有可能。

这个问题的解决方案是众所周知的。编写意识到其副作用的代码,使用锁、信号量或队列等同步原语控制副作用,或者尽可能使用不可变数据。

也许更实用的方法是使用运行时检查来强制执行正确的调用顺序。例如(伪代码):

class RacyObject:
    def __init__(self):
        self.__cnt = 0
        ...

    def isReadyAndLocked(self):
        acquire_object_lock
            if self.__cnt % 2 != 0:
                # another thread is ready to start the Job
                return False
            if self.__is_ready:
                self.__cnt += 1
                return True
            # Job is in progress or doesn't ready yet
            return False
        release_object_lock

    def doJobAndRelease(self):
        acquire_object_lock
            if self.__cnt % 2 != 1:
                raise RaceConditionDetected("Incorrect order")
            self.__cnt += 1
            do_job()
        release_object_lock

如果您在调用doJobAndRelease 之前未检查isReadyAndLock,此代码将引发异常。只需使用一个线程即可轻松测试。

obj = RacyObject()
...
# correct usage
if obj.isReadyAndLocked()
    obj.doJobAndRelease()

【讨论】:

完全不正确。严格的单元测试通常会通过检测对象等到另一个线程进入临界区来公开竞争条件。 在实践中您无法可靠地执行此检查,因为这将取决于上下文切换。我没有看到任何在单元测试中检查锁定正确性的项目。这个问题唯一可行的解​​决方案是动态检查,有一堆针对不同语言和操作系统的竞争检测器(例如,golang 内置了一个),但我不知道 python 有一个。 当然你可以在实践中做到这一点。这就是 threading.Condition、threading.Semaphore 和 threading.Event 类的用途:控制线程执行以强制您的代码执行边缘条件。我在回答中举了例子。 在您的示例中,thread_2 可能会首先启动,而 thread_1 - 其次。至少如果它将在多处理器/多核机器上运行,并且每个线程将在不同的处理器或 CPU 内核上运行。但也许在 python 中这不会是一个问题,因为 GIL,我不确定。 你是对的!这就是为什么我在评论中指出,严格的单元测试将使用另一个信号量来确保第一个线程在第二个线程开始之前进入其临界区。 :-) 我真的不同意你必须检查大量线程之间的指数级交互。几乎所有的竞争条件都可以被识别为两个竞争进程之间的交互——如果你有成百上千个线程,则更有可能发生不受保护的竞争,但这将是其中两个线程。

以上是关于如何可靠地重现此 python 代码中的竞争条件?的主要内容,如果未能解决你的问题,请参考以下文章

寻找竞争条件的方法

如何通过 python 中的 pandas 合并重现 R 中 foverlaps 的相同输出?

C# 中的简单 SQLite ORM,代码有效,但如何解决竞争条件

ASIO signal_set 对于多个 IO 线程不可靠,取决于代码顺序?

此 UPDATE 语句中是不是存在可能的竞争条件?

使用数据库事务防止竞争条件 (Laravel)