如何为 Network.framework 编写 NWProtocolFramer,使用分隔符将流拆分为帧?

Posted

技术标签:

【中文标题】如何为 Network.framework 编写 NWProtocolFramer,使用分隔符将流拆分为帧?【英文标题】:How to write a NWProtocolFramer for Network.framework that splits streams into frames using a delimiter? 【发布时间】:2020-10-08 16:33:24 【问题描述】:

我尝试使用以下代码创建一个成帧器,它将 ASCII 字节流拆分为由管道 ascii 字符分隔的帧:"|"

import Network

fileprivate let pipe = Character("|").asciiValue!

class PipeFramer: NWProtocolFramerImplementation 
    static let label = "Pipe framer"
    static let definition = NWProtocolFramer.Definition(implementation: PipeFramer.self)

    var minLengthUntilNextMessage = 1 
        didSet  print("client: minLength set to", minLengthUntilNextMessage) 
    

    required init(framer: NWProtocolFramer.Instance) 

    func start(framer: NWProtocolFramer.Instance) -> NWProtocolFramer.StartResult  .ready 

    func handleInput(framer: NWProtocolFramer.Instance) -> Int 
        while true 
            var delimiterPosition: Int?
            _ = framer.parseInput(minimumIncompleteLength: minLengthUntilNextMessage, maximumLength: 65535)  buffer, endOfMessage in
                if let buffer = buffer 
                    print("client: parsing buffer: \"\(String(bytes: buffer, encoding: .utf8) ?? buffer.debugDescription)\"")
                    if let indexOfDelimiter = buffer.firstIndex(of: pipe) 
                        minLengthUntilNextMessage = 1
                        delimiterPosition = indexOfDelimiter
                     else 
                        minLengthUntilNextMessage = buffer.count + 1
                    
                 else 
                    print("client: no buffer")
                
                return 0
            

            if let length = delimiterPosition 
                guard framer.deliverInputNoCopy(length: length, message: .init(instance: framer), isComplete: true) else 
                    return 0
                
                _ = framer.parseInput(minimumIncompleteLength: 1, maximumLength: 65535)  _,_ in 1 
             else 
                return minLengthUntilNextMessage
            
        
    

    func handleOutput(framer: NWProtocolFramer.Instance, message: NWProtocolFramer.Message, messageLength: Int, isComplete: Bool) 
        try! framer.writeOutputNoCopy(length: messageLength)
        framer.writeOutput(data: [pipe])
    

    func wakeup(framer: NWProtocolFramer.Instance) 

    func stop(framer: NWProtocolFramer.Instance) -> Bool  return true 

    func cleanup(framer: NWProtocolFramer.Instance)  

问题是,从我得到一个不以"|" 结尾的块的那一刻起,成帧器就会卡在那个块上。因此,在这个不完整块之后的其他块永远不会完全到达 framer.parseInput(...) 调用。因为它总是解析 minimumIncompleteLength 的块,因此永远不会到达下一个 "|" 所在的位置。

下面是这个问题的简单复现:

    创建 TCP 服务器 设置服务器,以便在客户端连接时发送消息块。 使用上面的成帧器连接到服务器(在 1. 中创建)。 开始接收消息。

Swift 代码:

import Network

let client = DispatchQueue(label: "Server")
let server = DispatchQueue(label: "Client")
let networkParameters = NWParameters.tcp
networkParameters.defaultProtocolStack.applicationProtocols.insert(NWProtocolFramer.Options(definition: PipeFramer.definition), at: 0)
let server = try! NWListener(using: .tcp)

server.newConnectionHandler =  connection in
    print("server: new connection from", connection.endpoint)

    print("server (client \(connection.endpoint)): state", connection.state)

    connection.viabilityUpdateHandler =  viable in
        print("server (client \(connection.endpoint)): state", connection.state)
        if viable 
            print("server: sending")
            connection.send(content: "A|Be||Sea".data(using: .utf8)!, isComplete: false, completion: .idempotent)

            serverQueue.asyncAfter(deadline: .now() + 5) 
                print("server: sending second part")
                connection.send(content: " is longer than expected|0|".data(using: .utf8)!, isComplete: true, completion: .idempotent)
            
            serverQueue.asyncAfter(deadline: .now() + 8) 
                print("server: sending last part")
                connection.send(content: "Done|".data(using: .utf8)!, isComplete: true, completion: .idempotent)
            
        
    

    connection.start(queue: serverQueue)


server.stateUpdateHandler =  state in
    print("server:", state)
    if state == .ready, let port = server.port 
        print("server: listening on", port)
    


server.start(queue: serverQueue)

let client = NWConnection(to: .hostPort(host: "localhost", port: server.port!), using: networkParameters)

func receiveNext() 
    client.receiveMessage  (data, context, complete, error) in
        let content: String
        if let data = data 
            content = String(data: data, encoding: .utf8) ?? data.description
         else 
            content = data?.debugDescription ?? "<no data>"
        
        print("client: received \"\(content)\"", context.debugDescription, complete, error?.localizedDescription ?? "No error")

        receiveNext()
    


client.stateUpdateHandler =  state in
    print("client:", state)

    if state == .ready 
        print("client: receiving")
        receiveNext()
    


client.start(queue: clientQueue)

结果:

server: waiting(POSIXErrorCode: Network is down)
server: ready
server: listening on 54894
client: preparing
client: ready
client: receiving
server: new connection from ::1.53179
server (client ::1.53179): state setup
server (client ::1.53179): state ready
server: sending
client: parsing buffer: "A|Be||Sea"
client: minLength set to 1
client: parsing buffer: "Be||Sea"
client: minLength set to 1
client: parsing buffer: "|Sea"
client: minLength set to 1
client: parsing buffer: "Sea"
client: minLength set to 4
client: parsing buffer: ""
client: minLength set to 1
client: received "A" Optional(Network.NWConnection.ContentContext) true No error
client: received "Be" Optional(Network.NWConnection.ContentContext) true No error
client: received "<no data>" Optional(Network.NWConnection.ContentContext) true No error
client: parsing buffer: "Sea"
client: minLength set to 4
server: sending second part
client: parsing buffer: "Sea "
client: minLength set to 5
client: parsing buffer: "Sea i"
client: minLength set to 6
server: sending last part
client: parsing buffer: "Sea is"
client: minLength set to 7
client: parsing buffer: "Sea is "
client: minLength set to 8

请注意,客户端永远不会收到第四条和第五条消息。我应该如何编写 Framer 以便在传入的不完整块之后接收消息?


参考文献

A swift package containing the above code Corresponding discussion on  Developer Forums

【问题讨论】:

【参考方案1】:

我遇到了完全相同的问题...我正在使用的网络协议也有一个简单的分隔符来分隔每个“消息”,并且该协议没有告诉我会发生什么的标头。通常在缓冲区的末尾,只有部分消息没有分隔符,需要读取更多字节才能获取消息的其余部分。像这样的:

|              PACKET A              |              PACKET B             |
|<message>|<message>|<message><mess...age>|<message><message><message><m...essage>
     1         2         4      5a    5b      6        7        8      9a    9b

Note:
delimiter = | - single character
lhsMessage = message 5a 
rhsMessage = message 5b

即使看了 WWDC 和 Apple 的其他示例,我仍然不完全理解 handleInputparseInput 应该如何工作。

我假设我可以简单地使用 (lhsMessage.count + 1) 从 handleInput 返回,它会将部分消息保留在当前 buffer 中并向缓冲区中添加额外的字节(即来自 PACKET B) parseInput 可以检查。

但是,它似乎确实以这种方式工作。相反,我最终将 lhsMessage 的值存储在类 var 中,然后从 parseInput 返回lhsMessage.count,我相信这会将缓冲区中的“光标”移动到结束和强制handleInput 获取新数据包(即数据包B)。

作为 parseInput 的一部分,然后我检查我是否有 lhsMessage,然后假设我找到的分隔符实际上是 rhsMessage。然后我加入 LHS 和 RHS 创建一个completeMessage。此时,我也从 parseInput 返回 (rhsMessage.count + 1) 的值以再次移动光标。

现在要发送这个completeMessage 我无法使用 deliverInputNoCopy,因为组成 completeMessage 的字节不再在缓冲区中 :-)

handleInput 使用 deliverInput 将消息发回。

【讨论】:

以上是关于如何为 Network.framework 编写 NWProtocolFramer,使用分隔符将流拆分为帧?的主要内容,如果未能解决你的问题,请参考以下文章

如何为 Node.js 编写异步函数

如何为@KafkaListener 编写单元测试?

如何为循环编写异常?

如何为 Python socketserver 请求处理程序编写单元测试?

如何为类编写正确的描述方法?

如何为 Product* getProductFromID(std::string) 编写方法定义;