使用Java API 操作Kudu
Posted 杀智勇双全杀
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Java API 操作Kudu相关的知识,希望对你有一定的参考价值。
概述
不是给纯小白看的,也就不谈概念这种废话了,直接上代码。。。使用一个叫KuduPlus的小工具辅助测试。
Maven依赖
<!-- 指定仓库位置,依次为aliyun、cloudera和jboss仓库 -->
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<!-- 版本属性 -->
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<kudu.version>1.9.0-cdh6.2.1</kudu.version>
<junit.version>4.12</junit.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client-tools</artifactId>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
操作Kudu表
代码太多,使用了//region
和//endregion
,在idea中可以折叠,还是多,又分开写的,合一起也可以。方便在需要的时候直接ctrl+v改参数。
package com.aa.kudu.table;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
//使用Java API对Kudu进行CRUD操作,包含创建表及删除表
public class kuduTableDemo {
//定义kuduClient实例对象
private KuduClient kuduClient = null;
//region Before操作初始化
@Before
public void init() {
//KuduMaster地址信息
String masterAddresses = "192.168.88.20:7051";
//初始化KuduClient实例对象
kuduClient = new KuduClient.KuduClientBuilder(masterAddresses)//建造者模式
.defaultOperationTimeoutMs(10000)//设置Kudu操作的超时时间,默认30s
.defaultSocketReadTimeoutMs(6000)//设置从Socket读数据超时,默认10s
.build();//返回KuduClient类型
}
//endregion
//region 测试连接
@Test
public void testKuduClient() {
System.out.println("kuduClient = " + kuduClient);//kuduClient = org.apache.kudu.client.KuduClient@6e1ec318
}
//endregion
//region 测试创建表create table
/*
创建Kudu表:
create table aa_users(
id int,
name string,
age byte,
primary key(id)
)
*/
//封装方法
private ColumnSchema newColumnSchema(String name, Type type, boolean isKey) {
//创建ColumnSchemaBuilder实例对象
ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
column.key(isKey);//设置是否为主键
//构建ColumnSchema
return column.build();
}
@Test
public void createKuduTable() throws KuduException {
//定义各个列,添加到List列表
List<ColumnSchema> columns = new ArrayList<>();
//定义每个列、名称、类型及是否魏主键
columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());//使用默认方法
columns.add(newColumnSchema("name", Type.STRING, false));//使用封装的方法
columns.add(newColumnSchema("age", Type.INT8, false));
//定义schema:public Schema(List<ColumnSchema> columns)
Schema schema = new Schema(columns);
//定义表的属性
CreateTableOptions options = new CreateTableOptions();
//设置分区策略
options.addHashPartitions(Arrays.asList("id"), 3);
//设置副本数目
options.setNumReplicas(1);
//创建Kudu表:public KuduTable createTable(String name, Schema schema, CreateTableOptions builder)
KuduTable kuduTable = kuduClient.createTable("aa_users", schema, options);
System.out.println("kuduTable.getTableId() = " + kuduTable.getTableId());//kuduTable.getTableId() = 291ebdc9de8e44c3a84a3601f13dcf94
}
//endregion
//region 测试删除表(delete table if exists)
@Test
// public void dropKuduTable(String tableName) throws KuduException {//可以传表名
// if(kuduClient.tableExists(tableName)){//if exists再删除,避免报错
// kuduClient.deleteTable(tableName);//按名称删除
// }
// }
public void dropKuduTable() throws KuduException {
if (kuduClient.tableExists("aa_users")) {//if exists再删除,避免报错
System.out.println("存在表aa_users");
kuduClient.deleteTable("aa_users");//按名称删除
System.out.println("已删除表aa_users");
} else {
System.out.println("不存在表aa_users");
}
}
//endregion
//region 创建表(范围分区)
@Test
public void createKuduTableByRange() throws KuduException{
//定义schema信息、列名称、列类型
List<ColumnSchema> columns = new ArrayList<>();
columns.add(new ColumnSchema.ColumnSchemaBuilder("id",Type.INT32).key(true).build());
columns.add(newColumnSchema("name",Type.STRING,false));
columns.add(newColumnSchema("age",Type.INT8,false));
Schema schema = new Schema(columns);
//设置表的属性
CreateTableOptions options = new CreateTableOptions();
//设置分区策略
options.setRangePartitionColumns(Arrays.asList("id"));//设置范围分区字段名称
//id<100
PartialRow upper100 = new PartialRow(schema);
upper100.addInt("id",100);
options.addRangePartition(new PartialRow(schema),upper100);
//100<=id<500
PartialRow lower100 = new PartialRow(schema);
lower100.addInt("id",100);
PartialRow upper500 = new PartialRow(schema);
upper500.addInt("id",500);
options.addRangePartition(lower100,upper500);
//id>=500
PartialRow lower500 = new PartialRow(schema);
lower500.addInt("id",500);
options.addRangePartition(lower500,new PartialRow(schema));
//设置副本数目
options.setNumReplicas(1);
//传递参数,构建表
KuduTable kuduTable = kuduClient.createTable("aa_users_range",schema,options);
System.out.println("kuduTable.getTableId() = " + kuduTable.getTableId());
}
//endregion
//region 创建表(多级分区)
//先哈希再范围,或先哈希再哈希
@Test
public void createKuduTableMulti() throws KuduException{
//构建Schema信息
List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
columnSchemas.add(newColumnSchema("id",Type.INT32,true));
columnSchemas.add(newColumnSchema("age",Type.INT8,true));
columnSchemas.add(newColumnSchema("name",Type.STRING,false));
//定义Schema信息
Schema schema = new Schema(columnSchemas);
//Kudu表的分区策略及分区副本数目设置
CreateTableOptions tableOptions = new CreateTableOptions();
// TODO: 2021/6/29 设置哈希分区
List<String> columnsHash = new ArrayList<>();
columnsHash.add("id");
tableOptions.addHashPartitions(columnsHash,5);
// TODO: 2021/6/29 设置范围分区
List<String> columnsRange = new ArrayList<>();
columnsRange.add("age");
tableOptions.setRangePartitionColumns(columnsRange);
//添加范围分区
PartialRow upper21 = new PartialRow(schema);
upper21.addByte("age",(byte)21);//缺少会报错org.apache.kudu.client.NonRecoverableException: overlapping range partitions: first range partition: UNBOUNDED
tableOptions.addRangePartition(new PartialRow(schema),upper21);
//添加范围分区
PartialRow lower21 = new PartialRow(schema);
lower21.addByte("age",(byte)21);
PartialRow upper41 = new PartialRow(schema);
upper41.addByte("age",(byte)41);
tableOptions.addRangePartition(lower21,upper41);//缺少会报错org.apache.kudu.client.NonRecoverableException: overlapping range partitions
//添加范围分区
PartialRow lower41 = new PartialRow(schema);
lower41.addByte("age",(byte)41);
tableOptions.addRangePartition(lower41,new PartialRow(schema));
//副本数设置
tableOptions.setNumReplicas(1);
//在Kudu中创建表
KuduTable userTable = kuduClient.createTable("aa_users_multi",schema,tableOptions);
System.out.println(userTable.toString());//org.apache.kudu.client.KuduTable@17695df3
/*
node2:8051看到
HASH (id) PARTITIONS 5,
RANGE (age) (
PARTITION VALUES < 21,
PARTITION 21 <= VALUES < 41,
PARTITION VALUES >= 41
)
*/
}
//endregion
//region 添加列
@Test
public void alterKuduTableAddColumn() throws KuduException{
//添加列
AlterTableOptions alterTableOptions = new AlterTableOptions();
alterTableOptions.addColumn("address",Type.STRING,"银河系");
//修改表
AlterTableResponse response = kuduClient.alterTable("aa_users",alterTableOptions);
System.out.println(response.getTableId());//80a90f5ff44a4432a21fff322c8f1659
}
//endregion
//region 删除列
@Test
public void alterKuduTableDropColumn() throws KuduException{
//删除列
AlterTableOptions alterTableOptions = new AlterTableOptions();
alterTableOptions.dropColumn("address");
//修改表
AlterTableResponse response = kuduClient.alterTable("aa_users",alterTableOptions);
System.out.println(response.getTableId());
}
//endregion
//region 释放资源
@After
public void close() throws KuduException {
if (kuduClient != null) {
kuduClient.close();//测试完成后释放资源
}
}
//endregion
}
操作Kudu数据
package com.aa.kudu.data;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Random;
public class kuduDataDemo {
//定义kuduClient实例对象
private KuduClient kuduClient = null;
//region 封装方法
private ColumnSchema newColumnSchema(String name, Type type, boolean isKey) {
//创建ColumnSchemaBuilder实例对象
ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
column.key(isKey);//设置是否为主键
//构建ColumnSchema
return column.build();
}
//endregion
//region Before操作初始化
@Before
public void init() {
//KuduMaster地址信息
String masterAddresses = "192.168.88.20:7051";
//初始化KuduClient实例对象
kuduClient = new KuduClient.KuduClientBuilder(masterAddresses)//建造者模式
.defaultOperationTimeoutMs(10000)//设置Kudu操作的超时时间,默认30s
.defaultSocketReadTimeoutMs(6000)//设置从Socket读数据超时,默认10s
.build();//返回KuduClient类型
}
//endregion
//region 测试单条插入insert
@Test
public void insertKuduSingleData() throws KuduException {
//获取操作句柄
KuduTable kuduTable = kuduClient.openTable("aa_users");
//获取kuduSession的实例对象
KuduSession kuduSession 以上是关于使用Java API 操作Kudu的主要内容,如果未能解决你的问题,请参考以下文章
客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu
spark操作kudu,出现异常java.lang.ClassNotFoundException: org.apache.kudu.spark.kudu.DefaultSource
spark操作kudu,出现异常java.lang.ClassNotFoundException: org.apache.kudu.spark.kudu.DefaultSource