客快物流大数据项目(四十二):Java代码操作Kudu

Posted Lansonli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了客快物流大数据项目(四十二):Java代码操作Kudu相关的知识,希望对你有一定的参考价值。

目录

Java代码操作Kudu

一、构建maven工程

二、导入依赖

三、​​​​​​​创建包结构

四、​​​​​​​初始化方法

五、​​​​​​​创建表

六、​​​​​​​插入数据

七、​​​​​​​查询数据

八、修改数据

九、​​​​​​​删除数据

十、​​​​​​​修改表

十一、​​​​​​​删除表


Java代码操作Kudu

一、​​​​​​​构建maven工程

二、导入依赖

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client-tools</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
        <version>1.9.0-cdh6.2.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

三、​​​​​​​创建包结构

包名

说明

cn.it

代码所在的包目录

 

四、​​​​​​​初始化方法

package cn.it;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.junit.Before;

public class TestKudu 
    //定义KuduClient客户端对象
    private static KuduClient kuduClient;
    //定义表名
    private static String tableName = "person";

    /**
     * 初始化方法
     */
    @Before
    public void init() 
        //指定master地址
        String masterAddress = "node2.cn";
        //创建kudu的数据库连接
        kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
    

    //构建表schema的字段信息
    //字段名称   数据类型     是否为主键
    public ColumnSchema newColumn(String name, Type type, boolean isKey) 
        ColumnSchema.ColumnSchemaBuilder column = new ColumnSchema.ColumnSchemaBuilder(name, type);
        column.key(isKey);
        return column.build();
    

五、​​​​​​​创建表

/**  使用junit进行测试
 *
 * 创建表
 * @throws KuduException
 */
@Test
public void createTable() throws KuduException 
    //设置表的schema
    List<ColumnSchema> columns = new LinkedList<ColumnSchema>();
    columns.add(newColumn("CompanyId", Type.INT32, true));
    columns.add(newColumn("WorkId", Type.INT32, false));
    columns.add(newColumn("Name", Type.STRING, false));
    columns.add(newColumn("Gender", Type.STRING, false));
    columns.add(newColumn("Photo", Type.STRING, false));
    Schema schema = new Schema(columns);
    //创建表时提供的所有选项
    CreateTableOptions tableOptions = new CreateTableOptions();
    //设置表的副本和分区规则
    LinkedList<String> list = new LinkedList<String>();
    list.add("CompanyId");
    //设置表副本数
    tableOptions.setNumReplicas(1);
    //设置range分区
    //tableOptions.setRangePartitionColumns(list);
    //设置hash分区和分区的数量
    tableOptions.addHashPartitions(list, 3);
    try 
        kuduClient.createTable("person", schema, tableOptions);
     catch (Exception e) 
        e.printStackTrace();
    
    kuduClient.close();

 

六、​​​​​​​插入数据

/**
 * 向表中加载数据
 * @throws KuduException
 */
@Test
public void loadData() throws KuduException 
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    //创建KuduSession对象 kudu必须通过KuduSession写入数据
    KuduSession kuduSession = kuduClient.newSession();
    //采用flush方式 手动刷新
    kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
    kuduSession.setMutationBufferSpace(3000);
    //准备数据
    for(int i=1; i<=10; i++)
        Insert insert = kuduTable.newInsert();
        //设置字段的内容
        insert.getRow().addInt("CompanyId",i);
        insert.getRow().addInt("WorkId",i);
        insert.getRow().addString("Name","lisi"+i);
        insert.getRow().addString("Gender","male");
        insert.getRow().addString("Photo","person"+i);
        kuduSession.flush();
        kuduSession.apply(insert);
    
    kuduSession.close();
    kuduClient.close();

七、​​​​​​​查询数据

 /**
 * 查询表数据
 * @throws KuduException
 */
@Test
public void queryData() throws KuduException 
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    //获取scanner扫描器
    KuduScanner.KuduScannerBuilder scannerBuilder = kuduClient.newScannerBuilder(kuduTable);
    KuduScanner scanner = scannerBuilder.build();
    //遍历
    while(scanner.hasMoreRows())
        RowResultIterator rowResults = scanner.nextRows();
        while (rowResults.hasNext())
            RowResult result = rowResults.next();
            int companyId = result.getInt("CompanyId");
            int workId = result.getInt("WorkId");
            String name = result.getString("Name");
            String gender = result.getString("Gender");
            String photo = result.getString("Photo");
            System.out.print("companyId:"+companyId+" ");
            System.out.print("workId:"+workId+" ");
            System.out.print("name:"+name+" ");
            System.out.print("gender:"+gender+" ");
            System.out.println("photo:"+photo);
        
    
    //关闭
    scanner.close();
    kuduClient.close();

 

八、修改数据

/**
 * 修改数据
 * @throws KuduException
 */
@Test
public void upDATEData() throws KuduException 
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    //构建kuduSession对象
    KuduSession kuduSession = kuduClient.newSession();
    //设置刷新数据模式,自动提交
    kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);

    //更新数据需要获取UpDATE对象
    UpDATE upDATE = kuduTable.newUpDATE();
    //获取row对象
    PartialRow row = upDATE.getRow();
    //设置要更新的数据信息
    row.addInt("CompanyId",1);
    row.addString("Name","kobe");
    //操作这个upDATE对象
    kuduSession.apply(upDATE);
    kuduSession.close();

 

九、​​​​​​​删除数据

/**
 * 删除表中的数据
 */
@Test
public void deleteData() throws KuduException 
    //打开表
    KuduTable kuduTable = kuduClient.openTable(tableName);
    KuduSession kuduSession = kuduClient.newSession();
    //获取Delete对象
    Delete delete = kuduTable.newDelete();
    //构建要删除的行对象
    PartialRow row = delete.getRow();
    //设置删除数据的条件
    row.addInt("CompanyId",2);
    kuduSession.flush();
    kuduSession.apply(delete);
    kuduSession.close();
    kuduClient.close();

 

十、​​​​​​​修改表

package cn.it.kudu;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

/**
 * 修改表操作
 */
public class AlterTable 
    //定义kudu的客户端对象
    private static KuduClient kuduClient;
    //定义一张表名称
    private static String tableName = "person";

    /**
     * 初始化操作
     */
    @Before
    public void init() 
        //指定kudu的master地址
        String masterAddress = "node2.cn";
        //创建kudu的数据库连接
        kuduClient = new KuduClient.KuduClientBuilder(masterAddress).defaultSocketReadTimeoutMs(6000).build();
    

    /**
     * 添加列
     */
    @Test
    public void alterTableAddColumn() 
        AlterTableOptions alterTableOptions = new AlterTableOptions();
        alterTableOptions.addColumn(new ColumnSchema.ColumnSchemaBuilder("Address", Type.STRING).nullable(true).build());
        try 
            kuduClient.alterTable(tableName, alterTableOptions);
         catch (KuduException e) 
            e.printStackTrace();
       
    

    /**
     * 删除列
     */
    @Test
    public void alterTableDeleteColumn()
        AlterTableOptions alterTableOptions = new AlterTableOptions().dropColumn("Address");
        try 
            kuduClient.alterTable(tableName, alterTableOptions);
         catch (KuduException e) 
            e.printStackTrace();
       
    

    /**
     * 添加分区列
     */
    @Test
    public void alterTableAddRangePartition()
        int lowerValue = 110;
        int upperValue = 120;
        try 
            KuduTable kuduTable = kuduClient.openTable(tableName);
            List<Partition> rangePartitions = kuduTable.getRangePartitions(6000);
            boolean flag = true;
            for (Partition rangePartition : rangePartitions) 
                int startKey = rangePartition.getDecodedRangeKeyStart(kuduTable).getInt("Id");
                if(startKey == lowerValue)
                    flag = false;
                
            
            if(flag) 
                PartialRow lower = kuduTable.getSchema().newPartialRow();
                lower.addInt("Id", lowerValue);
                PartialRow upper = kuduTable.getSchema().newPartialRow();
                upper.addInt("Id", upperValue);
                kuduClient.alterTable(tableName,new AlterTableOptions().addRangePartition(lower, upper));
            else
                System.out.println("分区已经存在,不能重复创建!");
            
         catch (KuduException e) 
            e.printStackTrace();
         catch (Exception exception) 
            exception.printStackTrace();
        
    

    /**
     * 删除表
     * @throws KuduException
     */
    @Test
    public void dropTable() throws KuduException 
        kuduClient.deleteTable(tableName);
    

十一、​​​​​​​删除表

/**
 * 删除表
 */
@Test
public void dropTable() throws KuduException 
    //删除表
    DeleteTableResponse response = kuduClient.deleteTable(tableName);
    //关闭客户端连接
    kuduClient.close();


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

 

 

以上是关于客快物流大数据项目(四十二):Java代码操作Kudu的主要内容,如果未能解决你的问题,请参考以下文章

客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu

客快物流大数据项目(四十五):Spark操作Kudu DML操作

客快物流大数据项目(四十三):kudu的分区方式

客快物流大数据项目(六十二):主题及指标开发

客快物流大数据项目(四十):ETL实现方案

客快物流大数据项目(四十一):Kudu入门介绍