Java API实现HDFS有关目录与文件的相关功能

Posted 行秋

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java API实现HDFS有关目录与文件的相关功能相关的知识,希望对你有一定的参考价值。

实验要求

Java API实现HDFS以下功能。
1.    在HDFS中创建一个新目录;
2.    从本地拷贝文件到新创建的目录中;
3.    将HDFS中指定文件的内容输出到终端中;
4.    显示一个指定目录下所有文件;
5.    完成指定目录下指定类型文件的合并
6.    在HDFS中,将文件从源路径移动到目的路径。
7.    删除HDFS中指定的文件;

实验环境

操作系统:Linux

Hadoop版本:2.7.0或以上版本

JDK版本:1.8或以上版本

Java IDE:IDEA

项目文件结构

 代码

1.    在HDFS中创建一个新目录

新建Java class命名为 CreateDir

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.util.Scanner;
import java.net.URI;

public class CreateDir {
    public static void main(String[] args) {
        try {
            Scanner sc = new Scanner(System.in);
            String dirPath = '/'+sc.next();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
            Path hdfsPath = new Path(dirPath);
            if(fs.mkdirs(hdfsPath)){
                System.out.println("Directory "+ dirPath +" has been created successfully!");
            }
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

运行截图:

  终端上进行Hadoop检测

 2.    从本地拷贝文件到新创建的目录中

新建Java class命名为 CopyFile

import java.net.URI;

import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyFile {
    public static void main(String args[]) throws Exception {
        System.out.println("Input the filepath:");
        Scanner sc = new Scanner(System.in);
        Path src=new Path(sc.next());
        FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
        Path dst =new Path("/mydir");
        fs.copyFromLocalFile(src, dst);
    }
}

运行结果

终端上进行Hadoop检测

3.    将HDFS中指定文件的内容输出到终端中

新建Java class命名为 ReadFile

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.util.Scanner;

public class ReadFile {
    public static void main(String[] args) {
        try {
            Scanner sc = new Scanner(System.in);
            String filePath = '/'+sc.next();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
            
            Path srcPath = new Path(filePath);

            FSDataInputStream is = fs.open(srcPath);
            while(true) {
                String line = is.readLine();
                if(line == null) {
                    break;
                }
                System.out.println(line);
            }
            is.close();
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果

终端上进行Hadoop检测

这里为了第4,5问达到更好看的效果,Hadoop下面的mydir目录中,又上传了yourfile.txt文件,内容为“goodbye,big data!"。和herfile文件,内容是“big data is good!”如图:

 4.    显示一个指定目录下所有文件

新建Java class命名为 ListFile

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.util.Scanner;

public class ListFiles {
    public static void main(String[] args) {
        try {
            Scanner sc = new Scanner(System.in);
            String filePath = sc.next();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
            Path srcPath = new Path(filePath);
            FileStatus[] stats = fs.listStatus(srcPath);
            Path[] paths = FileUtil.stat2Paths(stats);
            for(Path p : paths)
                System.out.println(p.getName());
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

运行结果

终端上进行Hadoop检测

5.    完成指定目录下指定类型文件的合并

新建Java class命名为 MergeFiles,MyFileFilter

(本人对此处的“合并”,理解为“内容合并”,即将指定目录下的同类型文件内容写进一个文件。比如mydir目录下有三个文件myfile.txt、yourfile.txt、herfile,需过滤掉herfile文件,将两个文本文件myfile.txt、yourfile.txt内容写入一个文件mergeNew.txt,进行“合并”。)

注:通过自定义过滤函数实现指定类型功能。

MyFileFilter代码

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
class MyFileFilter implements PathFilter {
    String reg = null;
    MyFileFilter(String reg) {
        this.reg = reg;
    }
    public boolean accept(Path path) {
        if( (path.toString().matches(reg)) ) {
            return true;
        }
        return false;
    }
}

 MergeFiles代码

import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
public class MergeFiles{
    Path inputPath = null;//待合并文件所在目录的路径
    Path outputPath = null;//输出文件的路径
    public MergeFiles(String input, String output) {
        this.inputPath = new Path(input);
        this.outputPath = new Path(output);
    }
    public void doMerge() throws IOException {
        Configuration conf = new Configuration();
        //org.apache.hadoop.conf.Configuration 即访问配置项。
        //所有的配置项的值,如果在core-site.xml中有对应的配置,则以core-site.xml为准。
        conf.set("fs.defaultFS", "hdfs://172.18.0.2:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
        //下面过找到输入目录中后缀为.txt的文件,需自写过滤函数MyPathFilter        
       FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new MyPathFilter(“.*\\\\.txt”));
        //org.apache.hadoop.fs.FileStatus是一个接口,用于向客户端展示系统中文件和目录的元数据,
//具体包括文件大小、块大小、副本信息、所有者、修改时间等。可通过FileSystem.listStatus()方法获得具体的实例对象。
//   .*表示0个或多个任意字符  \\\\.构成一个转义字符,表示.
        FSDataOutputStream fsdos = fsDst.create(outputPath);
        PrintStream ps = new PrintStream(System.out);//文件输出流,用于写Hadoop文件
        for (FileStatus sta : sourceStatus) {//分别读取过滤之后的每个文件的内容,并输出到同一个文件中
            System.out.println("path:" + sta.getPath() + "  file size:" + sta.getLen() +
                    "  auth:" + sta.getPermission() + "  content:");//打印后缀不为.txt的文件的路径、大小、权限、内容
            FSDataInputStream fsdis = fsSource.open(sta.getPath());
            //文件输入流,用于读取Hadoop文件
            byte[] data = new byte[1024];
            int read = -1;
            while ((read = fsdis.read(data)) > 0) {
                ps.write(data, 0, read);
                fsdos.write(data, 0, read);
            }
            fsdis.close();
        }//end of for
        ps.close();
        fsdos.close();
    }//end of doMerge()
    public static void main(String[] args) throws IOException{
        MergeFiles MergeFiles = new MergeFiles(
                "hdfs://172.18.0.2:9000/mydir",
                "hdfs://172.18.0.2:9000/mergeNew.txt"
        );
        MergeFiles.doMerge();
    }
}

 运行结果

终端上进行Hadoop检测

6.    在HDFS中,将文件从源路径移动到目的路径

新建Java class命名为 MvFile

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.util.Scanner;

public class MvFile{
    public static void main(String[] args) {
        try {
            Scanner sc = new Scanner(System.in);
            String srcStrPath = '/'+sc.next();
            String dstStrPath = '/'+sc.next();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
            Path srcPath = new Path(srcStrPath);
            Path dstPath = new Path(dstStrPath);
            if(fs.rename(srcPath,dstPath)) {
                System.out.println("movefile from " + srcStrPath + " to " + dstStrPath + "successfully!");
            }
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

 运行结果 

 终端上进行Hadoop检测 

 7.    删除HDFS中指定的文件

新建Java class命名为 DeleteFile

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.util.Scanner;

public class DeleteFile {
    public static void main(String[] args) {
        try {
            Scanner sc = new Scanner(System.in);
            String filePath = '/'+sc.next();
            FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
            
            Path hdfsPath = new Path(filePath);
            if(fs.delete(hdfsPath,false)){
                System.out.println("File "+ filePath +" has been deleted successfully!");
            }
        }catch(Exception e) {
            e.printStackTrace();
        }
    }
}

 运行结果

  终端上进行Hadoop检测

 备注

问题1

用java操作hdfs的时候会报这个错误,大概是这个原因:因为在配置Hadoop时,fs.defaultFS值使用的是节点名称,所以ip要与master节点对应。"hdfs://100.65.0.2:9000"

以上是关于Java API实现HDFS有关目录与文件的相关功能的主要内容,如果未能解决你的问题,请参考以下文章

Java API操作Hdfs详细示例

Java API操作Hdfs详细示例

如何使用Java API读写HDFS

如何使用Java API访问HDFS为目录设置配额

HDFS文件系统的JAVA-API操作

如何使用Java API读写HDFS