如何把hdfs上的多个目录下的文件合并为一个文件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何把hdfs上的多个目录下的文件合并为一个文件相关的知识,希望对你有一定的参考价值。

hdfs dfs -cat /folderpath/folder* | hdfs dfs -copyFromLocal - /newfolderpath/file
1
1
这样可以把文件hdfs上 /folderpath目录下的/folder开头的文件,还不合并到/newfolderpath目录下的file一个文件中 注意/folder*必须是文件,而不能是文件夹,如果是文件夹,可以/folder*/*

cat test.txt | ssh test@masternode "hadoop dfs -put - hadoopFoldername/"
1
1
可以这样把本机的文件put到HDFS上面,而不用先复制文件到集群机器上
参考技术A 版本 2.程序集 窗口程序集1.子程序 __启动窗口_创建完毕编辑框1.是否允许多行 = 真编辑框1.内容 = 到文本 (读入文件 (取运行目录 () + “\指定文件.txt”))本回答被提问者采纳

hadoop 将HDFS上多个小文件合并到SequenceFile里

背景:hdfs上的文件最好和hdfs的块大小的N倍。如果文件太小,浪费namnode的元数据存储空间以及内存,如果文件分块不合理也会影响mapreduce中map的效率。

本例中将小文件的文件名作为key,其内容作为value生成SequenceFile

1、生成文件

 //将目标目录的所有文件以文件名为key,内容为value放入SequenceFile中
    //第一个参数是需要打包的目录,第二个参数生成的文件路径和名称
    private static void combineToSequenceFile(String[] args) throws IOException {
        String sourceDir = args[0];
        String destFile = args[1];

        List<String> files = getFiles(sourceDir);

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path destPath = new Path(destFile);
        if (fs.exists(destPath)) {
            fs.delete(destPath, true);
        }

        FSDataInputStream in = null;

        Text key = new Text();
        BytesWritable value = new BytesWritable();

        byte[] buff = new byte[4096];
        SequenceFile.Writer writer = null;

        SequenceFile.Writer.Option option1 = SequenceFile.Writer.file(new Path(destFile));
        SequenceFile.Writer.Option option2 = SequenceFile.Writer.keyClass(key.getClass());
        SequenceFile.Writer.Option option3 = SequenceFile.Writer.valueClass(value.getClass());
        SequenceFile.Writer.Option option4 = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD);
        try {
            writer = SequenceFile.createWriter(conf, option1, option2, option3, option4);
            for (int i = 0; i < files.size(); i++) {
                Path path = new Path(files.get(i).toString());
                System.out.println("读取文件:" + path.toString());
                key = new Text(files.get(i).toString());
                in = fs.open(path);
//                只能处理小文件,int最大只能表示到1个G的大小,实际上大文件放入SequenceFile也没有意义
                int length = (int) fs.getFileStatus(path).getLen();
                byte[] bytes = new byte[length];
//                read最多只能读取65536的大小
                int readLength = in.read(buff);
                int offset = 0;
                while (readLength > 0) {
                    System.arraycopy(buff, 0, bytes, offset, readLength);
                    offset += readLength;
                    readLength = in.read(buff);
                }
                System.out.println("file length:" + length + ",read length:" + offset);
                value = new BytesWritable(bytes);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value.getLength());
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(writer);
            IOUtils.closeStream(fs);
        }

    }

查找文件:

    private static List<String> getFiles(String dir) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(dir);
        FileSystem fs = null;
        List<String> filelist = new ArrayList<>();
        try {
            fs = FileSystem.get(conf);

            //对单个文件或目录下所有文件和目录
            FileStatus[] fileStatuses = fs.listStatus(path);

            for (FileStatus fileStatus : fileStatuses) {
                //递归查找子目录
                if (fileStatus.isDirectory()) {
                    filelist.addAll(getFiles(fileStatus.getPath().toString()));
                } else {
                    filelist.add(fileStatus.getPath().toString());
                }
            }
            return filelist;
        } finally {
            IOUtils.closeStream(fs);
        }
    }

2、还原压缩的SequenceFile文件

    //将combineToSequenceFile生成的文件分解成原文件。
    private static void extractCombineSequenceFile(String[] args) throws IOException {
        String sourceFile = args[0];
//        String destdir = args[1];
        Configuration conf = new Configuration();
        Path sourcePath = new Path(sourceFile);

        SequenceFile.Reader reader = null;
        SequenceFile.Reader.Option option1 = SequenceFile.Reader.file(sourcePath);

        Writable key = null;
        Writable value = null;
//        Text key = null;
//        BytesWritable value = null;

        FileSystem fs = FileSystem.get(conf);
        try {
            reader = new SequenceFile.Reader(conf, option1);
            key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

            //在知道key和value的明确类型的情况下,可以直接用其类型
//            key = ReflectionUtils.newInstance(Text.class, conf);
//            value =  ReflectionUtils.newInstance(BytesWritable.class, conf);
            long position = reader.getPosition();
            while (reader.next(key, value)) {
                FSDataOutputStream out = fs.create(new Path(key.toString()), true);
                //文件头会多出4个字节,用来标识长度,而本例中原文件头是没有长度的,所以不能用这个方式写入流
//                value.write(out);
                out.write(((BytesWritable)value).getBytes(),0,((BytesWritable)value).getLength());

                //                out.write(value.getBytes(),0,value.getLength());
                System.out.printf("[%s]\t%s\t%s\n", position, key, out.getPos());
                out.close();
                position = reader.getPosition();
            }
        } finally {
            IOUtils.closeStream(reader);
            IOUtils.closeStream(fs);
        }
    }

 

以上是关于如何把hdfs上的多个目录下的文件合并为一个文件的主要内容,如果未能解决你的问题,请参考以下文章

hadoop 将HDFS上多个小文件合并到SequenceFile里

合并 HDFS 上的压缩文件

HDFS简单编程实例:文件合并

EXCEL如何把同一个目录下多个工作薄合并到一个工作薄

如何快速的?合并多个excel文件到一个excel里面?

如何将多个HDFS文件压缩为一个