Kudu案例库V1.0版

Posted ChinaManor

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kudu案例库V1.0版相关的知识,希望对你有一定的参考价值。

项目案例库

案例一:Java 操作 Kudu之创建KuduClient实例

pom.xml

<!-- Repository locations, in order: aliyun, cloudera and jboss.
     Every URL uses HTTPS because Maven 3.8.1 and later block plain-HTTP repositories by default. -->
<repositories>
    <repository>
        <id>aliyun</id>
        <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
    <repository>
        <id>jboss</id>
        <url>https://repository.jboss.org/nexus/content/groups/public/</url>
    </repository>
</repositories>

<!-- Version properties: Java compiler source/target level, Kudu version and JUnit version -->
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <kudu.version>1.9.0-cdh6.2.1</kudu.version>
    <junit.version>4.12</junit.version>
</properties>

<!-- Project dependencies; versions come from the properties section so each is declared in one place -->
<dependencies>
    <!-- Kudu Java client -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>${kudu.version}</version>
    </dependency>

    <!-- Kudu client tools -->
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client-tools</artifactId>
        <version>${kudu.version}</version>
    </dependency>

    <!-- JUnit, used by the @Before/@Test methods of the examples -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>${junit.version}</version>
    </dependency>

</dependencies>

<!-- Build configuration: compile with Java 1.8 source and target levels -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

代码实现

	// KuduClient instance used by the test methods; assigned in init()
	private KuduClient kuduClient ;

	/**
	 * Builds the KuduClient that connects to the Kudu master and stores it in {@code kuduClient}.
	 */
	@Before
	public void init() {
		// host:port of the Kudu master
		final String masterAddresses = "node2.itcast.cn:7051";

		KuduClient.KuduClientBuilder clientBuilder = new KuduClient.KuduClientBuilder(masterAddresses);
		// Socket read timeout
		clientBuilder.defaultSocketReadTimeoutMs(10000);
		// Default timeout for client operations
		clientBuilder.defaultOperationTimeoutMs(10000);

		kuduClient = clientBuilder.build();
	}

案例二:Java 操作 Kudu之创建KuduTable

/**
	 * Builds the {@link ColumnSchema} describing one column of a Kudu table.
	 *
	 * @param name  column name
	 * @param type  column data type
	 * @param isKey whether the column is a key column
	 * @return the built ColumnSchema
	 */
	private ColumnSchema newColumnSchema(String name, Type type, boolean isKey) {
		return new ColumnSchema.ColumnSchemaBuilder(name, type)
				.key(isKey)
				.build();
	}

	/**
	 * Creates the Kudu table {@code itcast_users}. The equivalent DDL is:
	 * <pre>
	 * create table itcast_users(
	 *     id int,
	 *     name string,
	 *     age byte,
	 *     primary key(id)
	 * )
	 * partition by hash(id) partitions 3
	 * stored as kudu ;
	 * </pre>
	 */
	@Test
	public void createKuduTable() throws KuduException {
		// a. Column definitions: id is the key column, name and age are regular columns
		List<ColumnSchema> columnSchemas = new ArrayList<>();
		columnSchemas.add(newColumnSchema("id", Type.INT32, true));
		columnSchemas.add(newColumnSchema("name", Type.STRING, false));
		columnSchemas.add(newColumnSchema("age", Type.INT8, false));

		// b. Table options: hash partitioning on id with 3 buckets, and 1 replica
		CreateTableOptions tableOptions = new CreateTableOptions()
				.addHashPartitions(Collections.singletonList("id"), 3)
				.setNumReplicas(1);

		// c. Create the table and print its id
		KuduTable createdTable =
				kuduClient.createTable("itcast_users", new Schema(columnSchemas), tableOptions);
		System.out.println("TableId: " + createdTable.getTableId());
	}

案例三:Java 操作 Kudu之修改表中字段,增加列:gender,String

/**
	 * Alters the Kudu table {@code itcast_users} by adding a {@code gender} column
	 * of type STRING whose default value is {@code "male"}.
	 */
	@Test
	public void alterKuduTableAddColumn() throws KuduException {
		// Describe the change: new column gender (STRING, default "male")
		AlterTableOptions addGender = new AlterTableOptions()
				.addColumn("gender", Type.STRING, "male");

		// Apply the change and print the id of the altered table
		System.out.println(kuduClient.alterTable("itcast_users", addGender).getTableId());
	}

案例四:对Kudu表进行修改,删除列:gender

	/**
	 * Alters the Kudu table {@code itcast_users} by dropping its {@code gender} column.
	 */
	@Test
	public void alterKuduTableDropColumn() throws KuduException {
		// Describe the change: remove column gender
		AlterTableOptions dropGender = new AlterTableOptions().dropColumn("gender");

		// Apply the change and print the id of the altered table
		System.out.println(kuduClient.alterTable("itcast_users", dropGender).getTableId());
	}

案例五:向Kudu表中插入数据

将数据插入到Kudu Table中: INSERT INTO itcast_users (id, name, age) VALUES (1, "itcast", 15)
	/**
	 * Inserts a single row (id = 1, name = "itcast", age = 15) into {@code itcast_users}
	 * and prints the elapsed time reported for the operation.
	 *
	 * @throws KuduException if the Kudu client reports a failure
	 * @throws IllegalStateException if the response carries a row error
	 */
	@Test
	public void insertKuduDataSingle() throws KuduException {
		// a. Open the table by name
		KuduTable kuduTable = kuduClient.openTable("itcast_users") ;

		// b. Create the session used to apply the operation
		KuduSession kuduSession = kuduClient.newSession();
		try {
			// c. Build the Insert and fill in the column values of its row
			Insert insert = kuduTable.newInsert();
			PartialRow row = insert.getRow();
			row.addInt("id", 1);
			row.addString("name", "itcast");
			row.addByte("age", (byte)15);

			// d. Apply the insert; a row-level failure is reported in the response
			//    rather than thrown, so it has to be checked explicitly
			OperationResponse response = kuduSession.apply(insert);
			if (response.hasRowError()) {
				throw new IllegalStateException("Insert failed: " + response.getRowError());
			}
			System.out.println(response.getElapsedMillis());
		} finally {
			// e. Always close the session, even when the insert fails
			kuduSession.close();
		}
	}

案例六:批量数据插入kudu表中

/**
	 * Inserts 100 rows (ids 1001..1100) into {@code itcast_users} as one manually flushed batch.
	 * Each row gets the name {@code "zhangsan-" + i} and a random age in the range 20..29.
	 *
	 * @throws KuduException if the Kudu client reports a failure
	 * @throws IllegalStateException if any flushed operation carries a row error
	 */
	@Test
	public void insertKuduData() throws KuduException {
		// a. Open the table by name
		KuduTable kuduTable = kuduClient.openTable("itcast_users") ;

		// b. Create the session and configure it for batching
		KuduSession kuduSession = kuduClient.newSession();
		try {
			// Operations are buffered until flush() is called explicitly
			kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
			// Size of the mutation buffer; it must be able to hold the 100 buffered inserts
			kuduSession.setMutationBufferSpace(1000);

			Random random = new Random() ;
			for (int i = 1 ; i <= 100; i ++){
				// c. Build the Insert and fill in the column values of its row
				Insert insert = kuduTable.newInsert();
				PartialRow row = insert.getRow();
				row.addInt("id", 1000 + i);
				row.addString("name", "zhangsan-" + i);
				row.addByte("age", (byte)(random.nextInt(10) + 20));

				// d. Add the operation to the batch
				kuduSession.apply(insert) ;
			}

			// Send the buffered batch and surface any row-level failure instead of discarding it
			kuduSession.flush().forEach(response -> {
				if (response.hasRowError()) {
					throw new IllegalStateException("Batch insert failed: " + response.getRowError());
				}
			});
		} finally {
			// e. Always close the session, even when the batch fails
			kuduSession.close();
		}
	}

案例七:从Kudu表中全量加载数据

/**
 * Scans every row of {@code itcast_users} and prints the id, name and age columns.
 * A separator line with a running number is printed before each batch of rows.
 */
@Test
public void selectKuduData() throws KuduException {
   // a + b. Open the table by name and build a scanner over it (no projection, no predicates)
   KuduScanner scanner = kuduClient
         .newScannerBuilder(kuduClient.openTable("itcast_users"))
         .build();

   // c. Each nextRows() call returns the next batch of rows; print every row of every batch
   int batchNo = 0;
   while (scanner.hasMoreRows()) {
      batchNo++;
      System.out.println("===================" + batchNo + "===================");
      for (RowResult row : scanner.nextRows()) {
         String line = "id = " + row.getInt("id")
               + ", name = " + row.getString("name")
               + ", age = " + row.getByte("age");
         System.out.println(line);
      }
   }
}

案例八:从Kudu表中设置过滤查询数据

设置过滤:比如只查询id和age两个字段的值,年龄age小于25,id大于1050
	/**
	 * Builds the {@link ColumnSchema} describing one column of a Kudu table.
	 *
	 * @param name  column name
	 * @param type  column data type
	 * @param isKey whether the column is a key column
	 * @return the built ColumnSchema
	 */
	private ColumnSchema newColumnSchema(String name, Type type, boolean isKey) {
		return new ColumnSchema.ColumnSchemaBuilder(name, type)
				.key(isKey)
				.build();
	}
	/**
	 * Scans {@code itcast_users} with a projection and two predicates:
	 * only the id and age columns are read, for rows where id > 1050 and age < 25.
	 *
	 * @throws KuduException if the Kudu client reports a failure
	 */
	@Test
	public void queryKuduData() throws KuduException {
		// a. Open the table by name
		KuduTable kuduTable = kuduClient.openTable("itcast_users");

		// b. Build the scanner
		KuduScanner.KuduScannerBuilder builder = kuduClient.newScannerBuilder(kuduTable);
		// Projection: read only the id and age columns
		builder.setProjectedColumnNames(Arrays.asList("id", "age")) ;

		// Predicates use the column definitions of the opened table itself,
		// so they cannot drift from the table's real schema.
		// id > 1050
		KuduPredicate idPredicate = KuduPredicate.newComparisonPredicate(
				kuduTable.getSchema().getColumn("id"),
				KuduPredicate.ComparisonOp.GREATER,
				1050
		) ;
		builder.addPredicate(idPredicate) ;

		// age < 25
		KuduPredicate agePredicate =  KuduPredicate.newComparisonPredicate(
				kuduTable.getSchema().getColumn("age"),
				KuduPredicate.ComparisonOp.LESS,
				(byte)25
		) ;
		builder.addPredicate(agePredicate) ;

		KuduScanner scanner = builder.build();

		// c. Each nextRows() call returns the next batch of rows; print every row of every batch
		int counter =  1;
		while (scanner.hasMoreRows()){
			System.out.println("===================" + (counter ++) + "===================");
			RowResultIterator rowResults = scanner.nextRows();
			while (rowResults.hasNext()){
				RowResult rowResult = rowResults.next();
				System.out.println(
						"id = " + rowResult.getInt("id")
								+ ", age = " + rowResult.getByte("age")
				);
			}
		}
	}

案例九:更新kudu表中的数据

	/**
	 * Updates the row with id = 1 in {@code itcast_users}, setting name to "czxy" and age to 5,
	 * and prints the elapsed time reported for the operation.
	 *
	 * @throws KuduException if the Kudu client reports a failure
	 * @throws IllegalStateException if the response carries a row error
	 */
	@Test
	public void updateKuduData() throws KuduException {
		// a. Open the table by name
		KuduTable kuduTable = kuduClient.openTable("itcast_users") ;

		// b. Create the session used to apply the operation
		KuduSession kuduSession = kuduClient.newSession();
		try {
			// c. Build the Update: id identifies the row, name and age are the new values
			Update update = kuduTable.newUpdate();
			PartialRow row = update.getRow();
			row.addInt("id", 1);
			row.addString("name", "czxy");
			row.addByte("age", (byte)5);

			// d. Apply the update; a row-level failure is reported in the response
			//    rather than thrown, so it has to be checked explicitly
			OperationResponse response = kuduSession.apply(update);
			if (response.hasRowError()) {
				throw new IllegalStateException("Update failed: " + response.getRowError());
			}
			System.out.println(response.getElapsedMillis());
		} finally {
			// e. Always close the session, even when the update fails
			kuduSession.close();
		}
	}

案例十:更新Kudu表中数据,当主键存在时,更新数据;不存在时,插入数据

	/**
	 * Upserts the row (id = 1, name = "czxy", age = 5) into {@code itcast_users}
	 * and prints the elapsed time reported for the operation.
	 *
	 * @throws KuduException if the Kudu client reports a failure
	 * @throws IllegalStateException if the response carries a row error
	 */
	@Test
	public void upsertKuduData() throws KuduException {
		// a. Open the table by name
		KuduTable kuduTable = kuduClient.openTable("itcast_users") ;

		// b. Create the session used to apply the operation
		KuduSession kuduSession = kuduClient.newSession();
		try {
			// c. Build the Upsert and fill in the column values of its row
			Upsert upsert = kuduTable.newUpsert();
			PartialRow row = upsert.getRow();
			row.addInt("id", 1);
			row.addString("name", "czxy");
			row.addByte("age", (byte)5);

			// d. Apply the upsert; a row-level failure is reported in the response
			//    rather than thrown, so it has to be checked explicitly
			OperationResponse response = kuduSession.apply(upsert);
			if (response.hasRowError()) {
				throw new IllegalStateException("Upsert failed: " + response.getRowError());
			}
			System.out.println(response.getElapsedMillis());
		} finally {
			// e. Always close the session, even when the upsert fails
			kuduSession.close();
		}
	}

案例十一:删除Kudu表中数据

	@Test
	public void deleteKuduData() throws KuduException {
		// a. 获取Kudu表的句柄,传递表的名称
		KuduTable kuduTable = kuduClient.openTable("itcast_users") ;

		// b. 获取KuduSession会话对象
		KuduSession kuduSession = kuduClient.newSession();

		// c. 构建Delete删除对象
		Delete delete = kuduTable.newDelete();
		// 获取Row对象,设置插入的值
		PartialRow row = delete.getRow();
		row.addInt("id", 1)Spark案例库V1.0版

Python代码项目目录规范v1.0

面向对象案例 - 学生信息管理系统V1.0

面向对象案例 - 学生信息管理系统V1.0

jplogic v1.0案例开发之知识库管理(文档管理等)

kudu 1.7 源码安装