C ++ - Boost streambuf放气的行为不一致?

Posted

技术标签:

【中文标题】C ++ - Boost streambuf放气的行为不一致?【英文标题】:C++ - Inconsistent behavior with Boost streambuf deflating? 【发布时间】:2021-02-19 17:26:47 【问题描述】:

问题陈述

我正在尝试使用他们的websocket API 建立与 OKEX 交易所的 websocket 连接。我正在使用Boost::Beast websockets。

问题是OKEX的服务器没有遵循正确的permessage_deflate压缩协议,发送的消息是incorrectly deflated。所以我试图自己夸大消息。问题是它不起作用......让我发疯的是我得到的行为有些不一致。

实际代码

我的代码主要是从previously linked to thread 复制和粘贴的。为简单起见,我删除了所有预处理器宏,并对套接字值进行了硬编码。

inflate 代码取自 Raj Advani's answer here。

这是main.cpp 文件:

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>

namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace beast     = boost::beast;
namespace http      = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

int inflate(const void *src, int srcLen, void *dst, int dstLen) 
    z_stream strm  = 0;
    strm.total_in  = strm.avail_in  = srcLen;
    strm.total_out = strm.avail_out = dstLen;
    strm.next_in   = (Bytef *) src;
    strm.next_out  = (Bytef *) dst;

    strm.zalloc = Z_NULL;
    strm.zfree  = Z_NULL;
    strm.opaque = Z_NULL;

    int err = -1;
    int ret = -1;

    err = inflateInit2(&strm, (15 + 32)); //15 window bits, and the +32 tells zlib to to detect if using gzip or zlib
    if (err == Z_OK) 
        err = inflate(&strm, Z_FINISH);
        if (err == Z_STREAM_END) 
            ret = strm.total_out;
        
        else 
            inflateEnd(&strm);
            return err;
        
    
    else 
        inflateEnd(&strm);
        return err;
    

    inflateEnd(&strm);
    return ret;



int main(int argc, char** argv) 

    std::string host = "real.okex.com";
    auto const port  = "8443";
    auto const path  = "/ws/v3";

    net::io_context ioc;
    ssl::context ctx ssl::context::sslv23 ;
    tcp::resolver resolver ioc ;
    stream_t s ioc, ctx ;
    ctx.set_verify_mode(ssl::verify_none);
    tcp::resolver::results_type results = resolver.resolve(host, port);
    net::connect(
            beast::get_lowest_layer(s),
            //s.next_layer().next_layer(),
            results.begin());

    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);
    s.handshake(host + ":" + port, path);

    std::cout << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("'op':'subscribe', 'args':['spot/ticker:ETH-USDT']"));

    
        net::streambuf buffer;
        s.read(buffer);

//        auto data_it = buffer.data().begin();
//        std::cout<<"Iterating over data of size:" << buffer.data().size()<<endl; // LINE 85
//        int i = 0;
//        while (data_it != buffer.data().end()) 
//            std::cout << "buffer data["<<i++<<"] size:" << (data_it->size())<<endl;
//            data_it++;
//        

        net::streambuf out_buffer;
        const int error_code_out = inflate(&buffer, buffer.size(), &out_buffer, 10000000);

        std::cout << "received. size:"<<buffer.size()<<" data: "<< &buffer << std::endl;
        std::cout << "deflated. error?"<< error_code_out << " data: " << &out_buffer << std::endl;
    

代码输出+问题

通货膨胀说buffer的大小是117。我认为这是合理的,但由于某种原因,我在解压时得到Z_DATA_ERROR,让我相信还有更多的数据需要解析...... .

所以我查阅了net::streambuf 的文档,发现显然有多个可以读取的缓冲区,所以也许我只使用了一个缓冲区?我运行了注释掉的代码(不包括中间的LINE 85 行)并且它从未通过循环......我认为这很奇怪。我输入那条线,然后突然间我有了几百个缓冲区? (截断的)输出类似于:

connected.
Iterating over data of size:117
buffer data[0] size:117
buffer data[1] size:72198326954657960
buffer data[2] size:140735485986592
buffer data[3] size:140618848326656
buffer data[4] size:140618848326656
.. many more lines of this...
buffer data[121] size:7089075335985461349
buffer data[122] size:3472329396561475632
buffer data[123] size:8747116609081390898
buffer data[124] size:3472329396561475632
buffer data[125] size:3472387902693336678
buffer data[126] size:
Process finished with exit code 139 (interrupted by signal 11: SIGSEGV)

如您所见,它崩溃了。我不知道发生了什么。我现在不知道如何解码streambuf ......而the documentation 似乎假设了很多我没有的背景知识。我尝试使用buffer.data(),将buffer 转换为char* 数组,所有这些都导致我出现完全相同的行为......

不知道该怎么办。欢迎任何帮助


供参考:Python 实现

import websockets
import asyncio
import zlib

def inflate(data):
    decompress = zlib.decompressobj(-zlib.MAX_WBITS)
    inflated = decompress.decompress(data)
    inflated += decompress.flush()
    return inflated

async def main():
    client = await websockets.connect("wss://real.okex.com:8443/ws/v3")
    await client.send("'op':'subscribe', 'args':['spot/ticker:ETH-USDT']")
    r = await client.recv()
    print(len(r), r)
    print(inflate(r))


if __name__ == '__main__':
    asyncio.run(main())

【问题讨论】:

【参考方案1】:

附言:

最后我记得之前看到过OKEX返回的特定类型的乱码,确实我之前看过这个服务器。

这个问题在概念上与Boost inflate algorithm decompress 重复,但您的特定代码有(很多)更大的问题,值得分析:

我很困惑。这是怎么编译的?

const int error_code_out = inflate(&buffer, buffer.size(), &out_buffer, 10000000);

Buffer 不是 char 数组或类似数组。它甚至不是 POD 类型。这是streambuf。就这样使用它。

您抱怨“这有点不一致”?这完全有道理,因为这就是您所要求的:调用 Undefined Behaviour 是获得“不一致”的好方法。或鼻守护进程。

现在,接下来是:为什么编译器不警告?

罪魁祸首是::inflate重载了同名的ZLIB函数。让我们把我们的放在一个命名空间中来解开。

接下来,它需要void* 参数。呃。他们是reinterpet_cast&lt;&gt;-ed 到Byte*太多讨厌这段代码。它不顾一切地使用 C 风格的强制转换,抛弃 const,完全无视这个论点甚至不是 POD。

让我们让它稍微安全:

namespace mylib 
    int inflate(uint8_t const *src, int srcLen, uint8_t *dst, int dstLen) 
        z_stream strm  ;
        strm.total_in  = strm.avail_in  = srcLen;
        strm.total_out = strm.avail_out = dstLen;
        strm.next_in = const_cast<Bytef*>(static_cast<Bytef const*>(src));
        strm.next_out = static_cast<Bytef*>(dst);

我们不要掩饰你甚至明确地、故意地 承诺输出缓冲区大小准确的功能 10000000。你应该问自己,当你 写的。

现在代码表达了意图,我们可以期待编译器诊断我们的错误。当然,它确实如此,因为 C++ 编译器就是这样做的。

修复调用

让我们避免与net::streambuf 混淆。您可以将字符串或向量用作动态缓冲区。这可能并不总是那么有效,但让我们在这里关注可理解的代码:

std::vector<uint8_t> in, out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);

out.resize(1024); // make sure it's enough
const int err = mylib::inflate(
        in.data(), in.size(),
        out.data(), out.size());

看,现在你知道你正在传递什么。而且会没事的。

让我们避免打印输入数据(这是二进制乱码......)。

std::cout << "received. size:" << in_buffer.size() << std::endl;
std::cout << "deflated. error?" << err << " data: "
          << std::string(out.begin(), out.end())
          << std::endl;

现在,膨胀仍然失败,但这是服务器的问题,请参阅Boost inflate algorithm decompress - 它还显示了缓冲区处理的一些替代方案。

固定代码

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>

namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace beast     = boost::beast;
namespace http      = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

namespace mylib 
    int inflate(uint8_t const *src, int srcLen, uint8_t *dst, int dstLen) 
        z_stream strm  ;
        strm.total_in  = strm.avail_in  = srcLen;
        strm.total_out = strm.avail_out = dstLen;
        strm.next_in = const_cast<Bytef*>(static_cast<Bytef const*>(src));
        strm.next_out = static_cast<Bytef*>(dst);

        strm.zalloc = Z_NULL;
        strm.zfree  = Z_NULL;
        strm.opaque = Z_NULL;

        int err = -1;
        int ret = -1;

        err = inflateInit2(&strm, (15 + 32)); //15 window bits, and the +32 tells zlib to to detect if using gzip or zlib
        if (err == Z_OK) 
            err = inflate(&strm, Z_FINISH);
            if (err == Z_STREAM_END) 
                ret = strm.total_out;
            
            else 
                inflateEnd(&strm);
                return err;
            
        
        else 
            inflateEnd(&strm);
            return err;
        

        inflateEnd(&strm);
        return ret;
    


int main() 
    std::string host = "real.okex.com";
    auto const port  = "8443";
    auto const path  = "/ws/v3";

    net::io_context ioc;
    tcp::resolver resolver ioc ;

    ssl::context ctx  ssl::context::sslv23 ;
    ctx.set_verify_mode(ssl::verify_none);

    stream_t s ioc, ctx ;
    net::connect(beast::get_lowest_layer(s), resolver.resolve(host, port));

    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);

    
        websocket::permessage_deflate opt;
        opt.client_enable = true; // for clients
        opt.server_enable = true; // for servers
        s.set_option(opt);
    

    s.handshake(host + ":" + port, path);

    std::cerr << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("'op':'subscribe', 'args':['spot/ticker:ETH-USDT']"));

    
        std::vector<uint8_t> in, out;
        auto in_buffer = net::dynamic_buffer(in);
        s.read(in_buffer);

        // std::cout.write(reinterpret_cast<char const*>(in.data()), in.size());

        out.resize(1024); // make sure it's enough
        const int err = mylib::inflate(
                in.data(), in.size(),
                out.data(), out.size());

        std::cerr << "received. size:" << in_buffer.size() << std::endl;
        //std::cerr << "received. data:" << std::string(in.begin(), in.end()) << std::endl;
        std::cerr << "deflated. error?" << err << " data: "
                  << std::string(out.begin(), out.end())
                  << std::endl;
    

【讨论】:

首先,感谢您对这段代码的耐心等待,我对 C++ 并不陌生,但对这种类型的工作绝对是新手。我将最大大小写为 10000,我从来不知道我必须相应地调整缓冲区的大小......它为什么会破裂是有道理的 我想我需要更多地了解dynamic_buffer 的作用...... 您的问题不在于您没有调整缓冲区的大小。您将 void* 传递给 C++ 类,就好像它指向 100000 字节的缓冲区:) 无论如何,我终于解决了它。见my new answer【参考方案2】:

我有一个older answer 来解决代码问题。我已经找到了解决剩余问题的方法:服务器响应。

解决方案

OKEX 不仅不遵守 WS 标准来启用每条消息的通缩,而且它还会突然终止数据。但是,事实证明,如果您保留部分膨胀的结果,实际上是可以的。

我实现它的方式是直接使用 ZLIB,而是使用beast::zlib::inflate_stream。这有一个更灵活的界面,可以让我们得到我们需要的结果:

namespace mylib 
    auto inflate(std::vector<uint8_t> const& in, std::vector<uint8_t>& out) 
        boost::system::error_code ec;
        beast::zlib::z_params zp;
        zp.next_in   = (Bytef*)in.data();
        zp.avail_in  = in.size();
        zp.next_out  = out.data();
        zp.avail_out = out.size();

        beast::zlib::inflate_stream zs;
        zs.write(zp, beast::zlib::Flush::full, ec);

        return ec;
    

现在我们像这样使用它:

std::vector<uint8_t> in, out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);

out.resize(1024); // make sure it's enough
auto ec = mylib::inflate(in, out);

std::cout << "deflated. " << ec.message() << std::endl;
std::cout << std::string(out.begin(), out.end()) << std::endl;

它会打印出来

connected.
deflated. unexpected end of deflate stream
"event":"error","message":"Unrecognized request: 'op':'subscribe', 'args':['spot/ticker:ETH-USDT']\u0000","errorCode":30039

因此,尽管放气流意外结束,但数据是有效且完整的 JSON。

PS.2:请参阅newer answer,它设法克服了最后一道障碍

完整列表

Live On Compiler Exporer

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>

namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace beast     = boost::beast;
namespace http      = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

namespace mylib 
    auto inflate(std::vector<uint8_t> const& in, std::vector<uint8_t>& out) 
        boost::system::error_code ec;
        beast::zlib::z_params zp;
        zp.next_in   = (Bytef*)in.data();
        zp.avail_in  = in.size();
        zp.next_out  = out.data();
        zp.avail_out = out.size();

        beast::zlib::inflate_stream zs;
        zs.write(zp, beast::zlib::Flush::full, ec);

        return ec;
    


int main() 
    std::string host = "real.okex.com";
    auto const port  = "8443";
    auto const path  = "/ws/v3";

    net::io_context ioc;
    tcp::resolver resolver ioc ;

    ssl::context ctx  ssl::context::sslv23 ;
    ctx.set_verify_mode(ssl::verify_none);

    stream_t s ioc, ctx ;
    net::connect(beast::get_lowest_layer(s), resolver.resolve(host, port));

    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);

    
        websocket::permessage_deflate opt;
        opt.client_enable = true; // for clients
        opt.server_enable = true; // for servers
        s.set_option(opt);
    

    s.handshake(host + ":" + port, path);

    std::cout << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("'op':'subscribe', 'args':['spot/ticker:ETH-USDT']"));

    
        std::vector<uint8_t> in, out;
        auto in_buffer = net::dynamic_buffer(in);
        s.read(in_buffer);

        out.resize(1024); // make sure it's enough
        auto ec = mylib::inflate(in, out);

        std::cout << "deflated. " << ec.message() << std::endl;
        std::cout << std::string(out.begin(), out.end()) << std::endl;
    

【讨论】:

您能解释一下流的意外结束是什么意思吗?如果您查看我的 Python 实现(请参阅问题中的更新),它完全解释了您的正确性。同样的响应在 Python 中实际上是 113 个字节 主要是想利用这个机会更好地学习 C++。这是整个项目的主要目标:) 这意味着算法期望其他东西继续/完成数据。与 Python 进行比较确实很好。显然,Python 只是默默地忽略了尾随字节。 当然不是 NUL 字符。有一个框架协议可以告诉双方预期的消息大小:tools.ietf.org/html/rfc6455#section-5.1(数据框架/概述)。我的直觉是 Python 使用它来读取电线,但在放气时会忽略长度。 更有意义。我使用过类似的格式,但用于图像。感谢您为此提供的所有帮助!

以上是关于C ++ - Boost streambuf放气的行为不一致?的主要内容,如果未能解决你的问题,请参考以下文章

boost::asio::streambuf 通过 https 检索 xml 数据

使用 boost::asio::async_wait_until 和 boost::asio::streambuf

boost::asio::streambuf 基本用法和注意事项

在 boost::asio 中使用具有可调整大小的 streambuf 的相同 istream

将 streambuf 的内容复制到字符串

Boost asio - 从标准输入异步读取已建立的字符数