如何在 Swift 中实现线程安全的 HashTable (PhoneBook) 数据结构?

Posted

技术标签:

【中文标题】如何在 Swift 中实现线程安全的 HashTable (PhoneBook) 数据结构?【英文标题】:How to implement a Thread Safe HashTable (PhoneBook) Data Structure in Swift? 【发布时间】:2018-08-12 14:32:48 【问题描述】:

我正在尝试实现一个线程安全的电话簿对象。电话簿应该能够添加一个人,并根据他们的姓名和电话号码查找一个人。从实现的角度来看,这仅涉及两个哈希表,一个关联名称 -> 个人,另一个关联电话号码 -> 个人。

需要注意的是我希望这个对象是线程安全的。这意味着我希望能够支持 PhoneBook 中的并发查找,同时确保一次只有一个线程可以将 Person 添加到 PhoneBook。这是基本的读写器问题,我正在尝试使用 GrandCentralDispatch 和调度障碍来解决这个问题。我正在努力解决这个问题,因为我遇到了问题。下面是我的 Swift 游乐场代码:

//: Playground - noun: a place where people can play

import UIKit
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

public class Person: CustomStringConvertible 
    public var description: String 
        get 
            return "Person: \(name), \(phoneNumber)"
        
    

    public var name: String
    public var phoneNumber: String
    private var readLock = ReaderWriterLock()

    public init(name: String, phoneNumber: String) 
        self.name = name
        self.phoneNumber = phoneNumber
    


    public func uniquePerson() -> Person 
        let randomID = UUID().uuidString
        return Person(name: randomID, phoneNumber: randomID)
    


public enum Qos 
    case threadSafe, none


public class PhoneBook 

    private var qualityOfService: Qos = .none
    public var nameToPersonMap = [String: Person]()
    public var phoneNumberToPersonMap = [String: Person]()
    private var readWriteLock = ReaderWriterLock()


    public init(_ qos: Qos) 
        self.qualityOfService = qos
    

    public func personByName(_ name: String) -> Person? 
        var person: Person? = nil
        if qualityOfService == .threadSafe 
            readWriteLock.concurrentlyRead  [weak self] in
                guard let strongSelf = self else  return 
                person = strongSelf.nameToPersonMap[name]
            
         else 
            person = nameToPersonMap[name]
        

        return person
    

    public func personByPhoneNumber( _ phoneNumber: String) -> Person? 
        var person: Person? = nil
        if qualityOfService == .threadSafe 
            readWriteLock.concurrentlyRead  [weak self] in
                guard let strongSelf = self else  return 
                person = strongSelf.phoneNumberToPersonMap[phoneNumber]
            
         else 
            person = phoneNumberToPersonMap[phoneNumber]
        

        return person
    

    public func addPerson(_ person: Person) 
        if qualityOfService == .threadSafe 
            readWriteLock.exclusivelyWrite  [weak self] in
                guard let strongSelf = self else  return 
                strongSelf.nameToPersonMap[person.name] = person
                strongSelf.phoneNumberToPersonMap[person.phoneNumber] = person
            
         else 
            nameToPersonMap[person.name] = person
            phoneNumberToPersonMap[person.phoneNumber] = person
        
    




// A ReaderWriterLock implemented using GCD and OS Barriers.
public class ReaderWriterLock 

    private let concurrentQueue = DispatchQueue(label: "com.ReaderWriterLock.Queue", attributes: DispatchQueue.Attributes.concurrent)
    private var writeClosure: (() -> Void)!

    public func concurrentlyRead(_ readClosure: (() -> Void)) 
        concurrentQueue.sync 
            readClosure()
        
    

    public func exclusivelyWrite(_ writeClosure: @escaping (() -> Void)) 
        self.writeClosure = writeClosure
        concurrentQueue.async(flags: .barrier)  [weak self] in
            guard let strongSelf = self else  return 
            strongSelf.writeClosure()
        
    



// MARK: Testing the synchronization and thread-safety

for _ in 0..<5 
    let iterations = 1000
    let phoneBook = PhoneBook(.none)

    let concurrentTestQueue = DispatchQueue(label: "com.PhoneBookTest.Queue", attributes: DispatchQueue.Attributes.concurrent)
    for _ in 0..<iterations 
        let person = Person(name: "", phoneNumber: "").uniquePerson()
        concurrentTestQueue.async 
            phoneBook.addPerson(person)
        
    

    sleep(10)
    print(phoneBook.nameToPersonMap.count)

为了测试我的代码,我运行了 1000 个并发线程,这些线程只是将一个新人添加到电话簿。每个 Person 都是唯一的,因此在 1000 个线程完成后,我希望 PhoneBook 包含 1000 个计数。每次执行写入时,我都会执行 dispatch_barrier 调用,更新哈希表并返回。据我所知,这就是我们需要做的。但是,在重复运行 1000 个线程后,我发现 PhoneBook 中的条目数不一致并且到处都是:

Phone Book Entries: 856
Phone Book Entries: 901
Phone Book Entries: 876
Phone Book Entries: 902
Phone Book Entries: 912

谁能帮我弄清楚发生了什么?我的锁定代码是否有问题,或者更糟糕的是,我的测试是如何构建的?我对这个多线程问题空间很陌生,谢谢!

【问题讨论】:

我认为你的问题是 sleep(10) 是一个竞争条件。在需要等到所有工作完成的实际应用程序中,您不会使用它。尝试sleep(30) 作为实验,看看您的结果是否有所改善。 我怎么说等到所有工作都在现实生活中完成?我也觉得关于睡眠的草图。 sleep(30) 也不起作用.. 顺便说一句,我假设这个模型只是为了探索线程安全,但我可能会建议一个稍微不同的结构,即[Person] 的数组,然后具有返回人员数组的方法给定的电话号码或名字(例如使用filter)。我们中的许多人都有多个联系人使用相同的电话号码(例如,共享的业务线,甚至是拥有相同家庭电话的家庭)。我什至为同名的不同人提供了一些条目(例如,我有两个不同的“保罗·威廉姆斯”,他们是真正不同的人)。做任何你想做的事,但只是一个想法。 【参考方案1】:

问题是你的ReaderWriterLock。您将 writeClosure 保存为属性,然后异步调度调用该已保存属性的闭包。但是,如果在此期间另一个 exclusiveWrite 出现,您的 writeClosure 属性将被新的闭包替换。

在这种情况下,这意味着您可以多次添加相同的Person。而且由于您使用的是字典,这些重复项具有相同的键,因此不会导致您看到所有 1000 个条目。

您实际上可以简化ReaderWriterLock,完全消除该属性。我还将concurrentRead 设为泛型,返回值(就像sync 一样),并重新抛出任何错误(如果有)。

public class ReaderWriterLock 
    private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)
    
    public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T 
        return try queue.sync 
            try block()
        
    
    
    public func exclusivelyWrite(_ block: @escaping (() -> Void)) 
        queue.async(flags: .barrier) 
            block()
        
    


其他一些不相关的观察:

    顺便说一句,这个简化的ReaderWriterLock 恰好解决了另一个问题。我们现在已删除的 writeClosure 属性很容易引入强引用循环。

    是的,您对使用[weak self] 很谨慎,因此没有任何强引用循环,但这是可能的。我建议无论您在何处使用闭包属性,都应在完成后将该闭包属性设置为nil,以便解决任何可能意外导致闭包的强引用。这样,一个持久的强引用循环是不可能的。 (另外,闭包本身以及它所具有的任何局部变量或其他外部引用都将被解析。)

    你睡了 10 秒钟。这应该绰绰有余,但我建议不要只添加随机的sleep 调用(因为你永远不能 100% 确定)。幸运的是,您有一个并发队列,因此您可以使用它:

    concurrentTestQueue.async(flags: .barrier)  
        print(phoneBook.count) 
    
    

    由于该障碍,它会等到您放入该队列的所有其他内容完成。

    注意,我不只是打印nameToPersonMap.count。这个数组已经在PhoneBook 中进行了仔细的同步,所以你不能让随机的外部类在没有同步的情况下直接访问它。

    当你有一些你在内部同步的属性时,它应该是private,然后创建一个线程安全的函数/变量来检索你需要的任何东西:

    public class PhoneBook 
    
        private var nameToPersonMap = [String: Person]()
        private var phoneNumberToPersonMap = [String: Person]()
    
        ...
    
        var count: Int 
            return readWriteLock.concurrentlyRead 
                nameToPersonMap.count
            
        
    
    

    您说您正在测试线程安全,但随后使用.none 选项创建了PhoneBook(实现无线程安全)。在那种情况下,我预计会出现问题。您必须使用 .threadSafe 选项创建您的 PhoneBook

    您有许多strongSelf 模式。那是相当不迅速的。在 Swift 中通常不需要它,因为您可以使用 [weak self] 然后进行可选链接。

将所有这些放在一起,这是我最后的游乐场:

PlaygroundPage.current.needsIndefiniteExecution = true

public class Person 
    public let name: String
    public let phoneNumber: String
    
    public init(name: String, phoneNumber: String) 
        self.name = name
        self.phoneNumber = phoneNumber
    
    
    public static func uniquePerson() -> Person 
        let randomID = UUID().uuidString
        return Person(name: randomID, phoneNumber: randomID)
    


extension Person: CustomStringConvertible 
    public var description: String 
        return "Person: \(name), \(phoneNumber)"
    


public enum ThreadSafety  // Changed the name from Qos, because this has nothing to do with quality of service, but is just a question of thread safety
    case threadSafe, none


public class PhoneBook 
    
    private var threadSafety: ThreadSafety
    private var nameToPersonMap = [String: Person]()        // if you're synchronizing these, you really shouldn't expose them to the public
    private var phoneNumberToPersonMap = [String: Person]() // if you're synchronizing these, you really shouldn't expose them to the public
    private var readWriteLock = ReaderWriterLock()
    
    public init(_ threadSafety: ThreadSafety) 
        self.threadSafety = threadSafety
    
    
    public func personByName(_ name: String) -> Person? 
        if threadSafety == .threadSafe 
            return readWriteLock.concurrentlyRead  [weak self] in
                self?.nameToPersonMap[name]
            
         else 
            return nameToPersonMap[name]
        
    
    
    public func personByPhoneNumber(_ phoneNumber: String) -> Person? 
        if threadSafety == .threadSafe 
            return readWriteLock.concurrentlyRead  [weak self] in
                self?.phoneNumberToPersonMap[phoneNumber]
            
         else 
            return phoneNumberToPersonMap[phoneNumber]
        
    
    
    public func addPerson(_ person: Person) 
        if threadSafety == .threadSafe 
            readWriteLock.exclusivelyWrite  [weak self] in
                self?.nameToPersonMap[person.name] = person
                self?.phoneNumberToPersonMap[person.phoneNumber] = person
            
         else 
            nameToPersonMap[person.name] = person
            phoneNumberToPersonMap[person.phoneNumber] = person
        
    
    
    var count: Int 
        return readWriteLock.concurrentlyRead 
            nameToPersonMap.count
        
    


// A ReaderWriterLock implemented using GCD concurrent queue and barriers.

public class ReaderWriterLock 
    private let queue = DispatchQueue(label: "com.domain.app.rwLock", attributes: .concurrent)
    
    public func concurrentlyRead<T>(_ block: (() throws -> T)) rethrows -> T 
        return try queue.sync 
            try block()
        
    
    
    public func exclusivelyWrite(_ block: @escaping (() -> Void)) 
        queue.async(flags: .barrier) 
            block()
        
    



for _ in 0 ..< 5 
    let iterations = 1000
    let phoneBook = PhoneBook(.threadSafe)
    
    let concurrentTestQueue = DispatchQueue(label: "com.PhoneBookTest.Queue", attributes: .concurrent)
    for _ in 0..<iterations 
        let person = Person.uniquePerson()
        concurrentTestQueue.async 
            phoneBook.addPerson(person)
        
    
    
    concurrentTestQueue.async(flags: .barrier) 
        print(phoneBook.count)
    


就个人而言,我倾向于更进一步

将同步移至通用类;和 将模型更改为Person对象的数组,这样: 该机型支持多人同号或手机号;和 您可以根据需要使用值类型。

例如:

public struct Person 
    public let name: String
    public let phoneNumber: String
    
    public static func uniquePerson() -> Person 
        return Person(name: UUID().uuidString, phoneNumber: UUID().uuidString)
    


public struct PhoneBook 
    
    private var synchronizedPeople = Synchronized([Person]())
    
    public func people(name: String? = nil, phone: String? = nil) -> [Person]? 
        return synchronizedPeople.value.filter 
            (name == nil || $0.name == name) && (phone == nil || $0.phoneNumber == phone)
        
    
    
    public func append(_ person: Person) 
        synchronizedPeople.writer  people in
            people.append(person)
        
    
    
    public var count: Int 
        return synchronizedPeople.reader  $0.count 
    


/// A structure to provide thread-safe access to some underlying object using reader-writer pattern.

public class Synchronized<T> 
    /// Private value. Use `public` `value` computed property (or `reader` and `writer` methods)
    /// for safe, thread-safe access to this underlying value.
    
    private var _value: T
    
    /// Private reader-write synchronization queue
    
    private let queue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".synchronized", qos: .default, attributes: .concurrent)
    
    /// Create `Synchronized` object
    ///
    /// - Parameter value: The initial value to be synchronized.
    
    public init(_ value: T) 
        _value = value
    
    
    /// A threadsafe variable to set and get the underlying object, as a convenience when higher level synchronization is not needed        
    
    public var value: T 
        get  reader  $0  
        set  writer  $0 = newValue  
    
    
    /// A "reader" method to allow thread-safe, read-only concurrent access to the underlying object.
    ///
    /// - Warning: If the underlying object is a reference type, you are responsible for making sure you
    ///            do not mutating anything. If you stick with value types (`struct` or primitive types),
    ///            this will be enforced for you.
    
    public func reader<U>(_ block: (T) throws -> U) rethrows -> U 
        return try queue.sync  try block(_value) 
    
    
    /// A "writer" method to allow thread-safe write with barrier to the underlying object
    
    func writer(_ block: @escaping (inout T) -> Void) 
        queue.async(flags: .barrier) 
            block(&self._value)
        
    

【讨论】:

简直是惊人的答案。非常感谢你,这让我明白了发生了什么......老实说,我没有看到关闭问题是一个新手错误:P 再次感谢。 接受答案 @coping - 是的,因为writer:你不能在struct 中使用mutating 方法来异步执行突变。 @Kiran 是的。有时您只需要一个同步访问器方法,在这种情况下,访问器会产生更自然的代码。例如,也许你只是在重置一个同步整数。 foo.value = 0 感觉比 foo.writer $0 = 0 foo.writer value in value = 0 要自然得多。但是假设我正在递增值,我不能使用访问器(因为递增是一个多步操作,读取值,递增它,然后写入它)并且foo.value += 1 不是线程安全的,就像它一样无辜好像是。你必须foo.writer $0 += 1 或同等学历。 但是如果你发现两者都提供令人困惑,那么消除计算属性并失去其更简洁的语法。 (但恕我直言,您不想消除该方法,因为通常情况下,您确实需要更高级别的同步来实现线程安全。)【参考方案2】:

我不认为你用错了:)。

原始(在macos上)生成:

0  swift                    0x000000010c9c536a PrintStackTraceSignalHandler(void*) + 42
1  swift                    0x000000010c9c47a6 SignalHandler(int) + 662
2  libsystem_platform.dylib 0x00007fffbbdadb3a _sigtramp + 26
3  libsystem_platform.dylib 000000000000000000 _sigtramp + 1143284960
4  libswiftCore.dylib       0x0000000112696944 _T0SSwcp + 36
5  libswiftCore.dylib       0x000000011245fa92 _T0s24_VariantDictionaryBufferO018ensureUniqueNativeC0Sb11reallocated_Sb15capacityChangedtSiF + 1634
6  libswiftCore.dylib       0x0000000112461fd2 _T0s24_VariantDictionaryBufferO17nativeUpdateValueq_Sgq__x6forKeytF + 1074

如果您从 ReaderWriter 队列中删除“.concurrent”,“问题就会消失”。© 如果您恢复 .concurrent,但将写入端的异步调用更改为同步:

swift(10504,0x70000896f000) malloc: *** 对象 0x7fcaa440cee8 错误:已释放对象的校验和不正确 - 对象可能在被释放后被修改。

如果它不迅速,那会有点令人惊讶? 我挖了进去,通过插入一个散列函数用一个 Int 替换了基于“字符串”的数组,用屏障调度替换了 sleep(10) 以刷新任何滞后的块,这使得它更可重现地崩溃,而更有帮助:

x(10534,0x700000f01000) malloc: *** 对象 0x7f8c9ee00008 错误:已释放对象的校验和不正确 - 对象可能在被释放后被修改。

但是当搜索源发现没有 malloc 或 free 时,也许堆栈转储更有用。

无论如何,解决问题的最佳方法:改用 go;它实际上是有道理的。

【讨论】:

删除“.concurrent”会使问题消失,因为那时您正在利用串行队列。默认情况下,串行队列不需要使用读写锁,因为线程一次串行执行一个,这就是它“工作”的原因。此外,您不能使用不是 API 工作方式的障碍同步调度,因此为什么您会崩溃。我不明白你的解决方案,告诉我使用“Go”是不够的。这个问题旨在帮助我在 Swift 中使用它。 我认为你的意图是好的,但不幸的是人们投了反对票。我也必须这样做。 删除 .concurrent 不仅改变了它,使它不再是一个读写器(从而失去了它所带来的好处),而且它实际上也没有解决根本问题。拿他的原始代码,使队列串行,问题仍然存在。问题是异步调用的闭包属性。我敢打赌,当你整理他的代码时,你删除了那些不必要的属性,这才是真正消除问题的原因。

以上是关于如何在 Swift 中实现线程安全的 HashTable (PhoneBook) 数据结构?的主要内容,如果未能解决你的问题,请参考以下文章

通过锁定在 Java 中实现线程安全的 ArrayList

Boost中实现线程安全

如何在 Swift 中实现单例类

在 Swift 中实现用于选择的 UIButton

这是如何在 swift 4 中实现的?

以下 C++ 代码中实现的 DCL(双重检查锁定)是不是是线程安全的?