如何在 Vert.x (java) 中使用 Jackson 流式传输 JSON 结果

Posted

技术标签:

【中文标题】如何在 Vert.x (java) 中使用 Jackson 流式传输 JSON 结果【英文标题】:How to stream JSON result with Jackson in Vert.x (java) 【发布时间】:2014-12-22 15:49:41 【问题描述】:

我正在使用 Vert.x 为我的 Java 类数据库服务制作一个 REST api。 将 JSON 结果作为 String 写入请求的流中并不太难,如下所示:

...
routeMatcher.get("/myservice/api/v1/query/:query", req -> 

    // get query
    String querySring = req.params().get("query");           
    Query query = jsonMapper.readValue(querySring, Query.class);

    // my service creates a list of resulting records...
    List<Record> result = myservice.query(query);                
    String jsonResult = jsonMapper.writeValueAsString(result);

    // write entire string to response
    req.response().headers().set("Content-Type", "application/json; charset=UTF-8");
    req.response().end(jsonResult);    
);
...

但是我想使用 Jackson 的方法将 Java 列表流式传输到请求对象:

ObjectMapper objectMapper = new ObjectMapper();
objectMapper.writeValue(Outputstream, result);

但我不知道如何将 Jackson 的 Outputstream 参数连接到 Vert.x 的 re.response(),因为他们有自己的 Buffer 系统,似乎与 Jackson 的 java.io.Outputstream 参数不兼容。

我不能将 Jackson 与 Vert.x 结合使用吗?我应该使用 Vert.x 自己的JSON library 手动编写自定义序列化程序吗?其他建议?

【问题讨论】:

【参考方案1】:

我假设您正在生成巨大的 JSON 文档,因为小的字符串输出就足够了:objectMapper.writeValue(&lt;String&gt;, result);

流有问题。 ObjectMapper 不知道结果大小,您最终会遇到异常:

java.lang.IllegalStateException: You must set the Content-Length header to be the total size of the message body BEFORE sending any data if you are not using HTTP chunked encoding.
        at org.vertx.java.core.http.impl.DefaultHttpServerResponse.write(DefaultHttpServerResponse.java:474)

因此,在您的示例中,我将使用临时文件作为 JSON 输出,然后将它们刷新到响应中(我尚未测试代码)

File tmpFile = File.createTempFile("tmp", ".json");
mapper.writeValue(tmpFile, result);
req.response().sendFile(tmpFile.getAbsolutePath(), (result) -> tmpFile.delete());

如果您最初知道内容长度,可以使用以下代码将 OutputStream 映射到 WriteStream

import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.streams.WriteStream;

import java.io.IOException;
import java.io.OutputStream;

public class OutputWriterStream extends OutputStream 

    public WriteStream writeStream;
    public Runnable closeHandler;

    @Override
    public void write(int b) throws IOException 
        throw new UnsupportedOperationException();
    

    @Override
    public void write(byte[] b, int off, int len) throws IOException 
        if (off == 0 && len == b.length) 
            writeStream.write(new Buffer(b));
            return;
        

        byte[] bytes = new byte[len];
        System.arraycopy(b, off, bytes, 0, len);
        writeStream.write(new Buffer(bytes));
    

    @Override
    public void write(byte[] b) throws IOException 
        writeStream.write(new Buffer(b));
    

    @Override
    public void close() throws IOException 
        closeHandler.run();
    

【讨论】:

【参考方案2】:

这可能会好一点(并针对 Vertx3 进行了更新)答案:

import io.vertx.core.file.AsyncFile;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.streams.WriteStream;

import java.io.IOException;
import java.io.OutputStream;

public class OutputWriterStream extends OutputStream 

    public OutputWriterStream(final WriteStream response) 
        this.response = response;
        this.buffer = new byte[8192];
    

    @Override
    public synchronized void write(final int b) throws IOException 
        buffer[counter++] = (byte) b;

        if (counter >= buffer.length) 
            flush();
        
    

    @Override
    public void flush() throws IOException 
        super.flush();

        if (counter > 0) 
            byte[] remaining = buffer;

            if (counter < buffer.length) 
                remaining = new byte[counter];

                System.arraycopy(buffer, 0, remaining, 0, counter);
            

            response.write(Buffer.buffer(remaining));
            counter = 0;
        
    

    @Override
    public void close() throws IOException 
        flush();

        super.close();

        if (response instanceof HttpServerResponse) 
            try 
                response.end();
            
            catch (final IllegalStateException ignore) 
            
        
        else if (response instanceof AsyncFile) 
            ((AsyncFile) response).close();
        
    

    private final WriteStream<Buffer> response;
    private final byte[] buffer;
    private int counter = 0;


【讨论】:

以上是关于如何在 Vert.x (java) 中使用 Jackson 流式传输 JSON 结果的主要内容,如果未能解决你的问题,请参考以下文章

Vert.x!这是目前最快的 Java 框架

Vert.x 操作数据库

java 11 java.net.http.Websocket 与 undertow 或 vert.x、netty 相比效率如何

Vert.x!目前最快的 Java 框架

Vert.x中EventBus中的使用

Vert.x 操作Redis