HDFS与Lucene

Posted 山形依旧

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS与Lucene相关的知识,希望对你有一定的参考价值。

一.  hadoop自定义类型

Hadoop的自定制数据类型
一般有两个办法,一种较为简单的是针对值,另外一种更为完整的是对于键和值都适应的方法:
1、实现Writable接口:

/* DataInput and DataOutput 类是java.io的类 */
    public interface Writable {
        void readFields(DataInput in);
        void write(DataOutput out);
    }

下面是一个小例子:

 public class Point3D implement Writable {
        public float x, y, z;
        public Point3D(float fx, float fy, float fz) {
             this.x = fx;
             this.y = fy;
             this.z = fz;
        }
        public Point3D() {
            this(0.0f, 0.0f, 0.0f);
        }
        public void readFields(DataInput in) throws IOException {
            x = in.readFloat();
            y = in.readFloat();
            z = in.readFloat();
        }
        public void write(DataOutput out) throws IOException {
            out.writeFloat(x);
            out.writeFloat(y);
            out.writeFloat(z);
        }
        public String toString() {
            return Float.toString(x) + ", " + Float.toString(y) + ", " + Float.toString(z);
        }
    }

2、对于键来说,需要指定排序规则(呃,这句话可能有点C++风格?),对此,Java版Hadoop的办法是实现WritableComparable这个泛型接口,WritableComparable,顾名思义了,一半是Writable,一半是Comparable

  public interface WritableComparable<T> {
        public void readFields(DataInput in);
        public void write(DataOutput out);
        public int compareTo(T other);
    }

先给出下面的简单例子,再做说明和扩展。

public class Point3D inplements WritableComparable {
    public float x, y, z;
    public Point3D(float fx, float fy, float fz) {
         this.x = fx;
         this.y = fy;
         this.z = fz;
    }
    public Point3D() {
        this(0.0f, 0.0f, 0.0f);
    }
    public void readFields(DataInput in) throws IOException {
        x = in.readFloat();
        y = in.readFloat();
        z = in.readFloat();
    }
    public void write(DataOutput out) throws IOException {
        out.writeFloat(x);
        out.writeFloat(y);
        out.writeFloat(z);
    }
    public String toString() {
        return Float.toString(x) + ", " + Float.toString(y) + ", " + Float.toString(z);
    }
    public float distanceFromOrigin() {
        return (float) Math.sqrt( x*+ y*+z*z);
    }
    public int compareTo(Point3D other) {
        return Float.compareTo(distanceFromOrigin(),other.distanceFromOrigin());
    }
    public boolean equals(Object o) {
        if( !(instanceof Point3D)) {
            return false;
        }
        Point3D other = (Point3D) o;
        return this.x == o.x && this.y == o.y && this.z == o.z;
    }
    /* 实现 hashCode() 方法很重要
    * Hadoop的Partitioners会用到这个方法,后面再说
    */

    public int hashCode() {
         return Float.floatToIntBits(x) ^ Float.floatToIntBits(y) ^ Float.floatToIntBits(z);
    }
}


自定义Hadoop数据类型后,需要明确告诉Hadoop来使用它们。这是 JobConf 所能担当的了。使用setOutputKeyClass() / setOutputValueClass()方法即可:

void setOutputKeyClass(Class  theClass)
void setOutputValueClass(Class  theClass)

通常(默认条件下),这个函数对Map和Reduce阶段的输出都起到作用,当然也有专门的 setMapOutputKeyClass() / setReduceOutputKeyClass() 接口。

二.  HDFS上构建Lucene索引

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator; 
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
/*
 * 自定义的一种hadoop输出类型,存储的内容是一个Map<String,String>.
 */

public class HDFSDocument implements Writable{
    HashMap<String,String> fields = new HashMap<String, String>();
 
    public void setFields(HashMap<String,String> fields){
        this.fields = fields;
    }
    public HashMap<String,String> getFields(){
        return this.fields;
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        fields.clear();
 
        String key = null, value = null;
 
        int size = WritableUtils.readVInt(in);
        for (int i = 0; i < size; i ++){
                // 依次读取两个字符串,形成一个Map值
                key = in.readUTF();
                value = in.readUTF();
                fields.put(key,value);
            }
        }
    }
    @Override
    public void write(DataOutput out) throws IOException {
        String key = null, value = null;
 
        Iterator<String> iter = fields.keySet().iterator();
        while(iter.hasNext()){
            key = iter.next();
            value = fields.get(key);
             
            // 依次写入<Key,Value>两个字符串
            out.writeUTF(key);
            out.writeUTF(value);
        }
    }
}

三.  hadoop上利用lucene实现分布式索引

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.mapred.RecordWriter;

public class HDSDocumentOutput extends FileOutputFormat&lt;Text, HDFSDocument&gt;{

@Override
public RecordWriter&lt;Text, HDFSDocument&gt; getRecordWriter(
final FileSystem fs, JobConf job, String name, final Progressable progress)
throws IOException {
// LuceneWriter是包含Lucene的IndexWriter对象的类
final LuceneWriter lw = new LuceneWriter();
// 完成索引前的配置工作
lw.open(job, name);

return new RecordWriter&lt;Text, HDFSDocument&gt;(){

@Override
public void close(final Reporter reporter) throws IOException {
// 完成索引优化,关闭IndexWriter的对象
lw.close();
}

@Override
public void write(Text arg0, HDFSDocument doc) throws IOException {
// 建立索引
lw.write(doc);
}
};
}
}

LuceneWriter类接受HDFSDocument类的对象,从中读取信息,完成建立索引和优化的操作。LuceneWriter类的代码如下:

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Random;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogDocMergePolicy;
import org.apache.lucene.index.LogMergePolicy;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

public class LuceneWriter {

private Path perm;
private Path temp;
private FileSystem fs;
private IndexWriter writer;

public void open(JobConf job, String name) throws IOException{
this.fs = FileSystem.get(job);
perm = new Path(FileOutputFormat.getOutputPath(job), name);

// 临时本地路径,存储临时的索引结果
temp = job.getLocalPath("index/_" + Integer.toString(new Random().nextInt()));
fs.delete(perm, true);

// 配置IndexWriter(个人对Lucene索引的参数不是太熟悉)
LimitTokenCountAnalyzer ltca = new LimitTokenCountAnalyzer(new StandardAnalyzer(Version.LUCENE_34),
Integer.MAX_VALUE);
IndexWriterConfig conf = new IndexWriterConfig(Version.LUCENE_34, ltca);
conf.setMaxBufferedDocs(100000);
LogMergePolicy mergePolicy = new LogDocMergePolicy();
mergePolicy.setMergeFactor(100000);
mergePolicy.setMaxMergeDocs(100000);
conf.setMergePolicy(mergePolicy);
conf.setRAMBufferSizeMB(256);
conf.setMergePolicy(mergePolicy);

writer = new IndexWriter(FSDirectory.open(new File(fs.startLocalOutput(perm, temp).toString())),
conf);
}
public void close() throws IOException{
// 索引优化和IndexWriter对象关闭
writer.optimize();
writer.close();

// 将本地索引结果拷贝到HDFS上
fs.completeLocalOutput(perm, temp);
fs.createNewFile(new Path(perm,"index.done"));
}

/*
* 接受HDFSDocument对象,从中读取信息并建立索引
*/

public void write(HDFSDocument doc) throws IOException{

String key = null;
HashMap&lt;String, String&gt; fields = doc.getFields();
Iterator iter = fields.keySet().iterator();
while(iter.hasNext()){
key = iter.next();

Document luceneDoc = new Document();

// 如果使用Field.Index.ANALYZED选项,则默认情况下会对中文进行分词。
// 如果这时候采用Term的形式进行检索,将会出现检索失败的情况。
luceneDoc.add(new Field("key", key, Field.Store.YESField.Index.NOT_ANALYZED));
luceneDoc.add(new Field("value", fields.get(key)Field.Store.YESField.Index.NOT_ANALYZED));
writer.addDocument(luceneDoc);
}
}
}

最后,需要设置任务的输出格式,代码如下:

job.setOutputValueClass(HDFSDocument.class);
job.setOutputFormat(HDSDocumentOutput.class);

到此,基于Lucene的HDFS分布式索引构建完成。可以看出,这种建立索引方式,是先在本地建立索引,然后再拷贝到HDFS上的。

三.  hadoop上利用lucene实现分布式检索

import java.io.IOException;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;

public class LuceneHDFSSearcher {
    private IndexReader reader;
    private IndexSearcher searcher;
    public LuceneHDFSSearcher(FsDirectory dir) throws CorruptIndexException, IOException{
    reader = IndexReader.open(dir);
    searcher = new IndexSearcher(reader);
}

public Document search(String value) throws CorruptIndexException, IOException{
    Query query = null;
    Term term = new Term("key",value);
    query = new TermQuery(term);

    Document doc = null;
    TopDocs hits = searcher.search(query,1);
    for(ScoreDoc scoreDoc:hits.scoreDocs){
        doc = searcher.doc(scoreDoc.doc);
    }
    return doc;
}
public void close() throws IOException{
// 关闭对象
    searcher.close();
    reader.close();
}
}

可以看出,该检索方法和单机进行Lucene检索的方法是基本一样的,唯一一个不同的地方是:单机检索时,使用的是FSDirectory类来构建IndexReader,而这里则使用FsDirectory类。这个类封装了原有类的方法,并支持HDFS。Nutch中使用的就是FsDirectory类,可以从Nutch的源码中获得该类的源码,直接拷贝过来使用即可。

Nutch1.0里的FsDirectory.java源代码

   import java.io;
   import java.util.Random;   
    import org.apache.lucene.store;
    import org.apache.nutch.util.HadoopFSUtil;
    import org.apache.hadoop.fs;
   import org.apache.hadoop.conf.Configuration;
    /** Reads a Lucene index stored in DFS. */
   public class FsDirectory extends Directory {
    private FileSystem fs;
     private Path directory;
   private int ioFileBufferSize;
   public FsDirectory(FileSystem fs, Path directory, boolean create, Configuration conf)  throws IOException {
   this.fs = fs;
    this.directory = directory;
    this.ioFileBufferSize = conf.getInt("io.file.buffer.size"4096);    
    if (create) {
         create();
       } 
     if (!fs.getFileStatus(directory).isDir())
        throw new IOException(directory + " not a directory");
     }
        private void create() throws IOException {
     if (!fs.exists(directory)) {
          fs.mkdirs(directory);
       }
   
    if (!fs.getFileStatus(directory).isDir())
     throw new IOException(directory + " not a directory"); 
      // clear old files
     FileStatus[] fstats = fs.listStatus(directory, HadoopFSUtil.getPassAllFilter());
     Path[] files = HadoopFSUtil.getPaths(fstats);
     for (int i = 0; i < files.length; i++) {
      if (!fs.delete(files[i]false))
       throw new IOException("Cannot delete " + files[i]);
     }
     }  
   public String[] list() throws IOException {
  FileStatus[] fstats = fs.listStatus(directory, HadoopFSUtil.getPassAllFilter());
      Path[] files = HadoopFSUtil.getPaths(fstats);
       if (files == null) return null;
    
        String[] result = new String[files.length];
       for (int i = 0; i < files.length; i++) {
         result[i] = files[i].getName();
        }
         return result;
   }
   
    public boolean fileExists(String name) throws IOException {
      return fs.exists(new Path(directory, name));
      } 
      public long fileModified(String name) {
       throw new UnsupportedOperationException();
     }  
      public void touchFile(String name) {
      throw new UnsupportedOperationException();
     }  
      public long fileLength(String name) throws IOException {
        return fs.getFileStatus(new Path(directory, name)).getLen();
     }
  
    public void deleteFile(String name) throws IOException {
     if (!fs.delete(new Path(directory, name)false))
          throw new IOException("Cannot delete " + name);
      }
     public void renameFile(String from, String to) throws IOException {
       // DFS is currently broken when target already exists,
        // so we explicitly delete the target first.
        Path target = new Path(directory, to);
        if (fs.exists(target)) {
         fs.delete(target, false);
        }
        fs.rename(new Path(directory, from), target);
      }
    
      public IndexOutput createOutput(String name) throws IOException {
        Path file = new Path(directory, name);
       if (fs.exists(file) && !fs.delete(file, false))      // delete existing, if any
         throw new IOException("Cannot overwrite: " + file);
    
      return new DfsIndexOutput(file, this.ioFileBufferSize);
      }  
       public IndexInput openInput(String name) throws IOException {
      return new DfsIndexInput(new Path(directory, name)this.ioFileBufferSize);
       }  
       public Lock makeLock(final String name) {
       return new Lock() {
          public boolean obtain() {
          return true;
          }
          public void release() {
         }
         public boolean isLocked() {
            throw new UnsupportedOperationException();
          }
          public String toString() {
            return "Lock@" + new Path(directory, name);
          }
       };
    }
     public synchronized void close() throws IOException {
     fs.close();
     }
    public String toString() {
        return this.getClass().getName() + "@" + directory;
   }   
  private class DfsIndexInput extends BufferedIndexInput {
    /** Shared by clones. */
    private class Descriptor {
     public FSDataInputStream in;
      public long position;                       // cache of in.getPos()
         public Descriptor(Path file, int ioFileBufferSize) throws IOException {
          this.in = fs.open(file);
           }
         }
    
        private final Descriptor descriptor;
       private final long length;
       private boolean isClone;
    
       public DfsIndexInput(Path path, int ioFileBufferSize) throws IOException {
          descriptor = new Descriptor(path,ioFileBufferSize);
          length = fs.getFileStatus(path).getLen();
       }
    
        protected void readInternal(byte[] b, int offset, int len)
           throws IOException {
          synchronized (descriptor) {
             long position = getFilePointer();
           if (position != descriptor.position) {
           descriptor.in.seek(position);
              descriptor.position = position;
          }
             int total = 0;
           do {
              int i = descriptor.in.read(b, offset+total, len-total);
             if (== -1)
               throw new IOException("read past EOF");
              descriptor.position += i;
             total += i;
            } while (total < len);
          }
         }
    
         public void close() throws IOException {
          if (!isClone) {
             descriptor.in.close();
          }
         }
   
    protected void seekInternal(long position) {} // handled in readInternal()
  
         public long length() {
          return length;
       }
     
        protected void finalize() throws IOException {
          close();                                      // close the file
        }
     
        public Object clone() {
          DfsIndexInput clone = (DfsIndexInput)super.clone();
          clone.isClone = true;
           return clone;
         }
     }
   
     private class DfsIndexOutput extends BufferedIndexOutput {
        private FSDataOutputStream out;
        private RandomAccessFile local;
        private File localFile;
    public DfsIndexOutput(Path path, int ioFileBufferSize) throws IOException {
          
          // create a temporary local file and set it to delete on exit
         String randStr = Integer.toString(new Random().nextInt(Integer.MAX_VALUE));
          localFile = File.createTempFile("index_" + randStr, ".tmp");
           localFile.deleteOnExit();
           local = new RandomAccessFile(localFile, "rw");
    
          out = fs.create(path);
        }
     
        public void flushBuffer(byte[] b, int offset, int size) throws IOException {
           local.write(b, offset, size);
         }
     
       public void close() throws IOException {
       super.close();
          
           // transfer to dfs from local
           byte[] buffer = new byte[4096];
           local.seek(0);
           int read = -1;
           while ((read = local.read(buffer)) != -1) {
             out.write(buffer, 0, read);
           }
           out.close();
           local.close();
       }
    
        public void seek(long pos) throws IOException {
           super.seek(pos);
           local.seek(pos);
         }
     
         public long length() throws IOException {
          return local.length();
        }  
      }
    }







以上是关于HDFS与Lucene的主要内容,如果未能解决你的问题,请参考以下文章

lucene的介绍与优化

HDFS原理与实操

HDFS 实践 | 快手 EB 级 HDFS 挑战与实践

快手EB级HDFS挑战与实践

HDFS Committer:HDFS的发展与挑战

HDFS原理 | 一文读懂HDFS架构与设计