大数据HadoopHDFS的Java API操作

Posted 脚丫先生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据HadoopHDFS的Java API操作相关的知识,希望对你有一定的参考价值。

大家好,我是脚丫先生 (o^^o)

紧接着前文说到HDFS的API操作,我们今天针对HDFS的API操作进行详细解说。

好了,我们开始今天的正文。


一、HDFS 的 Java API 操作

HDFS 在生产应用中主要是客户端的开发,其核心步骤是从 HDFS 提供的 API 中构造一个 HDFS 的访问客户端对象,然后通过该客户端对象操作(增删改查)HDFS 上的文件

二、搭建开发环境

打开 IDEA,创建一个 maven 工程,然后 引入对应的 HDFS 的jar 依赖

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.7</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.7</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
    <version>2.7.7</version>
    <scope>provided</scope>
</dependency>

三、FileSystem实例获取讲解

在 Java 中操作 HDFS,首先要获得一个客户端实例:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);

而我们的操作目标是 HDFS,所以获取到的 fs 对象应该是DistributedFileSystem 的实例;get方法是从何处判断具体实例化那种客户端类呢?从conf中的一个参数 fs.defaultFS的配置值判断;如果我们的代码中没有指定 fs.defaultFS,并且工程 classpath 下也没有给定相应的配置,conf 中的默认值就来自于 hadoop 的 jar 包中的 core-default.xml,默认值为:file:///,则获取的将不是一个 DistributedFileSystem 的实例,而是一个本地文件系统的客户端对象 LocalFileSystem

四、 HDFS常用Java API代码

package hdfs;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * java中创建对象的几种方式
 * 1: 构造方法 new FileSystem()
 * 2   静态构造方法 FileSystem.getInstance
 * 3.  反射
 * 4.  克隆      fs.clone()
 * 5.   序列化
 */
public class HDFS_API_DEMO {

	public static FileSystem fs;
	public static Configuration conf;

	// 初始一个本地文件系统
	public static void initLocalFS() throws Exception{
		conf = new Configuration();
		// 利用FileSystem的自身携带的get方法获取FileSystem的一个实例
		fs = FileSystem.get(conf);
	}

	// 代码设置访问初始化一个HDFS文件系统示例对象
	public static void initHDFS() throws Exception{
		conf = new Configuration();
		// 设置了该参数表示从hdfs获取一个分布式文件系统的实例,否则默认表示获取一个本地文件系统的实例
		conf.set("fs.defaultFS", "hdfs://192.168.239.128:8020");
		// 设置客户端访问hdfs集群的身份标识(设置搭建集群的时候的用户名)
		System.setProperty("HADOOP_USER_NAME", "root");
		// 利用FileSystem的自身携带的get方法获取FileSystem的一个实例
		fs = FileSystem.get(conf);
	}

	// 从配置文件初始化一个HDFS文件系统实例对象
	public static void initHDFSWithConf() throws Exception{
		/**
		 * 构造一个配置参数对象,设置一个参数:我们要访问的hdfs的URI
		 * 从而FileSystem.get()方法就知道应该是去构造一个访问hdfs文件系统的客户端,以及hdfs的访问地址 new
		 * Configuration();的时候,它就会去加载jar包中的hdfs-default.xml
		 * 然后再加载classpath下的hdfs-site.xml
		 */
		Configuration conf = new Configuration();
		System.setProperty("HADOOP_USER_NAME", "bigdata");
		conf.addResource("config/core-site.xml");
		conf.addResource("config/hdfs-site.xml");
		conf.addResource("config/mapred-site.xml");
		conf.addResource("config/yarn-site.xml");

		// conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
		// 参数优先级: 1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、然后是服务器的默认配置
		conf.set("dfs.replication", "2");
		conf.set("dfs.block.size", "64m");

		// 获取一个hdfs的访问客户端,根据参数,这个实例应该是DistributedFileSystem的实例
		// 如果这样去获取,那conf里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份标识已经是hadoop用户
		fs = FileSystem.get(conf);
	}

	@Before
	public void init() throws Exception {
//		initLocalFS();
		initHDFS();
//		initHDFSWithConf();
	}

	/**
	 * 创建文件夹
	 */
	@Test
	public void testMkdir() throws Exception {
		System.out.println(fs.mkdirs(new Path("/ccc/bbb/aaa")));
	}

	/**
	 * 上传文件
	 */
	@Test
	public void testCopyFromLocal() throws Exception {
		// src : 要上传的文件所在的本地路径
		// dst : 要上传到hdfs的目标路径
		Path src = new Path("C:/software/hadoop-eclipse-plugin-2.7.7.jar");
		Path dst = new Path("/");
		fs.copyFromLocalFile(src, dst);
	}

	/**
	 * 下载文件
	 */
	@Test
	public void testCopyToLocal() throws Exception {
		fs.copyToLocalFile(new Path("/wordcount/input/helloWorld.txt"), new Path("c:/"));
//		fs.copyToLocalFile(new Path("/wordcount/input/helloWorld.txt"), new Path("d:/"), true);
	}

	/**
	 * 删除文件 或者 文件夹
	 */
	@Test
	public void testRemoveFileOrDir() throws Exception {
		// 删除文件或者文件夹,如果文件夹不为空,这第二个参数必须有, 而且要为true
		fs.delete(new Path("/ccc/bbb"), true);
	}

	/**
	 * 重命名文件 或者 文件夹
	 */
	@Test
	public void testRenameFileOrDir() throws Exception {
		// fs.rename(new Path("/ccc"), new Path("/vvv"));
		fs.rename(new Path("/hadoop-eclipse-plugin-2.6.4.jar"), new Path("/eclipsePlugin.jar"));
	}

	/**
	 * 显示 指定文件夹下  所有的文件
	 */
	@Test
	public void testListFiles() throws Exception {
		//RemoteIterator迭代器
		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/wordcount"), true);
		while (listFiles.hasNext()) {
			LocatedFileStatus fileStatus = listFiles.next();
			System.out.print(fileStatus.getPath() + "\\t");
			System.out.print(fileStatus.getPath().getName() + "\\t");
			System.out.print(fileStatus.getBlockSize() + "\\t");
			System.out.print(fileStatus.getPermission() + "\\t");
			System.out.print(fileStatus.getReplication() + "\\t");
			System.out.println(fileStatus.getLen());

			BlockLocation[] blockLocations = fileStatus.getBlockLocations();
			for (BlockLocation bl : blockLocations) {
				System.out.println("Block Length:" + bl.getLength() + "   Block OffSet:" + bl.getOffset());
				String[] hosts = bl.getHosts();
				for (String str : hosts) {
					System.out.print(str + "\\t");
				}
				System.out.println();
			}

			System.out.println("---------------------------------------");
		}
	}

	/**
	 * 查看指定文件下 的 文件 或者 文件夹。 不包含子文件夹下的内容
	 * 查看文件下的是文件夹还是目录
	 */
	@Test
	public void testListStatus() throws Exception {
		FileStatus[] listStatus = fs.listStatus(new Path("/hdfs"));
		String flag = "";
		for (FileStatus status : listStatus) {
			if (status.isDirectory()) {
				flag = "Directory";
			} else {
				flag = "File";
			}
			System.out.println(flag + "\\t" + status.getPath());
		}
	}

	/**
	 * 关闭 FS 示例对象
	 */
	@After
	public void close(){
		try {
			fs.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

以上是关于大数据HadoopHDFS的Java API操作的主要内容,如果未能解决你的问题,请参考以下文章

大数据HadoopHDFS3.3.1-Namenode系列源码阅读

大数据HadoopHDFS-Namenode-bootstrapStandby同步元数据的源码步骤分析

大数据Kudu:Kudu Java Api操作

2021年大数据ZooKeeper:ZooKeeper Java API操作

大数据讲课笔记6.6 ZooKeeper的Java API操作

大数据讲课笔记6.6 ZooKeeper的Java API操作