通过 Beast websocket 发送连续数据块时无法找到“Broken Pipe”错误的原因

Posted

技术标签:

【中文标题】通过 Beast websocket 发送连续数据块时无法找到“Broken Pipe”错误的原因【英文标题】:Unable to find the reason for "Broken Pipe" error while sending continuous data chunks through Beast websocket 【发布时间】:2019-03-29 13:15:57 【问题描述】:

我正在使用 IBM Watson 语音到文本 Web 服务 API 进行流式音频识别。我在 C++(std 11)中创建了一个带有 boost (beast 1.68.0) 库的 web-socket。

我已成功连接到 IBM 服务器,并希望通过以下方式向服务器发送 231,296 字节的原始音频数据。


  "action": "start",
  "content-type": "audio/l16;rate=44100"


websocket.binary(true);
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 31,296 bytes>

websocket.binary(false);

  "action": "stop"

IBMServer 的预期结果是:

 "results": [
      "alternatives": [
              "confidence": xxxx, 
               "transcript": "call Rohan Chauhan "
            ],"final": true
      ], "result_index": 0

但我没有得到想要的结果:错误提示 “断管”

DataSize is: 50000 | mIsLast is : 0
DataSize is: 50000 | mIsLast is : 0
what : Broken pipe
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 31296 | mIsLast is : 0
what : Operation canceled

这是我的代码,它改编自 beast 库中给出的 sample example。

Foo.hpp

class IbmWebsocketSession: public std::enable_shared_from_this<IbmWebsocketSession> 
protected:
    char binarydata[50000];
    std::string TextStart;
    std::string TextStop;

public:
    explicit IbmWebsocketSession(net::io_context& ioc, ssl::context& ctx, SttService* ibmWatsonobj) :
        mResolver(ioc), mWebSocket(ioc, ctx) 
    TextStart ="\"action\":\"start\",\"content-type\": \"audio/l16;rate=44100\"";
    TextStop = "\"action\":\"stop\"";


   /**********************************************************************
    * Desc  : Send start frame
   **********************************************************************/
    void send_start(beast::error_code ec);
   /**********************************************************************
    * Desc  : Send Binary data
   **********************************************************************/
    void send_binary(beast::error_code ec);
   /**********************************************************************
    * Desc  : Send Stop frame
   **********************************************************************/
    void send_stop(beast::error_code ec);
   /**********************************************************************
    * Desc  : Read the file for binary data to be sent
   **********************************************************************/
    void readFile(char *bdata, unsigned int *Len, unsigned int *start_pos,bool *ReachedEOF);

Foo.cpp

void IbmWebsocketSession::on_ssl_handshake(beast::error_code ec) 
    if(ec)
        return fail(ec, "connect");
// Perform the websocket handshake
    ws_.async_handshake_ex(host, "/speech-to-text/api/v1/recognize", [Token](request_type& reqHead) reqHead.insert(http::field::authorization,Token);,bind(&IbmWebsocketSession::send_start, shared_from_this(),placeholders::_1));


void IbmWebsocketSession::send_start(beast::error_code ec)
    if(ec)
        return fail(ec, "ssl_handshake");

    ws_.async_write(net::buffer(TextStart),
        bind(&IbmWebsocketSession::send_binary, shared_from_this(),placeholders::_1));


void IbmWebsocketSession::send_binary(beast::error_code ec) 
    if(ec)
        return fail(ec, "send_start");
    readFile(binarydata, &Datasize, &StartPos, &IsLast);

    ws_.binary(true);
    if (!IsLast) 
        ws_.async_write(net::buffer(binarydata, Datasize),
            bind(&IbmWebsocketSession::send_binary, shared_from_this(),
                    placeholders::_1));

     else 
        IbmWebsocketSession::on_binarysent(ec);
    


void IbmWebsocketSession::on_binarysent(beast::error_code ec) 
    if(ec)
        return fail(ec, "send_binary");

    ws_.binary(false);
    ws_.async_write(net::buffer(TextStop),
           bind(&IbmWebsocketSession::read_response, shared_from_this(), placeholders::_1));


void IbmWebsocketSession::readFile(char *bdata, unsigned int *Len, unsigned int *start_pos,bool *ReachedEOF) 

    unsigned int end = 0;
    unsigned int start = 0;
    unsigned int length = 0;

    // Creation of ifstream class object to read the file
    ifstream infile(filepath, ifstream::binary);

    if (infile) 
        // Get the size of the file
        infile.seekg(0, ios::end);
        end = infile.tellg();

        infile.seekg(*start_pos, ios::beg);
        start = infile.tellg();

        length = end - start;
    

    if ((size_t) length < 150) 
        *Len = (size_t) length;
        *ReachedEOF = true;
    // cout << "Reached end of File (last 150 bytes)" << endl;

     else if ((size_t) length <= 50000)   //Maximumbytes to send are 50000
        *Len = (size_t) length;
        *start_pos += (size_t) length;
        *ReachedEOF = false;
        infile.read(bdata, length);

     else 
        *Len = 50000;
        *start_pos += 50000;
        *ReachedEOF = false;
        infile.read(bdata, 50000);
    

    infile.close();

这里有什么建议吗?

【问题讨论】:

StartPosIsLast 等变量在类中声明。 阅读boost::asio::buffer。您使用从局部变量创建的 buffer 调用 async_async_ 立即返回。所以buffer 引用了被破坏的数据。 TextStart 和 mTextStop 是传递给异步函数的局部变量。 你可以在TextStart前面加上static关键字,这样它的生命周期是不定式的。正如@rafix07 指出的那样,buffer 只是对数据块的引用,因此它们指向的值必须具有适当的生命周期。目前你有 UB,遗憾的是它不会导致崩溃。 @MarekR 我正在使用 c++11。我已经在课堂上声明了static TextStartstatic TextStop,但错误仍然存​​在。我不确定是什么导致了这个“断管”错误 【参考方案1】:

来自 boost 的 documentation 我们在 websocket::async_write 上有以下摘录

该函数用于异步写入一条完整的消息。这 call 总是立即返回。异步操作将 继续,直到满足以下条件之一:

    已写入完整的消息。

    发生错误。

因此,当您创建要传递给它的缓冲区对象net::buffer(TextStart) 时,例如传递给它的buffer 的生命周期仅在函数返回之前。可能即使在函数返回您之后,异步写入仍在按照文档在缓冲区上运行,但内容不再有效,因为 buffer 是一个局部变量。

要解决此问题,您可以将 TextStart 设为静态或将其声明为您的班级成员并将其复制到 boost::asio::buffer 有很多关于如何做到这一点的示例。注意我只在IbmWebsocketSession::send_start 函数中提到了TextStart。整个代码中的问题几乎相同。

来自 IBM Watson 的API definition,发起连接需要某种格式,然后可以将其表示为字符串。您有字符串,但缺少正确的格式,因此连接被对等方关闭,并且您正在写入关闭的套接字,因此管道损坏。

启动连接需要:

  var message = 
    action: 'start',
    content-type: 'audio/l16;rate=22050'
  ;

根据您的要求可以表示为string TextStart = "action: 'start',\r\ncontent-type: 'audio\/l16;rate=44100'"

根据聊天中的讨论,OP通过添加代码解决了这个问题:

if (!IsLast ) 
    ws_.async_write(net::buffer(binarydata, Datasize),
    bind(&IbmWebsocketSession::send_binary, shared_from_this(),
    placeholders::_1));
 
else 
     if (mIbmWatsonobj->IsGstFileWriteDone())  //checks for the file write completion
         IbmWebsocketSession::on_binarysent(ec);
      else 
         std::this_thread::sleep_for(std::chrono::seconds(1));
         IbmWebsocketSession::send_binary(ec);
     

讨论的原因在于,在同一组字节上完成文件写入之前,更多字节被发送到客户端。 OP 现在会在尝试发送更多字节之前验证这一点。

【讨论】:

我确实尝试将TextStartTextStop 设为静态。但我遇到了同样的问题。我尝试将它们设为类的成员变量,然后在构造函数中设置所需的文本。仍然有同样的破损管道。 请立即检查更新的代码 sn-p。但是错误还是一样 std::string mTextStop = "\"action\":\"stop\""; 仍然是一个局部变量 哦,不,我的错。忘了从那里删除它。我还尝试在.hpp 中添加静态关键字,然后在.cpp 中将它们声明为std::string IbmWebsocketSession::TextStart;TextStop 类似。没有变化 现在是调试的好时机。尝试注释掉从 handshakesend_startsend_binary 的功能,看看哪个函数导致管道损坏

以上是关于通过 Beast websocket 发送连续数据块时无法找到“Broken Pipe”错误的原因的主要内容,如果未能解决你的问题,请参考以下文章

Boost :: Beast Websocket双向流(C ++)

通过 Websockets 从 Python Flask 服务器连续向客户端发送数据

通过Websocket从Python Flask服务器连续向客户端发送数据

Boost Beast websocket 服务器异步接受失败,缓冲区溢出

如何正确写c++ boost beast websocket server

如何使用 boost beast websocket 客户端收听 websocket 提要?