C ++ - Boost streambuf放气的行为不一致?
Posted
技术标签:
【中文标题】C ++ - Boost streambuf放气的行为不一致?【英文标题】:C++ - Inconsistent behavior with Boost streambuf deflating? 【发布时间】:2021-02-19 17:26:47 【问题描述】:问题陈述
我正在尝试使用他们的websocket API 建立与 OKEX 交易所的 websocket 连接。我正在使用Boost::Beast
websockets。
问题是OKEX的服务器没有遵循正确的permessage_deflate
压缩协议,发送的消息是incorrectly deflated。所以我试图自己夸大消息。问题是它不起作用......让我发疯的是我得到的行为有些不一致。
实际代码
我的代码主要是从previously linked to thread 复制和粘贴的。为简单起见,我删除了所有预处理器宏,并对套接字值进行了硬编码。
inflate
代码取自 Raj Advani's answer here。
这是main.cpp
文件:
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>
namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;
int inflate(const void *src, int srcLen, void *dst, int dstLen)
z_stream strm = 0;
strm.total_in = strm.avail_in = srcLen;
strm.total_out = strm.avail_out = dstLen;
strm.next_in = (Bytef *) src;
strm.next_out = (Bytef *) dst;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
int err = -1;
int ret = -1;
err = inflateInit2(&strm, (15 + 32)); //15 window bits, and the +32 tells zlib to to detect if using gzip or zlib
if (err == Z_OK)
err = inflate(&strm, Z_FINISH);
if (err == Z_STREAM_END)
ret = strm.total_out;
else
inflateEnd(&strm);
return err;
else
inflateEnd(&strm);
return err;
inflateEnd(&strm);
return ret;
int main(int argc, char** argv)
std::string host = "real.okex.com";
auto const port = "8443";
auto const path = "/ws/v3";
net::io_context ioc;
ssl::context ctx ssl::context::sslv23 ;
tcp::resolver resolver ioc ;
stream_t s ioc, ctx ;
ctx.set_verify_mode(ssl::verify_none);
tcp::resolver::results_type results = resolver.resolve(host, port);
net::connect(
beast::get_lowest_layer(s),
//s.next_layer().next_layer(),
results.begin());
// SSL handshake
s.next_layer().handshake(ssl::stream_base::client);
s.handshake(host + ":" + port, path);
std::cout << "connected." << std::endl;
// send request to the websocket
s.write(net::buffer("'op':'subscribe', 'args':['spot/ticker:ETH-USDT']"));
net::streambuf buffer;
s.read(buffer);
// auto data_it = buffer.data().begin();
// std::cout<<"Iterating over data of size:" << buffer.data().size()<<endl; // LINE 85
// int i = 0;
// while (data_it != buffer.data().end())
// std::cout << "buffer data["<<i++<<"] size:" << (data_it->size())<<endl;
// data_it++;
//
net::streambuf out_buffer;
const int error_code_out = inflate(&buffer, buffer.size(), &out_buffer, 10000000);
std::cout << "received. size:"<<buffer.size()<<" data: "<< &buffer << std::endl;
std::cout << "deflated. error?"<< error_code_out << " data: " << &out_buffer << std::endl;
代码输出+问题
通货膨胀说buffer
的大小是117。我认为这是合理的,但由于某种原因,我在解压时得到Z_DATA_ERROR
,让我相信还有更多的数据需要解析...... .
所以我查阅了net::streambuf
的文档,发现显然有多个可以读取的缓冲区,所以也许我只使用了一个缓冲区?我运行了注释掉的代码(不包括中间的LINE 85
行)并且它从未通过循环......我认为这很奇怪。我输入那条线,然后突然间我有了几百个缓冲区? (截断的)输出类似于:
connected.
Iterating over data of size:117
buffer data[0] size:117
buffer data[1] size:72198326954657960
buffer data[2] size:140735485986592
buffer data[3] size:140618848326656
buffer data[4] size:140618848326656
.. many more lines of this...
buffer data[121] size:7089075335985461349
buffer data[122] size:3472329396561475632
buffer data[123] size:8747116609081390898
buffer data[124] size:3472329396561475632
buffer data[125] size:3472387902693336678
buffer data[126] size:
Process finished with exit code 139 (interrupted by signal 11: SIGSEGV)
如您所见,它崩溃了。我不知道发生了什么。我现在不知道如何解码streambuf
......而the documentation 似乎假设了很多我没有的背景知识。我尝试使用buffer.data()
,将buffer
转换为char*
数组,所有这些都导致我出现完全相同的行为......
不知道该怎么办。欢迎任何帮助
供参考:Python 实现
import websockets
import asyncio
import zlib
def inflate(data):
decompress = zlib.decompressobj(-zlib.MAX_WBITS)
inflated = decompress.decompress(data)
inflated += decompress.flush()
return inflated
async def main():
client = await websockets.connect("wss://real.okex.com:8443/ws/v3")
await client.send("'op':'subscribe', 'args':['spot/ticker:ETH-USDT']")
r = await client.recv()
print(len(r), r)
print(inflate(r))
if __name__ == '__main__':
asyncio.run(main())
【问题讨论】:
【参考方案1】:附言:
最后我记得之前看到过OKEX返回的特定类型的乱码,确实我之前看过这个服务器。
这个问题在概念上与Boost inflate algorithm decompress 重复,但您的特定代码有(很多)更大的问题,值得分析:
我很困惑。这是怎么编译的?
const int error_code_out = inflate(&buffer, buffer.size(), &out_buffer, 10000000);
Buffer 不是 char 数组或类似数组。它甚至不是 POD 类型。这是streambuf
。就这样使用它。
您抱怨“这有点不一致”?这完全有道理,因为这就是您所要求的:调用 Undefined Behaviour 是获得“不一致”的好方法。或鼻守护进程。
现在,接下来是:为什么编译器不警告?
罪魁祸首是::inflate
重载了同名的ZLIB函数。让我们把我们的放在一个命名空间中来解开。
接下来,它需要void*
参数。呃。他们是reinterpet_cast<>
-ed 到Byte*
。 太多讨厌这段代码。它不顾一切地使用 C 风格的强制转换,抛弃 const
,完全无视这个论点甚至不是 POD。
让我们让它稍微安全:
namespace mylib
int inflate(uint8_t const *src, int srcLen, uint8_t *dst, int dstLen)
z_stream strm ;
strm.total_in = strm.avail_in = srcLen;
strm.total_out = strm.avail_out = dstLen;
strm.next_in = const_cast<Bytef*>(static_cast<Bytef const*>(src));
strm.next_out = static_cast<Bytef*>(dst);
我们不要掩饰你甚至明确地、故意地 承诺输出缓冲区大小准确的功能
10000000
。你应该问自己,当你 写的。
现在代码表达了意图,我们可以期待编译器诊断我们的错误。当然,它确实如此,因为 C++ 编译器就是这样做的。
修复调用
让我们避免与net::streambuf
混淆。您可以将字符串或向量用作动态缓冲区。这可能并不总是那么有效,但让我们在这里关注可理解的代码:
std::vector<uint8_t> in, out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);
out.resize(1024); // make sure it's enough
const int err = mylib::inflate(
in.data(), in.size(),
out.data(), out.size());
看,现在你知道你正在传递什么。而且会没事的。
让我们避免打印输入数据(这是二进制乱码......)。
std::cout << "received. size:" << in_buffer.size() << std::endl;
std::cout << "deflated. error?" << err << " data: "
<< std::string(out.begin(), out.end())
<< std::endl;
现在,膨胀仍然失败,但这是服务器的问题,请参阅Boost inflate algorithm decompress - 它还显示了缓冲区处理的一些替代方案。
固定代码
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>
namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;
namespace mylib
int inflate(uint8_t const *src, int srcLen, uint8_t *dst, int dstLen)
z_stream strm ;
strm.total_in = strm.avail_in = srcLen;
strm.total_out = strm.avail_out = dstLen;
strm.next_in = const_cast<Bytef*>(static_cast<Bytef const*>(src));
strm.next_out = static_cast<Bytef*>(dst);
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
strm.opaque = Z_NULL;
int err = -1;
int ret = -1;
err = inflateInit2(&strm, (15 + 32)); //15 window bits, and the +32 tells zlib to to detect if using gzip or zlib
if (err == Z_OK)
err = inflate(&strm, Z_FINISH);
if (err == Z_STREAM_END)
ret = strm.total_out;
else
inflateEnd(&strm);
return err;
else
inflateEnd(&strm);
return err;
inflateEnd(&strm);
return ret;
int main()
std::string host = "real.okex.com";
auto const port = "8443";
auto const path = "/ws/v3";
net::io_context ioc;
tcp::resolver resolver ioc ;
ssl::context ctx ssl::context::sslv23 ;
ctx.set_verify_mode(ssl::verify_none);
stream_t s ioc, ctx ;
net::connect(beast::get_lowest_layer(s), resolver.resolve(host, port));
// SSL handshake
s.next_layer().handshake(ssl::stream_base::client);
websocket::permessage_deflate opt;
opt.client_enable = true; // for clients
opt.server_enable = true; // for servers
s.set_option(opt);
s.handshake(host + ":" + port, path);
std::cerr << "connected." << std::endl;
// send request to the websocket
s.write(net::buffer("'op':'subscribe', 'args':['spot/ticker:ETH-USDT']"));
std::vector<uint8_t> in, out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);
// std::cout.write(reinterpret_cast<char const*>(in.data()), in.size());
out.resize(1024); // make sure it's enough
const int err = mylib::inflate(
in.data(), in.size(),
out.data(), out.size());
std::cerr << "received. size:" << in_buffer.size() << std::endl;
//std::cerr << "received. data:" << std::string(in.begin(), in.end()) << std::endl;
std::cerr << "deflated. error?" << err << " data: "
<< std::string(out.begin(), out.end())
<< std::endl;
【讨论】:
首先,感谢您对这段代码的耐心等待,我对 C++ 并不陌生,但对这种类型的工作绝对是新手。我将最大大小写为 10000,我从来不知道我必须相应地调整缓冲区的大小......它为什么会破裂是有道理的 我想我需要更多地了解dynamic_buffer
的作用......
您的问题不在于您没有调整缓冲区的大小。您将 void* 传递给 C++ 类,就好像它指向 100000 字节的缓冲区:) 无论如何,我终于解决了它。见my new answer【参考方案2】:
我有一个older answer 来解决代码问题。我已经找到了解决剩余问题的方法:服务器响应。
解决方案
OKEX 不仅不遵守 WS 标准来启用每条消息的通缩,而且它还会突然终止数据。但是,事实证明,如果您保留部分膨胀的结果,实际上是可以的。
我实现它的方式是不直接使用 ZLIB,而是使用beast::zlib::inflate_stream
。这有一个更灵活的界面,可以让我们得到我们需要的结果:
namespace mylib
auto inflate(std::vector<uint8_t> const& in, std::vector<uint8_t>& out)
boost::system::error_code ec;
beast::zlib::z_params zp;
zp.next_in = (Bytef*)in.data();
zp.avail_in = in.size();
zp.next_out = out.data();
zp.avail_out = out.size();
beast::zlib::inflate_stream zs;
zs.write(zp, beast::zlib::Flush::full, ec);
return ec;
现在我们像这样使用它:
std::vector<uint8_t> in, out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);
out.resize(1024); // make sure it's enough
auto ec = mylib::inflate(in, out);
std::cout << "deflated. " << ec.message() << std::endl;
std::cout << std::string(out.begin(), out.end()) << std::endl;
它会打印出来
connected.
deflated. unexpected end of deflate stream
"event":"error","message":"Unrecognized request: 'op':'subscribe', 'args':['spot/ticker:ETH-USDT']\u0000","errorCode":30039
因此,尽管放气流意外结束,但数据是有效且完整的 JSON。
PS.2:请参阅newer answer,它设法克服了最后一道障碍
完整列表
Live On Compiler Exporer
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>
namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;
namespace mylib
auto inflate(std::vector<uint8_t> const& in, std::vector<uint8_t>& out)
boost::system::error_code ec;
beast::zlib::z_params zp;
zp.next_in = (Bytef*)in.data();
zp.avail_in = in.size();
zp.next_out = out.data();
zp.avail_out = out.size();
beast::zlib::inflate_stream zs;
zs.write(zp, beast::zlib::Flush::full, ec);
return ec;
int main()
std::string host = "real.okex.com";
auto const port = "8443";
auto const path = "/ws/v3";
net::io_context ioc;
tcp::resolver resolver ioc ;
ssl::context ctx ssl::context::sslv23 ;
ctx.set_verify_mode(ssl::verify_none);
stream_t s ioc, ctx ;
net::connect(beast::get_lowest_layer(s), resolver.resolve(host, port));
// SSL handshake
s.next_layer().handshake(ssl::stream_base::client);
websocket::permessage_deflate opt;
opt.client_enable = true; // for clients
opt.server_enable = true; // for servers
s.set_option(opt);
s.handshake(host + ":" + port, path);
std::cout << "connected." << std::endl;
// send request to the websocket
s.write(net::buffer("'op':'subscribe', 'args':['spot/ticker:ETH-USDT']"));
std::vector<uint8_t> in, out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);
out.resize(1024); // make sure it's enough
auto ec = mylib::inflate(in, out);
std::cout << "deflated. " << ec.message() << std::endl;
std::cout << std::string(out.begin(), out.end()) << std::endl;
【讨论】:
您能解释一下流的意外结束是什么意思吗?如果您查看我的 Python 实现(请参阅问题中的更新),它完全解释了您的正确性。同样的响应在 Python 中实际上是 113 个字节 主要是想利用这个机会更好地学习 C++。这是整个项目的主要目标:) 这意味着算法期望其他东西继续/完成数据。与 Python 进行比较确实很好。显然,Python 只是默默地忽略了尾随字节。 当然不是 NUL 字符。有一个框架协议可以告诉双方预期的消息大小:tools.ietf.org/html/rfc6455#section-5.1(数据框架/概述)。我的直觉是 Python 使用它来读取电线,但在放气时会忽略长度。 更有意义。我使用过类似的格式,但用于图像。感谢您为此提供的所有帮助!以上是关于C ++ - Boost streambuf放气的行为不一致?的主要内容,如果未能解决你的问题,请参考以下文章
boost::asio::streambuf 通过 https 检索 xml 数据
使用 boost::asio::async_wait_until 和 boost::asio::streambuf
boost::asio::streambuf 基本用法和注意事项