An Example of Writing Data to Doris with Stream Load

Posted Z-hhhhh

The code relies on the following dependencies:

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>

The Java code is as follows:

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class DorisStreamLoader {
    // FE IP address
    private final static String HOST = "xxx.xx.xx.xxx";
    // FE HTTP port
    private final static int PORT = 8030;
    // Database name
    private final static String DATABASE = "test_db";
    // Table name
    private final static String TABLE = "...";
    // User name
    private final static String USER = "xxx";
    // User password
    private final static String PASSWD = "xxxx";
    // Path of the local file to be imported
    private final static String LOAD_FILE_NAME = "......csv";

    // HTTP path for submitting a stream load task
    private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            HOST, PORT, DATABASE, TABLE);

    // Build the HTTP client
    private final static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // If the connection target is FE, the 307 redirect must be followed for PUT as well.
                    return true;
                }
            });

    /**
     * Imports data from a file.
     *
     * @param file the local file to load
     * @throws Exception if the request fails or the HTTP status is not 200
     */
    public void load(File file) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
            put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));

            // Stream load properties are passed as headers; here we set label and column_separator.
            put.setHeader("label", UUID.randomUUID().toString());
            put.setHeader("column_separator", "#");

            // Set up the import file. A StringEntity can be used instead to send arbitrary data.
            FileEntity entity = new FileEntity(file);
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }

                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                System.out.println("Get load result: " + loadResult);
            }
        }
    }

    /**
     * Imports data in JSON format.
     *
     * @param jsonData the JSON payload to load
     * @throws Exception if the request fails or the HTTP status is not 200
     */
    public void loadJson(String jsonData) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
            put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));

            // Stream load properties are passed as headers; here we set label and format.
            // column_separator is omitted because it only applies to CSV input.
            put.setHeader("label", UUID.randomUUID().toString());
            put.setHeader("format", "json");

            // Send the JSON string as the request body, explicitly encoded as UTF-8.
            StringEntity entity = new StringEntity(jsonData, StandardCharsets.UTF_8);
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }

                final int statusCode = response.getStatusLine().getStatusCode();
                if (statusCode != 200) {
                    throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                System.out.println("Get load result: " + loadResult);
            }
        }
    }

    /**
     * Builds the HTTP Basic authentication header value.
     *
     * @param username the Doris user name
     * @param password the Doris password
     * @return the value for the Authorization header
     */
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        DorisStreamLoader loader = new DorisStreamLoader();
        // File load
        /*File file = new File(LOAD_FILE_NAME);
        loader.load(file);*/
        // JSON load (the payload is left empty here; supply real JSON before running)
        String jsonData = "";
        loader.loadJson(jsonData);
    }
}

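The methods above treat any HTTP 200 response as a successful load. As far as I know, Stream Load also reports the outcome in a `Status` field of the JSON response body, so a load can fail even when the HTTP status is 200. The sketch below shows one way to check that field. The class name `StreamLoadResultCheck`, its helper methods, and the sample response string are hypothetical and only for illustration; the set of status values treated as success is an assumption to verify against the Doris documentation for your version.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StreamLoadResultCheck {
    // Matches a "Status" field and captures its string value.
    // A regex keeps the sketch dependency-free; a JSON parser is the more robust choice.
    private static final Pattern STATUS =
            Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the value of the "Status" field, or null if the field is absent. */
    public static String extractStatus(String loadResult) {
        Matcher m = STATUS.matcher(loadResult);
        return m.find() ? m.group(1) : null;
    }

    /** Assumption: "Success" and "Publish Timeout" both mean the data was accepted. */
    public static boolean isSuccess(String loadResult) {
        String status = extractStatus(loadResult);
        return "Success".equals(status) || "Publish Timeout".equals(status);
    }

    public static void main(String[] args) {
        // Hand-written sample body for illustration, not real Doris output.
        String sample = "{\"Label\": \"demo\", \"Status\": \"Fail\", \"Message\": \"illustrative\"}";
        System.out.println("status = " + extractStatus(sample));
        System.out.println("success = " + isSuccess(sample));
    }
}
```

In `load` and `loadJson`, a call such as `isSuccess(loadResult)` could follow the HTTP status check. Since fastjson is already declared as a dependency, parsing the body with it instead of a regex would be the natural choice in the real loader.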
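Note: the JSON payload must be encoded explicitly (UTF-8 in the code above); otherwise the data appears garbled after it is written to Doris.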
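To see why the explicit charset matters, here is a small sketch that uses only the JDK. It encodes a JSON string containing Chinese characters with two charsets and then decodes the bytes as UTF-8, the way a UTF-8 reader would. ISO-8859-1 is used for comparison because, to my knowledge, it is the default charset of `StringEntity` in HttpClient 4.x when none is given. The class name `EncodingDemo` and the sample JSON are hypothetical.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class EncodingDemo {
    /** Encodes text with wireCharset, then decodes the bytes as UTF-8. */
    public static String roundTrip(String text, Charset wireCharset) {
        byte[] onTheWire = text.getBytes(wireCharset);
        return new String(onTheWire, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u5317\u4eac is "Beijing" in Chinese; escapes avoid source-encoding issues.
        String json = "{\"city\": \"\u5317\u4eac\"}";

        // UTF-8 on the wire: the text survives unchanged.
        System.out.println(roundTrip(json, StandardCharsets.UTF_8));

        // ISO-8859-1 cannot represent the Chinese characters,
        // so each one is replaced with '?' before it is ever sent.
        System.out.println(roundTrip(json, StandardCharsets.ISO_8859_1));
    }
}
```

The second line prints `{"city": "??"}`: the characters are lost on the client side, so nothing on the Doris side can recover them.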
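The official documentation advises against calling Stream Load too frequently and recommends a streaming approach for that kind of workload. Stream Load is better suited to micro-batch processing. I plan to test later how it behaves when data is written once every five seconds.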
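A micro-batch writer along these lines could buffer rows in memory and hand them to Stream Load on a timer. The following is a minimal sketch under stated assumptions, not a tested production design: `MicroBatcher`, its methods, and the sample rows are hypothetical, and the sink is a plain `Consumer<String>` standing in for a call such as `loadJson` or a CSV load.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class MicroBatcher {
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<String> sink;

    /** sink receives one newline-joined payload per flush (e.g. a stream load call). */
    public MicroBatcher(Consumer<String> sink) {
        this.sink = sink;
    }

    /** Buffers one row; nothing is sent yet. */
    public synchronized void add(String row) {
        buffer.add(row);
    }

    /** Sends all buffered rows as one payload and returns how many rows were sent. */
    public synchronized int flush() {
        if (buffer.isEmpty()) {
            return 0; // skip empty batches so no useless load is issued
        }
        int rows = buffer.size();
        sink.accept(String.join("\n", buffer));
        buffer.clear();
        return rows;
    }

    /** Starts a timer that flushes every periodSeconds seconds. */
    public ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::flush, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return timer;
    }

    public static void main(String[] args) {
        MicroBatcher batcher = new MicroBatcher(
                payload -> System.out.println("would stream load:\n" + payload));
        ScheduledExecutorService timer = batcher.start(5); // five-second interval
        batcher.add("1#alice");
        batcher.add("2#bob");
        // On shutdown, stop the timer and flush whatever is still buffered.
        timer.shutdown();
        batcher.flush();
    }
}
```

The lock is held while the sink runs, which keeps the sketch simple but blocks producers during the HTTP call. Swapping the buffer out under the lock and sending outside it would avoid that.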

Reference article: https://www.jianshu.com/p/01e47ae333d8
