使用Java API 操作Kudu

Posted 杀智勇双全杀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Java API 操作Kudu相关的知识,希望对你有一定的参考价值。

概述

本文不面向零基础读者,因此略过概念介绍,直接上代码。测试时借助了一个名为 KuduPlus 的小工具辅助测试。

Maven依赖

    <!-- Repository locations, listed in order: Aliyun mirror, Cloudera, JBoss.
         All URLs use HTTPS: Maven 3.8.1+ blocks plain-HTTP repositories by default. -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>https://repository.jboss.org/nexus/content/groups/public/</url>
        </repository>
    </repositories>

    <!-- Version properties; referenced below so each version is defined in one place. -->
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <kudu.version>1.9.0-cdh6.2.1</kudu.version>
        <junit.version>4.12</junit.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client</artifactId>
            <version>${kudu.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-client-tools</artifactId>
            <version>${kudu.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

操作Kudu表

代码较多,因此使用了 `//region` / `//endregion` 标记,在 IDEA 中可以折叠。即便如此篇幅仍然较长,所以把表操作和数据操作分成了两个类来写,合并成一个类也可以。需要时可以直接复制代码,修改参数后使用。

package com.aa.kudu.table;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

//使用Java API对Kudu进行CRUD操作,包含创建表及删除表
/**
 * JUnit demo of table-level Kudu operations through the Java client: connecting,
 * creating tables (hash, range, and hash + range partitioning), dropping a table,
 * and adding / dropping a column.
 *
 * <p>Every test gets a fresh {@link KuduClient} from {@link #init()}, which is closed
 * again in {@link #close()}. The tests run against a live Kudu master and are order
 * dependent: {@link #alterKuduTableDropColumn()} drops the column that
 * {@link #alterKuduTableAddColumn()} adds, and both alter {@code aa_users}, which
 * {@link #createKuduTable()} creates.
 */
public class kuduTableDemo {
    /** Kudu master address used by every test. */
    private static final String MASTER_ADDRESSES = "192.168.88.20:7051";
    /** Hash-partitioned table that is created, altered and dropped by the tests below. */
    private static final String USERS_TABLE = "aa_users";

    /** Client used by the test methods; created in {@link #init()}, closed in {@link #close()}. */
    private KuduClient kuduClient = null;

    //region    Setup
    /** Builds a {@link KuduClient} before each test. */
    @Before
    public void init() {
        kuduClient = new KuduClient.KuduClientBuilder(MASTER_ADDRESSES) // builder pattern
                .defaultOperationTimeoutMs(10000) // timeout for Kudu operations (client default: 30 s)
                .defaultSocketReadTimeoutMs(6000) // timeout for socket reads (client default: 10 s)
                .build();
    }
    //endregion

    //region    Connection test
    /** Prints the client to show that a client object was built. */
    @Test
    public void testKuduClient() {
        // Sample output: kuduClient = org.apache.kudu.client.KuduClient@6e1ec318
        System.out.println("kuduClient = " + kuduClient);
    }
    //endregion

    //region    Create table (hash partitioning)
    /*
    Equivalent table definition:
    create table aa_users(
        id int,
        name string,
        age byte,
        primary key(id)
    )
     */

    /**
     * Creates a column definition.
     *
     * @param name  column name
     * @param type  Kudu column type
     * @param isKey whether the column is part of the primary key
     * @return the built column schema
     */
    private ColumnSchema newColumnSchema(String name, Type type, boolean isKey) {
        return new ColumnSchema.ColumnSchemaBuilder(name, type)
                .key(isKey)
                .build();
    }

    /**
     * Builds the schema shared by {@code aa_users} and {@code aa_users_range}:
     * {@code id INT32} (primary key), {@code name STRING}, {@code age INT8}.
     *
     * @return the users schema
     */
    private Schema newUserSchema() {
        List<ColumnSchema> columns = new ArrayList<>();
        // Two equivalent ways to define a column: the builder used directly ...
        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build());
        // ... or via the newColumnSchema helper.
        columns.add(newColumnSchema("name", Type.STRING, false));
        columns.add(newColumnSchema("age", Type.INT8, false));
        return new Schema(columns);
    }

    /**
     * Creates {@code aa_users}, hash-partitioned on {@code id} into 3 buckets, with 1 replica.
     * The table must not exist yet.
     */
    @Test
    public void createKuduTable() throws KuduException {
        Schema schema = newUserSchema();

        CreateTableOptions options = new CreateTableOptions();
        options.addHashPartitions(Arrays.asList("id"), 3); // partitioning strategy
        options.setNumReplicas(1);                         // number of replicas

        KuduTable kuduTable = kuduClient.createTable(USERS_TABLE, schema, options);
        // Sample output: kuduTable.getTableId() = 291ebdc9de8e44c3a84a3601f13dcf94
        System.out.println("kuduTable.getTableId() = " + kuduTable.getTableId());
    }
    //endregion

    //region    Drop table (if exists)
    /**
     * Deletes a table if it exists and prints whether it was found.
     * Checking for existence first avoids the error that deleting a missing table would raise.
     *
     * @param tableName name of the table to drop
     * @throws KuduException if the existence check or the delete fails
     */
    private void dropTableIfExists(String tableName) throws KuduException {
        if (kuduClient.tableExists(tableName)) {
            System.out.println("存在表" + tableName);
            kuduClient.deleteTable(tableName); // delete by name
            System.out.println("已删除表" + tableName);
        } else {
            System.out.println("不存在表" + tableName);
        }
    }

    /** Drops {@code aa_users} if it exists. */
    @Test
    public void dropKuduTable() throws KuduException {
        dropTableIfExists(USERS_TABLE);
    }
    //endregion

    //region    Create table (range partitioning)
    /**
     * Creates {@code aa_users_range}, range-partitioned on {@code id} into
     * {@code id < 100}, {@code 100 <= id < 500} and {@code id >= 500}, with 1 replica.
     * An empty {@link PartialRow} used as a bound leaves that side unbounded.
     */
    @Test
    public void createKuduTableByRange() throws KuduException {
        Schema schema = newUserSchema();

        CreateTableOptions options = new CreateTableOptions();
        options.setRangePartitionColumns(Arrays.asList("id")); // range partition column

        // id < 100
        PartialRow upper100 = new PartialRow(schema);
        upper100.addInt("id", 100);
        options.addRangePartition(new PartialRow(schema), upper100);

        // 100 <= id < 500
        PartialRow lower100 = new PartialRow(schema);
        lower100.addInt("id", 100);
        PartialRow upper500 = new PartialRow(schema);
        upper500.addInt("id", 500);
        options.addRangePartition(lower100, upper500);

        // id >= 500
        PartialRow lower500 = new PartialRow(schema);
        lower500.addInt("id", 500);
        options.addRangePartition(lower500, new PartialRow(schema));

        options.setNumReplicas(1); // number of replicas

        KuduTable kuduTable = kuduClient.createTable("aa_users_range", schema, options);
        System.out.println("kuduTable.getTableId() = " + kuduTable.getTableId());
    }
    //endregion

    //region    Create table (multi-level partitioning)
    /**
     * Creates {@code aa_users_multi} with two partitioning levels: hash on {@code id}
     * (5 buckets), then range on {@code age} ({@code < 21}, {@code 21..40}, {@code >= 41}).
     * Both {@code id} and {@code age} are primary-key columns; 1 replica.
     * (Multi-level partitioning can be hash + range, as here, or hash + hash.)
     */
    @Test
    public void createKuduTableMulti() throws KuduException {
        List<ColumnSchema> columnSchemas = new ArrayList<>();
        columnSchemas.add(newColumnSchema("id", Type.INT32, true));
        columnSchemas.add(newColumnSchema("age", Type.INT8, true));
        columnSchemas.add(newColumnSchema("name", Type.STRING, false));
        Schema schema = new Schema(columnSchemas);

        CreateTableOptions tableOptions = new CreateTableOptions();
        // Level 1: hash partitioning
        tableOptions.addHashPartitions(Arrays.asList("id"), 5);

        // Level 2: range partitioning
        tableOptions.setRangePartitionColumns(Arrays.asList("age"));
        // age < 21. Without this bound the row stays empty (unbounded) and creation fails with
        // NonRecoverableException: overlapping range partitions: first range partition: UNBOUNDED
        PartialRow upper21 = new PartialRow(schema);
        upper21.addByte("age", (byte) 21);
        tableOptions.addRangePartition(new PartialRow(schema), upper21);
        // 21 <= age < 41 (omitting this partition was observed to fail with
        // NonRecoverableException: overlapping range partitions)
        PartialRow lower21 = new PartialRow(schema);
        lower21.addByte("age", (byte) 21);
        PartialRow upper41 = new PartialRow(schema);
        upper41.addByte("age", (byte) 41);
        tableOptions.addRangePartition(lower21, upper41);
        // age >= 41
        PartialRow lower41 = new PartialRow(schema);
        lower41.addByte("age", (byte) 41);
        tableOptions.addRangePartition(lower41, new PartialRow(schema));

        tableOptions.setNumReplicas(1); // number of replicas

        KuduTable userTable = kuduClient.createTable("aa_users_multi", schema, tableOptions);
        System.out.println(userTable.toString()); // e.g. org.apache.kudu.client.KuduTable@17695df3
        /*
        Shown in the master web UI (node2:8051):
        HASH (id) PARTITIONS 5,
        RANGE (age) (
            PARTITION VALUES < 21,
            PARTITION 21 <= VALUES < 41,
            PARTITION VALUES >= 41
        )
         */
    }
    //endregion

    //region    Add column
    /** Adds a STRING column {@code address} with default value "银河系" to {@code aa_users}. */
    @Test
    public void alterKuduTableAddColumn() throws KuduException {
        AlterTableOptions alterTableOptions = new AlterTableOptions();
        alterTableOptions.addColumn("address", Type.STRING, "银河系");
        AlterTableResponse response = kuduClient.alterTable(USERS_TABLE, alterTableOptions);
        System.out.println(response.getTableId()); // e.g. 80a90f5ff44a4432a21fff322c8f1659
    }
    //endregion

    //region    Drop column
    /** Drops column {@code address} from {@code aa_users}; it must have been added first. */
    @Test
    public void alterKuduTableDropColumn() throws KuduException {
        AlterTableOptions alterTableOptions = new AlterTableOptions();
        alterTableOptions.dropColumn("address");
        AlterTableResponse response = kuduClient.alterTable(USERS_TABLE, alterTableOptions);
        System.out.println(response.getTableId());
    }
    //endregion

    //region    Release resources
    /** Closes the client after each test. */
    @After
    public void close() throws KuduException {
        if (kuduClient != null) {
            kuduClient.close();
        }
    }
    //endregion
}

操作Kudu数据

package com.aa.kudu.data;

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Random;

public class kuduDataDemo {
    //定义kuduClient实例对象
    private KuduClient kuduClient = null;

    //region    封装方法
    /**
     * Creates a column definition.
     *
     * @param name  column name
     * @param type  Kudu column type
     * @param isKey whether the column is part of the primary key
     * @return the built column schema
     */
    private ColumnSchema newColumnSchema(String name, Type type, boolean isKey) {
        return new ColumnSchema.ColumnSchemaBuilder(name, type)
                .key(isKey)
                .build();
    }
    //endregion

    //region    Before操作初始化
    /** Builds the {@link KuduClient} used by each test. */
    @Before
    public void init() {
        // Kudu master to connect to.
        final String masterAddresses = "192.168.88.20:7051";
        final KuduClient.KuduClientBuilder builder =
                new KuduClient.KuduClientBuilder(masterAddresses)
                        .defaultOperationTimeoutMs(10000) // operation timeout (client default: 30 s)
                        // socket read timeout (client default: 10 s).
                        // NOTE(review): newer kudu-client releases may treat this option as a
                        // deprecated no-op — confirm for the client version in use.
                        .defaultSocketReadTimeoutMs(6000);
        kuduClient = builder.build();
    }
    //endregion

    //region    测试单条插入insert
    @Test
    public void insertKuduSingleData() throws KuduException {
        //获取操作句柄
        KuduTable kuduTable = kuduClient.openTable("aa_users");
        //获取kuduSession的实例对象
        KuduSession kuduSession 以上是关于使用Java API 操作Kudu的主要内容,如果未能解决你的问题,请参考以下文章

大数据Kudu:Kudu Java Api操作

kudu系列: Java API使用和效率测试

客快物流大数据项目(四十二):Java代码操作Kudu

客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu

spark操作kudu,出现异常java.lang.ClassNotFoundException: org.apache.kudu.spark.kudu.DefaultSource

spark操作kudu,出现异常java.lang.ClassNotFoundException: org.apache.kudu.spark.kudu.DefaultSource