使用 lucene 改进多线程索引

Posted

技术标签:

【中文标题】使用 lucene 改进多线程索引【英文标题】:Improve multi-thread indexing with lucene 【发布时间】:2012-03-08 05:55:11 【问题描述】:

我正在尝试使用多个线程在 Lucene 中构建我的索引。因此,我开始编写代码并编写了以下代码。首先,我找到文件,并为每个文件创建一个线程来索引它。之后我加入线程并优化索引。它有效,但我不确定......我可以大规模信任它吗?有什么办法可以改善吗?

import java.io.File;
import java.io.FileFilter;
import java.io.FileReader;
import java.io.IOException;
import java.io.File;
import java.io.FileReader;
import java.io.BufferedReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Document;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.StopAnalyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.apache.lucene.index.TermFreqVector;

public class mIndexer extends Thread 

    private File ifile;
    private static IndexWriter writer;

    public mIndexer(File f) 
    ifile = f.getAbsoluteFile();
    

    public static void main(String args[]) throws Exception 
    System.out.println("here...");

    String indexDir;
        String dataDir;
    if (args.length != 2) 
        dataDir = new String("/home/omid/Ranking/docs/");
        indexDir = new String("/home/omid/Ranking/indexes/");
    
    else 
        dataDir = args[0];
        indexDir = args[1];
    

    long start = System.currentTimeMillis();

    Directory dir = FSDirectory.open(new File(indexDir));
    writer = new IndexWriter(dir,
    new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")),
    true,
    IndexWriter.MaxFieldLength.UNLIMITED);
    int numIndexed = 0;
    try 
        numIndexed = index(dataDir, new TextFilesFilter());
     finally 
        long end = System.currentTimeMillis();
        System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds");
        writer.optimize();
        System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds");
        writer.close();
    
    System.out.println("Enjoy your day/night");
    

    public static int index(String dataDir, FileFilter filter) throws Exception 
    File[] dires = new File(dataDir).listFiles();
    for (File d: dires) 
        if (d.isDirectory()) 
        File[] files = new File(d.getAbsolutePath()).listFiles();
        for (File f: files) 
            if (!f.isDirectory() &&
            !f.isHidden() &&
            f.exists() &&
            f.canRead() &&
            (filter == null || filter.accept(f))) 
                Thread t = new mIndexer(f);
                t.start();
                t.join();
            
        
        
    
    return writer.numDocs();
    

    private static class TextFilesFilter implements FileFilter 
    public boolean accept(File path) 
        return path.getName().toLowerCase().endsWith(".txt");
    
    

    protected Document getDocument() throws Exception 
    Document doc = new Document();
    if (ifile.exists()) 
        doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES));
        doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED));
        String cat = "WIR";
        cat = ifile.getAbsolutePath().substring(0, ifile.getAbsolutePath().length()-ifile.getName().length()-1);
        cat = cat.substring(cat.lastIndexOf('/')+1, cat.length());
        //doc.add(new Field("category", cat.subSequence(0, cat.length()), Field.Store.YES));
        //System.out.println(cat.subSequence(0, cat.length()));
    
    return doc;
    

    public void run() 
    try 
        System.out.println("Indexing " + ifile.getAbsolutePath());
        Document doc = getDocument();
        writer.addDocument(doc);
     catch (Exception e) 
        System.out.println(e.toString());
    

    

任何 hep 都会被考虑。

【问题讨论】:

【参考方案1】:

如果你想并行化索引,你可以做两件事:

并行调用 addDocument, 增加合并调度程序的最大线程数。

您在并行化对 addDocuments 的调用方面是正确的,但是随着您需要索引的文档数量的增加,每个文档生成一个线程不会扩展。您应该使用固定大小的ThreadPoolExecutor。由于此任务主要是 CPU 密集型任务(取决于您的分析器和检索数据的方式),因此将计算机的 CPU 数量设置为最大线程数可能是一个好的开始。

关于合并调度程序,您可以增加setMaxThreadCount method of ConcurrentMergeScheduler 可以使用的最大线程数。请注意,磁盘在顺序读取/写入方面比随机读取/写入要好得多,因此为合并调度程序设置过高的最大线程数更有可能减慢索引速度而不是加快速度。

但在尝试并行化索引过程之前,您可能应该尝试找出瓶颈所在。如果您的磁盘太慢,瓶颈可能是刷新和合并步骤,因此并行调用 addDocument(主要包括分析文档并在内存中缓冲分析结果)不会提高索引速度完全没有。

一些旁注:

在 Lucene 的开发版本中正在进行一些工作,以提高索引并行性(尤其是刷新部分,blog entry 解释了它的工作原理)。

Lucene 在how to improve indexing speed 上有一个不错的 wiki 页面,您可以在其中找到提高索引速度的其他方法。

【讨论】:

非常感谢您的帮助。您对线程数的评论非常有用。我之前没提过……【参考方案2】:

我认为更现代的方法是使用ThreadPoolExecutor 并提交一个Runnable 来进行索引。您可以使用 .awaitTermination 或 CountdownLatch 等待所有线程终止。

我不喜欢让你的主类扩展 Thread,只需创建一个可运行的内部类,它在构造函数中获取其依赖关系。这使您的代码更具可读性,因为线程所做的工作与您的应用程序设置代码明显分开。

关于风格的几点说明,我不喜欢让你的主类抛出异常,这通常意味着你对你使用的代码可能抛出的不同检查异常情况没有清晰的概念。除非您有非常具体的原因,否则通常这不是正确的做法。

【讨论】:

提前谢谢您。实际上,我实现了 Runnable,这是一个好主意,并使用了 ThreadPoolExecutor,它解决了 jpountz 提到的程序中的一个真正的错误。 awaitTermination 的缺点是它不会等待所有线程完成,而是会在 n 个时间单位后退出。 :-( 循环是必要的。 同意,这将证明 IndexWriter 无法正常关闭。即使索引目录没有被索引编写器操作, writer_lock 仍然存在。

以上是关于使用 lucene 改进多线程索引的主要内容,如果未能解决你的问题,请参考以下文章

单例模式--饿汉懒汉多线程以及多线程下改进

delphi 线程教学第四节:多线程类的改进

*Android 多线程下载 仿下载助手(改进版)

EchoServer和EchoClient模型的改进1之多线程

Linux VM(重型多线程应用程序)的性能改进

64.文件载入内存进行多线程以及根据索引文件进行多线程索引