如何使用Java API读写HDFS
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Java API读写HDFS相关的知识,希望对你有一定的参考价值。
Java API读写HDFSpublic class FSOptr
/**
* @param args
*/
public static void main(String[] args) throws Exception
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
// 创建文件目录
private static void makeDir(Configuration conf) throws Exception
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fs.mkdirs(dir);// 创建文件夹
System.out.println("make dir :" + result);
// 创建文件,并写入内容
Path dst = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!".getBytes();
FSDataOutputStream outputStream = fs.create(dst);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(dst);
for (FileStatus file : files)
System.out.println(file.getPath());
fs.close();
// 重命名文件
private static void rename(Configuration conf) throws Exception
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
Path newName = new Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files)
System.out.println(file.getPath());
fs.close();
// 删除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fs.isDirectory(path))
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files)
fs.delete(file.getPath());
else
fs.delete(path);
// 或者
fs.delete(path, true);
fs.close();
/**
* 下载,将hdfs文件下载到本地磁盘
*
* @param localSrc1
* 本地的文件地址,即文件的路径
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1)
Configuration conf = new Configuration();
FileSystem fs = null;
try
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.copyToLocalFile(hdfs_path, local_path);
return true;
catch (IOException e)
e.printStackTrace();
return false;
/**
* 上传,将本地文件copy到hdfs系统中
*
* @param localSrc
* 本地的文件地址,即文件的路径
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc)
InputStream in;
try
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置对象
FileSystem fs; // 文件系统
try
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 输出流,创建一个输出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable()
// 重写progress方法
public void progress()
// System.out.println("上传完一个设定缓存区大小容量的文件!");
);
// 连接两个流,形成通道,使输入流向输出流传输数据,
IOUtils.copyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流
return true;
catch (IOException e)
e.printStackTrace();
catch (FileNotFoundException e)
e.printStackTrace();
return false;
/**
* 移动
*
* @param old_st原来存放的路径
* @param new_st移动到的路径
*/
public boolean moveFileName(String old_st, String new_st)
try
// 下载到服务器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");
Configuration conf = new Configuration();
FileSystem fs = null;
// 删除源文件
try
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
catch (IOException e)
e.printStackTrace();
// 从服务器本地传到新路径
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);
if (down_flag && uplod_flag)
return true;
catch (Exception e)
e.printStackTrace();
return false;
// copy本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception
FileSystem fs = FileSystem.get(conf);
Path src = new Path("/home/hadoop/word.txt");
Path dst = new Path("/user/hadoop/data/");
fs.copyFromLocalFile(src, dst);
fs.close();
// 获取给定目录下的所有子目录以及子文件
private static void getAllChildFile(Configuration conf) throws Exception
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
private static void getFile(Path path, FileSystem fs)throws Exception
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i < fileStatus.length; i++)
if (fileStatus[i].isDir())
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
else
System.out.println(fileStatus[i].getPath().toString());
//判断文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
//获取hdfs集群所有主机结点数据
private static void getAllClusterNodeInfo(Configuration conf)throws Exception
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println("list of all the nodes in HDFS cluster:"); //print info
for(int i=0; i < dataNodeStats.length; i++)
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i < blkCount; i++)
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(hosts);
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
参考技术A HDFS是Hadoop生态系统的根基,也是Hadoop生态系统中的重要一员,大部分时候,我们都会使用Linuxshell命令来管理HDFS,包括一些文件的创建,删除,修改,上传等等,因为使用shell命令操作HDFS的方式,相对比较简单,方便,但是有时候,我们也需要通过编程的方式来实现对文件系统的管理。比如有如下的一个小需求,要求我们实现读取HDFS某个文件夹下所有日志,经过加工处理后在写入到HDFS上,或者存进Hbase里,或者存进其他一些存储系统。这时候使用shell的方式就有点麻烦了,所以这时候我们就可以使用编程的方式来完成这件事了,当然散仙在这里使用的是原生的Java语言的方式,其他的一些语言例如C++,php,Python都可以实现,散仙在这里不给出演示了,(其实散仙也不会那些语言,除了刚入门的Python)。下面,散仙给出代码,以供参考:viewsourceprint?packagecom.java.api.hdfs;importjava.io.BufferedReader;importjava.io.IOException;importjava.io.InputStream;importjava.io.InputStreamReaderimportorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FileStatus;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;/***@author三劫散仙*JavaAPI操作HDFS*工具类****/publicclassOperaHDFSpublicstaticvoidmain(String[]args)throwsException//System.out.println("aaa");//uploadFile();//createFileOnHDFS();//deleteFileOnHDFS();//createDirectoryOnHDFS();//deleteDirectoryOnHDFS();//renameFileOrDirectoryOnHDFS();readHDFSListAll();/****加载配置文件***/staticConfigurationconf=newConfiguration();/***重名名一个文件夹或者文件publicstaticvoidrenameFileOrDirectoryOnHDFS()throwsExceptionFileSystemfs=FileSystem.get(conf);Pathp1=newPath("hdfs://10.2.143.5:9090/root/myfile/my.txt");fs.rename(p1,p2);System.out.println("重命名文件夹或文件成功..");/*****读取HDFS某个文件夹的所有*文件,并打印****/publicstaticvoidreadHDFSListAll()throwsException//流读入和写入InputStreamin=null;//获取HDFS的conf//读取HDFS上的文件系统FileSystemhdfs=FileSystem.get(conf);//使用缓冲流,进行按行读取的功能BufferedReaderbuff=null;//获取日志文件的根目录Pathlistf=newPath("hdfs://10.2.143.5:9090/root/myfile/");//获取根目录下的所有2级子文件目录FileStatusstats[]=hdfs.listStatus(listf);//自定义j,方便查看插入信息intj=0;for(inti=0;i 参考技术B 您好,很高兴能帮助您, 要是每个块都占用整个块的空间 那么hdfs需要多少存储空间啊 hdfs的块只是逻辑概念 我是这么理解的 你的采纳是我前进的动力,还有不懂的地方,请你继续“追问”! 如你还有别的问题,可另外向我求助;
以上是关于如何使用Java API读写HDFS的主要内容,如果未能解决你的问题,请参考以下文章