An example of writing data to Doris with stream load
Posted Z-hhhhh
The code depends on the following Maven artifacts:
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
The Java code is as follows:
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

public class DorisStreamLoader {
    // FE IP address
    private final static String HOST = "xxx.xx.xx.xxx";
    // FE HTTP port
    private final static int PORT = 8030;
    // Database name
    private final static String DATABASE = "test_db";
    // Table name
    private final static String TABLE = "...";
    // User name
    private final static String USER = "xxx";
    // User password
    private final static String PASSWD = "xxxx";
    // Path of the local file to be imported
    private final static String LOAD_FILE_NAME = "......csv";
    // HTTP path for submitting a stream load task
    private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
            HOST, PORT, DATABASE, TABLE);

    // Build the HTTP client
    private final static HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .setRedirectStrategy(new DefaultRedirectStrategy() {
                @Override
                protected boolean isRedirectable(String method) {
                    // If the connection target is FE, the 307 redirect must be followed, also for PUT.
                    return true;
                }
            });

    /**
     * Imports data from a file.
     *
     * @param file the local file to import
     * @throws Exception if the request fails or the HTTP status is not 200
     */
    public void load(File file) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
            put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));

            // Stream load properties are set as headers; here we set label and column_separator.
            put.setHeader("label", UUID.randomUUID().toString());
            put.setHeader("column_separator", "#");

            // Set up the import file. StringEntity can be used instead to transfer arbitrary data.
            FileEntity entity = new FileEntity(file);
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }

                final int statusCode = response.getStatusLine().getStatusCode();
                // HTTP 200 only means the request was handled; the load outcome is in the response body.
                if (statusCode != 200) {
                    throw new IOException(String.format(
                            "Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                System.out.println("Get load result: " + loadResult);
            }
        }
    }

    /**
     * Imports data in JSON format.
     *
     * @param jsonData the JSON text to import
     * @throws Exception if the request fails or the HTTP status is not 200
     */
    public void loadJson(String jsonData) throws Exception {
        try (CloseableHttpClient client = httpClientBuilder.build()) {
            HttpPut put = new HttpPut(loadUrl);
            put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
            put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
            put.setHeader(HttpHeaders.EXPECT, "100-continue");
            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));

            // Stream load properties are set as headers; here we set label, column_separator and format.
            put.setHeader("label", UUID.randomUUID().toString());
            put.setHeader("column_separator", ",");
            put.setHeader("format", "json");

            // Send the JSON text as the request body, explicitly encoded as UTF-8.
            StringEntity entity = new StringEntity(jsonData, StandardCharsets.UTF_8);
            put.setEntity(entity);

            try (CloseableHttpResponse response = client.execute(put)) {
                String loadResult = "";
                if (response.getEntity() != null) {
                    loadResult = EntityUtils.toString(response.getEntity());
                }

                final int statusCode = response.getStatusLine().getStatusCode();
                // HTTP 200 only means the request was handled; the load outcome is in the response body.
                if (statusCode != 200) {
                    throw new IOException(String.format(
                            "Stream load failed. status: %s load result: %s", statusCode, loadResult));
                }
                System.out.println("Get load result: " + loadResult);
            }
        }
    }

    /**
     * Builds the HTTP Basic authentication header value.
     *
     * @param username the Doris user name
     * @param password the Doris password
     * @return the value for the Authorization header
     */
    private String basicAuthHeader(String username, String password) {
        final String tobeEncode = username + ":" + password;
        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        DorisStreamLoader loader = new DorisStreamLoader();

        // File load
        /*File file = new File(LOAD_FILE_NAME);
        loader.load(file);*/

        // JSON load; replace the empty string with the real JSON payload
        String jsonData = "";
        loader.loadJson(jsonData);
    }
}
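The loader above only checks the HTTP status code, but a 200 response can still carry a failed load. The sketch below shows one way to inspect the response body. It assumes the body is a JSON object with a `Status` field whose value is `Success` or `Publish Timeout` when the load completed, which is how I understand the Doris documentation; the class `StreamLoadResult` and its methods are hypothetical names, and the regular expression stands in for a real JSON parser such as fastjson.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Doris or HttpClient.
final class StreamLoadResult {
    // Matches the top-level "Status" key only; "ExistingJobStatus" does not match
    // because the opening quote must directly precede the word Status.
    private static final Pattern STATUS =
            Pattern.compile("\"Status\"\\s*:\\s*\"([^\"]*)\"");

    // Returns the value of the "Status" field, or null if it is absent.
    static String extractStatus(String body) {
        if (body == null) {
            return null;
        }
        Matcher m = STATUS.matcher(body);
        return m.find() ? m.group(1) : null;
    }

    // Assumption: "Success" and "Publish Timeout" both mean the load completed.
    static boolean isSuccess(String body) {
        String status = extractStatus(body);
        return "Success".equals(status) || "Publish Timeout".equals(status);
    }

    public static void main(String[] args) {
        String body = "{\"Label\": \"abc\", \"Status\": \"Fail\", \"Message\": \"error\"}";
        System.out.println(extractStatus(body));
        System.out.println(isSuccess(body));
    }
}
```

In `load` and `loadJson`, a call such as `StreamLoadResult.isSuccess(loadResult)` could follow the status code check, throwing an `IOException` when it returns false.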
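Note: the JSON payload must be sent with an explicit encoding (UTF-8 in the code above), otherwise the data ends up garbled once it is written to Doris.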
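The garbling is easy to reproduce without Doris. As far as I know, HttpClient 4.x's `StringEntity(String)` constructor falls back to ISO-8859-1 when no charset is given, and that charset cannot represent Chinese characters. The sketch below, with the hypothetical class name `JsonEncodingDemo`, encodes the same JSON text with two charsets and decodes it as UTF-8, which is what the receiving side is assumed to do.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it only uses the JDK.
final class JsonEncodingDemo {
    // Encodes the JSON text as UTF-8, independent of any default charset.
    static byte[] toUtf8(String json) {
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Encodes with writeCharset, then decodes as UTF-8 (the assumed reader side).
    static String roundTrip(String json, Charset writeCharset) {
        byte[] bytes = json.getBytes(writeCharset);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u6d4b\u8bd5 is a two-character Chinese word, escaped to keep the source ASCII-only.
        String json = "{\"name\":\"\u6d4b\u8bd5\"}";
        // prints "true": UTF-8 survives the round trip
        System.out.println(roundTrip(json, StandardCharsets.UTF_8).equals(json));
        // prints "false": ISO-8859-1 replaces the unmappable characters with '?'
        System.out.println(roundTrip(json, StandardCharsets.ISO_8859_1).equals(json));
    }
}
```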
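The official documentation advises against calling stream load too frequently and recommends a streaming approach instead; stream load is better suited to micro-batch processing. I plan to test later what happens when data is written once every five seconds.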
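A micro-batch in this sense can be sketched as a small buffer that collects rows and hands them over in one piece, so that each stream load request carries many rows. The class `MicroBatcher` below is hypothetical and is not taken from Doris or from the referenced article; the thresholds and the newline-joined payload are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer that flushes by row count or by elapsed time.
final class MicroBatcher {
    private final int maxRows;
    private final long maxIntervalMillis;
    private final Consumer<String> sink;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMillis = System.currentTimeMillis();

    MicroBatcher(int maxRows, long maxIntervalMillis, Consumer<String> sink) {
        this.maxRows = maxRows;
        this.maxIntervalMillis = maxIntervalMillis;
        this.sink = sink;
    }

    // Buffers one row and flushes when either threshold is reached.
    synchronized void add(String row) {
        buffer.add(row);
        long now = System.currentTimeMillis();
        if (buffer.size() >= maxRows || now - lastFlushMillis >= maxIntervalMillis) {
            flush();
        }
    }

    // Sends all buffered rows to the sink as one newline-separated payload.
    synchronized void flush() {
        lastFlushMillis = System.currentTimeMillis();
        if (buffer.isEmpty()) {
            return;
        }
        // The buffer is cleared only after the sink returns, so rows are kept if it throws.
        sink.accept(String.join("\n", buffer));
        buffer.clear();
    }
}
```

With `new MicroBatcher(10000, 5000L, payload -> ...)` the sink would run at most about once every five seconds under light traffic, and the lambda could pass the payload to a loader method. The time check only runs inside `add`, so an idle buffer needs an explicit `flush()` call or a separate timer.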
Reference article: https://www.jianshu.com/p/01e47ae333d8