从文件中读取,并归并排序

Posted lhever_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从文件中读取,并归并排序相关的知识,希望对你有一定的参考价值。

如果要排序的数据太多了,就不能一次性加载到内存中进行排序,只能分而治之,然后再合并。
从第一次接触算法开始,本人就写过归并排序的算法。 只不过当年写的归并排序算法,数据是可以一次性加载到内存中完成排序的。
甚至在此后的许多年,工作中始终没有遇到过内存装不下但又需要排序的场景
不过最近闲来无事,所以想弥补一下大数据量场景下的排序经历,遂得此文。
本文已经完整的实现了归并排序的整个过程,经过自测发现,效率尚有待提升(主要是花在磁盘IO上的时间太多)。
另外, 由于没有封装, 需要依次运行如下几个类,才能走完整个归并流程。由于涉及的类有点多,所以在晾代码前依次解释一下各个类的作用:
1. TestFileGenerator.java 生成测试文件,该文件体积较大,该文件的每一行是一个32位的整数。稍后将会对文件中的数字进行排序
2. FileSpliter.java 该类可以将上一个类(TestFileGenerator.java)生成的大文件拆分成多个小文件。各个小文件的内容总和与大文件一致
3. FileSorter.java 读取每一个小文件中的所有整数, 并进行升序排序,然后再将排序后的数字重新写回文件
4. IndexMinPQ.java 优先队列的一个实现类,该类每次取出队列中的最小元素,该类支持将每个元素关联一个索引。 该类是支撑归并排序的核心类
5. LineReader.java 每调用一次该类的readLine()方法,可以从文件中读取一行
6. FileMergeSort.java 该类归并每一个小文件中的数值,最终得到所有数字升序排序的大文件(包含小文件中的所有数字,并且是升序排序的)

TEST.java
运行该类可以完整的走一遍 归并排序的流程,

运行代码之前建议事先在E盘建立目录E:/java_wps/,或者修改代码中磁盘路径,该示例是在windows中运行的,读者也可以改为linux路径再运行


public class Test 

    public static void main(String[] args) throws IOException 

        String originalFile = "E:/java_wps/test.txt";

        //生成1000个整数,保存到文件E:/java_wps/test.txt中
        TestFileGenerator.generateFile(originalFile, 1000);

        //每100个数字拆分到一个单独的文件中
        FileSpliter fs = new FileSpliter(new File(originalFile), new FileSpliter.WriteListener(100));
        fs.readLines();

        //找出上一个步骤生成的小文件
        File[] files = FileSorter.find("E:/java_wps");
        //将每一个小文件进行单独的排序
        Arrays.stream(files).forEach(f -> FileSorter.doSort(f));

        //归并排序每一个小文件中的内容,得到最终有序的大文件E:/java_wps/merge.txt
        FileMergeSort.mergeFiles(files, "E:/java_wps/merge.txt");


    


TestFileGenerator .java
运行该类可以生成测试文件,比如运行方法generateFile(“E:/java_wps/test.txt”, 1000);可以将1000个数字写入文件E:/java_wps/test.txt

import org.apache.commons.io.IOUtils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;

public class TestFileGenerator 

    public static void generateFile(String filePath, long lineSize) throws IOException 

        File file = new File(filePath);
        if (file.exists()) 
            file.delete();
        
        FileOutputStream fos = new FileOutputStream(file, true);
        try 
            Random random = new Random(System.currentTimeMillis() +
                    new Random(System.currentTimeMillis() / 17).nextInt(100000));

            StringBuilder builder = new StringBuilder();
            String LF = "\\n";
            for(long i = 1; i <= lineSize; i++) 
                builder.setLength(0);
                builder.append(random.nextInt(Integer.MAX_VALUE));
                if (i < lineSize) 
                    builder.append(LF);
                
                IOUtils.write(builder.toString(), fos, "UTF-8");
            

         finally 
            IOUtils.closeQuietly(fos);
        

    

    public static void main(String[] args) throws IOException 
        TestFileGenerator.generateFile("E:/java_wps/test.txt", 1000);
    




FileSpliter .java
该类负责拆分文件,拆分的文件名称是sub-i.txt, i是一个序号,从1开始,一个大文件被拆分成3个,则3个子文件依次命名为sub-1.txt, sub-2.txt, sub-3.txt

import org.apache.commons.io.IOUtils;

import java.io.*;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.commons.io.IOUtils.EOF;

public class FileSpliter 

    private static final String RAF_MODE = "r";
    private final byte[] inbuf = new byte[8192];
    private final RandomAccessFile reader;
    private final Charset charset = Charset.forName("UTF-8");
    private final WriteListener listener;


    public FileSpliter(File file, WriteListener listener) throws IOException 
        this.reader = new RandomAccessFile(file, RAF_MODE);
        this.reader.seek(0L);
        this.listener = listener;
    


    public long readLines() throws IOException 
        try (ByteArrayOutputStream lineBuf = new ByteArrayOutputStream(64)) 
            long pos = reader.getFilePointer();
            long rePos = pos; // position to re-read
            int num;
            boolean seenCR = false;
            while ((num = reader.read(inbuf)) != EOF) 
                for (int i = 0; i < num; i++) 
                    final byte ch = inbuf[i];
                    switch (ch) 
                        case '\\n':
                            seenCR = false; // swallow CR before LF
                            listener.handle(new String(lineBuf.toByteArray(), charset));
                            lineBuf.reset();
                            rePos = pos + i + 1;
                            break;
                        case '\\r':
                            if (seenCR) 
                                lineBuf.write('\\r');
                            
                            seenCR = true;
                            break;
                        default:
                            if (seenCR) 
                                seenCR = false; // swallow final CR
                                listener.handle(new String(lineBuf.toByteArray(), charset));
                                lineBuf.reset();
                                rePos = pos + i + 1;
                            
                            lineBuf.write(ch);
                    
                
                pos = reader.getFilePointer();
            

            reader.seek(rePos); // Ensure we can re-read if necessary


            byte[] bytes = lineBuf.toByteArray();
            if (bytes.length > 0) 
                listener.handle(new String(bytes, charset));
            

            listener.write();

            return rePos;
        
    


    public static class WriteListener 

        public WriteListener(int threshold) 
            this.threshold = threshold;
        

        int threshold = 1000000;
        List<String> lines = new ArrayList<>();
        AtomicInteger atomicInteger = new AtomicInteger(1);


        public void handle(final String line) throws IOException 
            if (line != null) 
                lines.add(line);
            
            if (lines.size() >= threshold) 
                write();
                lines.clear();
            
        

        public void write() throws IOException 
            if (lines.size() <= 0) 
                return;
            
            FileOutputStream fos = null;
            try 
                File file = new File("E:/java_wps/" + "sub-" + atomicInteger.getAndAdd(1) + ".txt");
                fos = new FileOutputStream(file, true);
                IOUtils.writeLines(lines, null, fos, "UTF-8");
             finally 
                IOUtils.closeQuietly(fos);
            
        

    


    public static void main(String[] args) throws IOException 
        File file = new File("E:/java_wps/test.txt");
        //每100行拆分到一个单独的文件中
        FileSpliter fs = new FileSpliter(file, new WriteListener(100));
        fs.readLines();
    





FileSorter .java
读取每一个小文件中的整数, 并使用jdk自带算法(Arrays.sort)进行升序排序,然后再将排序后的数字重新写回文件

import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;

import java.io.*;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FileSorter 


    public static void main(String[] args) throws IOException 
        File[] files = find("E:/java_wps");
        Arrays.stream(files).forEach(f -> doSort(f));
    


    public static File[] find(String path) 
        File dir = new File(path);

        File[] files = dir.listFiles(new FileFilter() 
            @Override
            public boolean accept(File dir) 
                return dir.isFile() && dir.getName().startsWith("sub-") && dir.getName().endsWith(".txt");
            
        );

        return files;
    

    public static void writeLines(final int[] lines, String lineEnding, final OutputStream output,
                                  final Charset charset) throws IOException 
        if (lines == null) 
            return;
        
        if (lineEnding == null) 
            lineEnding = System.lineSeparator();
        
        final Charset cs = Charsets.toCharset(charset);
        for (final Object line : lines) 
            if (line != null) 
                output.write(line.toString().getBytes(cs));
            
            output.write(lineEnding.getBytes(cs));
        
    

    public static void doSort(File file) 
        try 
            sort(file);
         catch (IOException e) 
            e.printStackTrace();
        
    

    private static List<Integer> readFile(File file) throws IOException 
        List<String> strings = null;
        try(FileInputStream is = new FileInputStream(file)) 
            strings = IOUtils.readLines(is, "UTF-8");
        

        List<Integer> integers = new ArrayList<>();
        for (String item : strings) 
            if (item == null) 
                continue;
            
            item = item.trim();
            if (item.length() == 0) 
                continue;
            
            try 
                int i = Integer.parseInt(item);
                integers.add(i);
             catch (NumberFormatException e) 
            
        
        return integers;


    

    private static void sort(File file) throws IOException 
        List<Integer> integers = readFile(file);
        int[] ts = new int[integers.size()];
        int idx = 0;
        while (integers.size() > 0) 
            Integer remove = integers.remove(0);
            ts[idx++] = remove;
        

        Arrays.sort(ts);

        try(FileOutputStream fos = new FileOutputStream(file)) 
            writeLines(ts, null, fos, Charset.forName("UTF-8"));
        
    





IndexMinPQ.java
优先队列实现,该队列中的每一个元素可以关联一个整数索引

import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 *  The <tt>IndexMinPQ</tt> class represents an indexed priority queue of generic keys.
 *  It supports the usual <em>insert</em> and <em>delete-the-minimum</em>
 *  operations, along with <em>delete</em> and <em>change-the-key</em> 
 *  methods. In order to let the client refer to keys on the priority queue,
 *  an integer between 0 and maxN-1 is associated with each key&mdash;the client
 *  uses this integer to specify which key to delete or change.
 *  It also supports methods for peeking at the minimum key,
 *  testing if the priority queue is empty, and iterating through
 *  the keys.
 *  <p>
 *  This implementation uses a binary heap along with an array to associate
 *  keys with integers in the given range.
 *  The <em>insert</em>, <em>delete-the-minimum</em>, <em>delete</em>,
 *  <em>change-key</em>, <em>decrease-key</em>, and <em>increase-key</em>
 *  operations tak

以上是关于从文件中读取,并归并排序的主要内容,如果未能解决你的问题,请参考以下文章

从文件中读取,并归并排序

从文件中读取,并归并排序

算法:多路归并的外排序

数据结构-排序外部排序

[ 数据结构 -- 手撕排序算法第六篇 ] 归并排序(下)-- 非递归方法实现

PG归并排序算法详解