客快物流大数据项目(四十二):Java代码操作Kudu
Posted Lansonli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了客快物流大数据项目(四十二):Java代码操作Kudu相关的知识,希望对你有一定的参考价值。
目录
Java代码操作Kudu
一、构建maven工程
二、导入依赖
<!-- Cloudera repository: needed to resolve the *-cdh6.2.1 artifact versions used below. -->
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<!-- Kudu Java client (KuduClient, KuduSession, KuduTable, ...) used by the examples. -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.9.0-cdh6.2.1</version>
</dependency>
<!-- JUnit 4: provides the @Test / @Before annotations used by the example classes. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- NOTE(review): kudu-client-tools is not referenced by the code shown in this article; confirm it is needed. -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client-tools</artifactId>
<version>1.9.0-cdh6.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
<!-- Kudu/Spark integration (used by the Spark-based examples, not by the plain Java client code). -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-spark2_2.11</artifactId>
<version>1.9.0-cdh6.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<!-- NOTE(review): kudu-spark2 1.9.0-cdh6.2.1 is presumably built against the CDH 6.2 Spark release; confirm spark-sql 2.1.0 matches the cluster's Spark version. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
三、创建包结构
| 包名 | 说明 |
| --- | --- |
| cn.it | 代码所在的包目录 |
四、初始化方法
package cn.it;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.CreateTableOptions;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.DeleteTableResponse;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduSession;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.RowResultIterator;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Update;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
/**
 * JUnit examples of basic Kudu client operations on the person table:
 * create, insert, scan, update, delete and drop.
 */
public class TestKudu
    // Kudu client shared by the tests; init() assigns a new one before every test.
    private static KuduClient kuduClient;
    // Name of the table every test operates on; never reassigned, hence final.
    private static final String tableName = "person";
/**
* 初始化方法
*/
@Before
public void init()
//指定master地址
String masterAddress = "node2.cn";
//创建kudu的数据库连接
kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
//构建表schema的字段信息
//字段名称 数据类型 是否为主键
public ColumnSchema newColumn(String name, Type type, boolean isKey)
ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
column.key(isKey);
return column.build();
五、创建表
/** 使用junit进行测试
*
* 创建表
* @throws KuduException
*/
@Test
public void createTable() throws KuduException
//设置表的schema
List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
columns.add(newColumn("CompanyId", Type.INT32, true));
columns.add(newColumn("WorkId", Type.INT32, false));
columns.add(newColumn("Name", Type.STRING, false));
columns.add(newColumn("Gender", Type.STRING, false));
columns.add(newColumn("Photo", Type.STRING, false));
Schema schema = new Schema(columns);
//创建表时提供的所有选项
CreateTableOptions tableOptions = new CreateTableOptions();
//设置表的副本和分区规则
LinkedList<String> list = new LinkedList<String>();
list.add("CompanyId");
//设置表副本数
tableOptions.setNumReplicas(1);
//设置range分区
//tableOptions.setRangePartitionColumns(list);
//设置hash分区和分区的数量
tableOptions.addHashPartitions(list, 3);
try
kuduClient.createTable("person", schema, tableOptions);
catch (Exception e)
e.printStackTrace();
kuduClient.close();
六、插入数据
/**
* 向表中加载数据
* @throws KuduException
*/
@Test
public void loadData() throws KuduException
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//创建KuduSession对象 kudu必须通过KuduSession写入数据
KuduSession kuduSession = kuduClient.newSession();
//采用flush方式 手动刷新
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
kuduSession.setMutationBufferSpace(3000);
//准备数据
for(int i=1; i<=10; i++)
Insert insert = kuduTable.newInsert();
//设置字段的内容
insert.getRow().addInt("CompanyId",i);
insert.getRow().addInt("WorkId",i);
insert.getRow().addString("Name","lisi"+i);
insert.getRow().addString("Gender","male");
insert.getRow().addString("Photo","person"+i);
kuduSession.flush();
kuduSession.apply(insert);
kuduSession.close();
kuduClient.close();
七、查询数据
/**
* 查询表数据
* @throws KuduException
*/
@Test
public void queryData() throws KuduException
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//获取scanner扫描器
KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
KuduScanner scanner = scannerBuilder.build();
//遍历
while(scanner.hasMoreRows())
RowResultIterator rowResults = scanner.nextRows();
while (rowResults.hasNext())
RowResult result = rowResults.next();
int companyId = result.getInt("CompanyId");
int workId = result.getInt("WorkId");
String name = result.getString("Name");
String gender = result.getString("Gender");
String photo = result.getString("Photo");
System.out.print("companyId:"+companyId+" ");
System.out.print("workId:"+workId+" ");
System.out.print("name:"+name+" ");
System.out.print("gender:"+gender+" ");
System.out.println("photo:"+photo);
//关闭
scanner.close();
kuduClient.close();
八、修改数据
/**
* 修改数据
* @throws KuduException
*/
@Test
public void upDATEData() throws KuduException
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
//构建kuduSession对象
KuduSession kuduSession = kuduClient.newSession();
//设置刷新数据模式,自动提交
kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
//更新数据需要获取UpDATE对象
UpDATE upDATE = kuduTable.newUpDATE();
//获取row对象
PartialRow row = upDATE.getRow();
//设置要更新的数据信息
row.addInt("CompanyId",1);
row.addString("Name","kobe");
//操作这个upDATE对象
kuduSession.apply(upDATE);
kuduSession.close();
九、删除数据
/**
* 删除表中的数据
*/
@Test
public void deleteData() throws KuduException
//打开表
KuduTable kuduTable = kuduClient.openTable(tableName);
KuduSession kuduSession = kuduClient.newSession();
//获取Delete对象
Delete delete = kuduTable.newDelete();
//构建要删除的行对象
PartialRow row = delete.getRow();
//设置删除数据的条件
row.addInt("CompanyId",2);
kuduSession.flush();
kuduSession.apply(delete);
kuduSession.close();
kuduClient.close();
十、修改表
package cn.it.kudu;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
/**
 * JUnit examples of table changes: add / drop a column, add a range partition,
 * and drop the table.
 */
public class AlterTable
    // Kudu client used by the tests; init() assigns a new one before every test.
    private static KuduClient kuduClient;
    // Name of the table the tests alter; never reassigned, hence final.
    private static final String tableName = "person";
/**
* 初始化操作
*/
@Before
public void init()
//指定kudu的master地址
String masterAddress = "node2.cn";
//创建kudu的数据库连接
kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
/**
* 添加列
*/
@Test
public void alterTableAddColumn()
AlterTableOptions alterTableOptions = new AlterTableOptions();
alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());
try
kuduClient.alterTable(tableName, alterTableOptions);
catch (KuduException e)
e.printStackTrace();
/**
* 删除列
*/
@Test
public void alterTableDeleteColumn()
AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");
try
kuduClient.alterTable(tableName, alterTableOptions);
catch (KuduException e)
e.printStackTrace();
/**
* 添加分区列
*/
@Test
public void alterTableAddRangePartition()
int lowerValue = 110;
int upperValue = 120;
try
KuduTable kuduTable = kuduClient.openTable(tableName);
List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);
boolean flag = true;
for (Partition rangePartition : rangePartitions)
int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");
if(startKey == lowerValue)
flag = false;
if(flag)
PartialRow lower = kuduTable.getSchema().newPartialRow();
lower.addInt("Id", lowerValue);
PartialRow upper = kuduTable.getSchema().newPartialRow();
upper.addInt("Id", upperValue);
kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));
else
System.out.println("分区已经存在,不能重复创建!");
catch (KuduException e)
e.printStackTrace();
catch (Exception exception)
exception.printStackTrace();
/**
* 删除表
* @throws KuduException
*/
@Test
public void dropTable() throws KuduException
kuduClient.deleteTable(tableName);
十一、删除表
/**
* 删除表
*/
@Test
public void dropTable() throws KuduException
//删除表
DeleteTableResponse response = kuduClient.deleteTable(tableName);
//关闭客户端连接
kuduClient.close();
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
以上是关于客快物流大数据项目(四十二):Java代码操作Kudu的主要内容,如果未能解决你的问题,请参考以下文章
客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu