通过 Beast websocket 发送连续数据块时无法找到“Broken Pipe”错误的原因
Posted
技术标签:
【中文标题】通过 Beast websocket 发送连续数据块时无法找到“Broken Pipe”错误的原因【英文标题】:Unable to find the reason for "Broken Pipe" error while sending continuous data chunks through Beast websocket 【发布时间】:2019-03-29 13:15:57 【问题描述】:我正在使用 IBM Watson 语音到文本 Web 服务 API 进行流式音频识别。我在 C++(std 11)中创建了一个带有 boost (beast 1.68.0) 库的 web-socket。
我已成功连接到 IBM 服务器,并希望通过以下方式向服务器发送 231,296 字节的原始音频数据。
"action": "start",
"content-type": "audio/l16;rate=44100"
websocket.binary(true);
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 31,296 bytes>
websocket.binary(false);
"action": "stop"
IBMServer 的预期结果是:
"results": [
"alternatives": [
"confidence": xxxx,
"transcript": "call Rohan Chauhan "
],"final": true
], "result_index": 0
但我没有得到想要的结果:错误提示 “断管”
DataSize is: 50000 | mIsLast is : 0
DataSize is: 50000 | mIsLast is : 0
what : Broken pipe
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 31296 | mIsLast is : 0
what : Operation canceled
这是我的代码,它改编自 beast 库中给出的 sample example。
Foo.hpp
class IbmWebsocketSession: public std::enable_shared_from_this<IbmWebsocketSession>
protected:
char binarydata[50000];
std::string TextStart;
std::string TextStop;
public:
explicit IbmWebsocketSession(net::io_context& ioc, ssl::context& ctx, SttService* ibmWatsonobj) :
mResolver(ioc), mWebSocket(ioc, ctx)
TextStart ="\"action\":\"start\",\"content-type\": \"audio/l16;rate=44100\"";
TextStop = "\"action\":\"stop\"";
/**********************************************************************
* Desc : Send start frame
**********************************************************************/
void send_start(beast::error_code ec);
/**********************************************************************
* Desc : Send Binary data
**********************************************************************/
void send_binary(beast::error_code ec);
/**********************************************************************
* Desc : Send Stop frame
**********************************************************************/
void send_stop(beast::error_code ec);
/**********************************************************************
* Desc : Read the file for binary data to be sent
**********************************************************************/
void readFile(char *bdata, unsigned int *Len, unsigned int *start_pos,bool *ReachedEOF);
Foo.cpp
void IbmWebsocketSession::on_ssl_handshake(beast::error_code ec)
if(ec)
return fail(ec, "connect");
// Perform the websocket handshake
ws_.async_handshake_ex(host, "/speech-to-text/api/v1/recognize", [Token](request_type& reqHead) reqHead.insert(http::field::authorization,Token);,bind(&IbmWebsocketSession::send_start, shared_from_this(),placeholders::_1));
void IbmWebsocketSession::send_start(beast::error_code ec)
if(ec)
return fail(ec, "ssl_handshake");
ws_.async_write(net::buffer(TextStart),
bind(&IbmWebsocketSession::send_binary, shared_from_this(),placeholders::_1));
void IbmWebsocketSession::send_binary(beast::error_code ec)
if(ec)
return fail(ec, "send_start");
readFile(binarydata, &Datasize, &StartPos, &IsLast);
ws_.binary(true);
if (!IsLast)
ws_.async_write(net::buffer(binarydata, Datasize),
bind(&IbmWebsocketSession::send_binary, shared_from_this(),
placeholders::_1));
else
IbmWebsocketSession::on_binarysent(ec);
void IbmWebsocketSession::on_binarysent(beast::error_code ec)
if(ec)
return fail(ec, "send_binary");
ws_.binary(false);
ws_.async_write(net::buffer(TextStop),
bind(&IbmWebsocketSession::read_response, shared_from_this(), placeholders::_1));
void IbmWebsocketSession::readFile(char *bdata, unsigned int *Len, unsigned int *start_pos,bool *ReachedEOF)
unsigned int end = 0;
unsigned int start = 0;
unsigned int length = 0;
// Creation of ifstream class object to read the file
ifstream infile(filepath, ifstream::binary);
if (infile)
// Get the size of the file
infile.seekg(0, ios::end);
end = infile.tellg();
infile.seekg(*start_pos, ios::beg);
start = infile.tellg();
length = end - start;
if ((size_t) length < 150)
*Len = (size_t) length;
*ReachedEOF = true;
// cout << "Reached end of File (last 150 bytes)" << endl;
else if ((size_t) length <= 50000) //Maximumbytes to send are 50000
*Len = (size_t) length;
*start_pos += (size_t) length;
*ReachedEOF = false;
infile.read(bdata, length);
else
*Len = 50000;
*start_pos += 50000;
*ReachedEOF = false;
infile.read(bdata, 50000);
infile.close();
这里有什么建议吗?
【问题讨论】:
StartPos
、IsLast
等变量在类中声明。
阅读boost::asio::buffer。您使用从局部变量创建的 buffer
调用 async_
。async_
立即返回。所以buffer
引用了被破坏的数据。
TextStart 和 mTextStop 是传递给异步函数的局部变量。
你可以在TextStart
前面加上static
关键字,这样它的生命周期是不定式的。正如@rafix07 指出的那样,buffer
只是对数据块的引用,因此它们指向的值必须具有适当的生命周期。目前你有 UB,遗憾的是它不会导致崩溃。
@MarekR 我正在使用 c++11。我已经在课堂上声明了static TextStart
和static TextStop
,但错误仍然存在。我不确定是什么导致了这个“断管”错误
【参考方案1】:
来自 boost 的 documentation 我们在 websocket::async_write
上有以下摘录
该函数用于异步写入一条完整的消息。这 call 总是立即返回。异步操作将 继续,直到满足以下条件之一:
已写入完整的消息。
发生错误。
因此,当您创建要传递给它的缓冲区对象net::buffer(TextStart)
时,例如传递给它的buffer
的生命周期仅在函数返回之前。可能即使在函数返回您之后,异步写入仍在按照文档在缓冲区上运行,但内容不再有效,因为 buffer
是一个局部变量。
要解决此问题,您可以将 TextStart
设为静态或将其声明为您的班级成员并将其复制到 boost::asio::buffer 有很多关于如何做到这一点的示例。注意我只在IbmWebsocketSession::send_start
函数中提到了TextStart。整个代码中的问题几乎相同。
来自 IBM Watson 的API definition,发起连接需要某种格式,然后可以将其表示为字符串。您有字符串,但缺少正确的格式,因此连接被对等方关闭,并且您正在写入关闭的套接字,因此管道损坏。
启动连接需要:
var message =
action: 'start',
content-type: 'audio/l16;rate=22050'
;
根据您的要求可以表示为string TextStart = "action: 'start',\r\ncontent-type: 'audio\/l16;rate=44100'"
。
根据聊天中的讨论,OP通过添加代码解决了这个问题:
if (!IsLast )
ws_.async_write(net::buffer(binarydata, Datasize),
bind(&IbmWebsocketSession::send_binary, shared_from_this(),
placeholders::_1));
else
if (mIbmWatsonobj->IsGstFileWriteDone()) //checks for the file write completion
IbmWebsocketSession::on_binarysent(ec);
else
std::this_thread::sleep_for(std::chrono::seconds(1));
IbmWebsocketSession::send_binary(ec);
讨论的原因在于,在同一组字节上完成文件写入之前,更多字节被发送到客户端。 OP 现在会在尝试发送更多字节之前验证这一点。
【讨论】:
我确实尝试将TextStart
和TextStop
设为静态。但我遇到了同样的问题。我尝试将它们设为类的成员变量,然后在构造函数中设置所需的文本。仍然有同样的破损管道。
请立即检查更新的代码 sn-p。但是错误还是一样
std::string mTextStop = "\"action\":\"stop\"";
仍然是一个局部变量
哦,不,我的错。忘了从那里删除它。我还尝试在.hpp
中添加静态关键字,然后在.cpp
中将它们声明为std::string IbmWebsocketSession::TextStart;
和TextStop
类似。没有变化
现在是调试的好时机。尝试注释掉从 handshake
到 send_start
和 send_binary
的功能,看看哪个函数导致管道损坏以上是关于通过 Beast websocket 发送连续数据块时无法找到“Broken Pipe”错误的原因的主要内容,如果未能解决你的问题,请参考以下文章
Boost :: Beast Websocket双向流(C ++)
通过 Websockets 从 Python Flask 服务器连续向客户端发送数据
通过Websocket从Python Flask服务器连续向客户端发送数据
Boost Beast websocket 服务器异步接受失败,缓冲区溢出