使用javaAPI操作hdfs
Posted 郭子仪
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用javaAPI操作hdfs相关的知识,希望对你有一定的参考价值。
使用javaAPI操作hdfs
package com.zuoyan.hadoop; import java.io.FileOutputStream; import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class XPath { /** * 这里的创建文件夹同shell中的mkdir -p 语序前面的文件夹不存在 * 跟java中的IO操作一样,也只能对path对象做操作;但是这里的Path对象是hdfs中的 * @param fs * @return */ public boolean myCreatePath(FileSystem fs){ boolean b = false; Path path = new Path("/data/test/"); try { // even the path exist,it can also create the path. b = fs.mkdirs(path); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } return b; } /** * 删除文件,实际上删除的是给定path路径的最后一个 * 跟java中一样,也需要path对象,不过是hadoop.fs包中的。 * 实际上delete(Path p)已经过时了,更多使用delete(Path p,boolean recursive) * 后面的布尔值实际上是对文件的删除,相当于rm -r * @param fs * @return */ public boolean myDropHdfsPath(FileSystem fs){ boolean b = false; Path path = new Path("/data/test/hadoop-2.6.0.tar.gz"); try { b = fs.delete(path,b); } catch (IOException e) { e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } return b; } /** * 重命名文件夹 * @param hdfs * @return */ public boolean myRename(FileSystem hdfs){ boolean b = false; Path oldPath = new Path("/data/test1"); Path newPath = new Path("/data/test"); try { b = hdfs.rename(oldPath,newPath); } catch (IOException e) { e.printStackTrace(); }finally { try { hdfs.close(); } catch (IOException e) { e.printStackTrace(); } } return b; } /** * 遍历文件夹 * public FileStatus[] listStatus(Path p) * 通常使用HDFS文件系统的listStatus(path)来获取改定路径的子路径。然后逐个判断 * 值得注意的是: * 1.并不是总有文件夹中有文件,有些文件夹是空的,如果仅仅做是否为文件的判断会有问题,必须加文件的长度是否为0的判断 * 2.使用getPath()方法获取的是FileStatus对象是带URL路径的。使用FileStatus.getPath().toUri().getPath()获取的路径才是不带url的路径 * @param hdfs * @param listPath 传入的HDFS开始遍历的路径 * @return */ public Set<String> recursiveHdfsPath(FileSystem hdfs,Path listPath){ Set<String> set = new HashSet<String>(); FileStatus[] files = null; try { files = hdfs.listStatus(listPath); // 实际上并不是每个文件夹都会有文件的。 if(files.length == 0){ // 如果不使用toUri(),获取的路径带URL。 set.add(listPath.toUri().getPath()); }else { // 判断是否为文件 for (FileStatus f : files) { if (files.length == 0 || f.isFile()) { set.add(f.getPath().toUri().getPath()); } else { // 是文件夹,且非空,就继续遍历 recursiveHdfsPath(hdfs, f.getPath()); } } } } catch (IOException e) { e.printStackTrace(); } return set; } /** * 文件简单的判断 * 是否存在 * 是否是文件夹 * 是否是文件 * @param fs */ public void myCheck(FileSystem fs){ boolean isExists = false; boolean isDirectorys = false; boolean isFiles = false; Path path = new Path("/data/test"); try { isExists = fs.exists(path); isDirectorys = fs.isDirectory(path); isFiles = fs.isFile(path); } catch (IOException e){ e.printStackTrace(); } finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } if(!isExists){ System.out.println("lu jing not cun zai."); }else{ System.out.println("lu jing cun zai."); if(isDirectorys){ System.out.println("Directory"); }else if(isFiles){ System.out.println("Files"); } } } /** * 获取配置的所有信息-需要本地Hadoop环境 * 首先,我们要知道配置文件是哪一个 * 然后我们将获取的配置文件用迭代器接收 * 实际上配置中是KV对,我们可以通过java中的Entry来接收 * */ public void showAllConf(){ Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://huabingood01:9000"); Iterator<Map.Entry<String,String>> it = conf.iterator(); while(it.hasNext()){ Map.Entry<String,String> entry = it.next(); System.out.println(entry.getKey()+"=" +entry.getValue()); } } /** * 文件下载(需要本地Hadoop环境) * 注意下载的路径的最后一个地址是下载的文件名 * copyToLocalFile(Path local,Path hdfs) * 下载命令中的参数是没有任何布尔值的,如果添加了布尔是,意味着这是moveToLocalFile() * @param fs */ public void getFileFromHDFS(FileSystem fs){ String url = "/data/test/hadoop-2.6.0.tar.gz"; String url1 = "E:\hadoop-2.6.0.tar.gz"; Path HDFSPath = new Path(url); Path localPath = new Path(url1); try { fs.copyToLocalFile(HDFSPath,localPath); } catch (IOException e) { e.printStackTrace(); }finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 文件的上传至hdfs文件 * 注意事项同文件的上传 * 注意如果上传的路径不存在会自动创建 * 如果存在同名的文件,会覆盖 * @param fs */ public void myPutFile2HDFS(FileSystem fs){ boolean pathExists = false; // 如果上传的路径不存在会创建 // 如果该路径文件已存在,就会覆盖 String url = "E:\java\安装包\hadoop-all\hadoop-2.6.0.tar.gz"; Path localPath = new Path(url); Path hdfsPath = new Path("/data/test/hadoop-2.6.0-11.tar.gz"); try { fs.copyFromLocalFile(localPath,hdfsPath); } catch (IOException e) { e.printStackTrace(); }finally { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * hdfs文件下载到本地 * @param fs * @throws IOException */ public void getFileFromHDFS1(FileSystem fs) throws IOException{ String url = "/data/test/hadoop-2.6.0.tar.gz"; String url1 = "E:\hadoop-2.6.0.tar.gz"; FSDataInputStream fsDataInputStream=fs.open(new Path(url)); FileOutputStream fileOutputStream=new FileOutputStream(url1); IOUtils.copyBytes(fsDataInputStream,fileOutputStream,1024*1024*64,false); fsDataInputStream.close(); fileOutputStream.close(); } /** * hdfs之间文件的复制 * 使用FSDataInputStream来打开文件open(Path p) * 使用FSDataOutputStream开创建写到的路径create(Path p) * 使用 IOUtils.copyBytes(FSDataInputStream,FSDataOutputStream,int buffer,Boolean isClose)来进行具体的读写 * 说明: * 1.java中使用缓冲区来加速读取文件,这里也使用了缓冲区,但是只要指定缓冲区大小即可,不必单独设置一个新的数组来接受 * 2.最后一个布尔值表示是否使用完后关闭读写流。通常是false,如果不手动关会报错的 * @param hdfs * hdfs之间文件的复制 */ public void copyFileBetweenHDFS(FileSystem hdfs){ Path inPath = new Path("/data/test/hadoop-2.6.0.tar.gz"); Path outPath = new Path("/data/hadoop-2.6.0.tar.gz"); // byte[] ioBuffer = new byte[1024*1024*64]; // int len = 0; FSDataInputStream hdfsIn = null; FSDataOutputStream hdfsOut = null; try { hdfsIn = hdfs.open(inPath); hdfsOut = hdfs.create(outPath); IOUtils.copyBytes(hdfsIn,hdfsOut,1024*1024*64,false); /*while((len=hdfsIn.read(ioBuffer))!= -1){ IOUtils.copyBytes(hdfsIn,hdfsOut,len,true); }*/ } catch (IOException e) { e.printStackTrace(); }finally { try { hdfsOut.close(); hdfsIn.close(); } catch (IOException e) { e.printStackTrace(); } } } }
以上是关于使用javaAPI操作hdfs的主要内容,如果未能解决你的问题,请参考以下文章