当前位置: 代码迷 >> 综合 >> Lucene4.10使用教程(十一):Lucene的近实时搜索
  详细解决方案

Lucene4.10使用教程(十一):Lucene的近实时搜索

热度:41   发布时间:2024-01-21 00:30:38.0

近实时搜索(Near real time search)表示当索引数据发生变化时,Lucene能及时把这种变化反映到客户进行查询的结果中。

实现原理:
只有Index Writer上的commit操作才会导致ram directory上的数据完全同步到文件。Index Writer提供了实时获得reader的API,这个调用将导致flush操作,生成新的segment,但不会commit(fsync),从而减少 了IO。新的segment被加入到新生成的

reader里。从返回的reader里,可以看到更新。所以,只要每次新的搜索都从IndexWriter获得一个新的reader,就可以搜索到最新的内容。这一操作的开销仅仅是flush,相对commit来说,开销很小。

Lucene的index组织方式为一个index目录下的多个segment。新的doc会加入新的segment里,这些新的小segment每隔一段时间就合并起来。因为合并,总的segment数

量保持的较小,总体search速度仍然很快。为了防止读写冲突,lucene只创建新的segment,并在任何active的reader不在使用后删除掉老的segment。

flush是把数据写入到操作系统的缓冲区,只要缓冲区不满,就不会有硬盘操作,相对轻量级操作。

commit是把所有内存缓冲区的数据写入到硬盘,是完全的硬盘操作,重量级操作。

package com.johnny.lucene05.lucene_plugin.nrt;

import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TrackingIndexWriter;
import org.apache.lucene.search.ControlledRealTimeReopenThread;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

/**
 * NRT(near real time)近实时搜索测试
 * @author Johnny
 *
 */
public class NearRealTimeSearch {
    private static Version Lucene_Version = Version.LUCENE_4_10_2;
    private String[] ids = {"1","2","3","4","5","6"};
    private String[] emails = {"aa@itat.org","bb@itat.org","cc@cc.org","dd@sina.org","ee@zttc.edu","ff@itat.org"};
    private String[] contents = {
            "welcome to visited the space,I like book",
            "hello boy, I like pingpeng ball",
            "my name is cc I like game",
            "I like football",
            "I like football and I like basketball too",
            "I like movie and swim"
    };
    private Date[] dates = null;
    private int[] attachs = {2,3,1,4,5,5};
    private String[] names = {"zhangsan","lisi","john","jetty","mike","jake"};
    private Directory directory = null;
    IndexWriter writer = null;
    
    /**nrt init**/
    private TrackingIndexWriter trackingIndexWriter = null;
    private ReferenceManager<IndexSearcher> reMgr = null;//类似于Lucene3.x中的NrtManager
    private ControlledRealTimeReopenThread<IndexSearcher> crt = null;
    
    private void setDates() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        try {
            dates = new Date[ids.length];
            dates[0] = sdf.parse("2010-02-19");
            dates[1] = sdf.parse("2012-01-11");
            dates[2] = sdf.parse("2011-09-19");
            dates[3] = sdf.parse("2010-12-22");
            dates[4] = sdf.parse("2012-01-01");
            dates[5] = sdf.parse("2011-05-19");
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
    //数据初始化
    public NearRealTimeSearch(){
        setDates();
        
        try{
            directory = FSDirectory.open(new File("/Users/ChinaMWorld/Desktop/index/"));
            writer = new IndexWriter(directory, new IndexWriterConfig(Lucene_Version, new StandardAnalyzer()));
            
            trackingIndexWriter = new TrackingIndexWriter(writer);
            reMgr = new SearcherManager(writer, true, new SearcherFactory());
            /**在0.025s~5.0s之间重启一次线程,这个是时间的最佳实践**/
            crt = new ControlledRealTimeReopenThread<>(trackingIndexWriter, reMgr, 5.0,0.025);
            crt.setDaemon(true);//设置为后台服务
            crt.setName("Index update to disk");//线程名称
            crt.start();//线程启动
            
        }catch(Exception e){
            e.printStackTrace();
        }
        
    }
    /**
     * 定期提交内存中得索引到硬盘上,防止丢失
     */
    public void commit(){
        try {
            writer.commit();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**建立索引
     * 要实现search nrt,需要使用TrackIndexWriter保存document,同时Writer也不需要关闭。
     *
     * **/
    public void index(boolean isNew) {
        if(isNew){
            try {
                Document doc = null;
                for(int i=0;i<ids.length;i++) {
                    doc = new Document();
                    doc.add(new StringField("id",ids[i],Store.YES));
                    doc.add(new StringField("email", emails[i],Store.YES));
                    doc.add(new TextField("content", contents[i], Store.NO));
                    doc.add(new StringField("name",names[i], Store.YES));
                    //存储数字
                    doc.add(new IntField("attach",attachs[i], Store.YES));
                    //存储日期
                    doc.add(new LongField("date", dates[i].getTime(), Store.YES));
                    //使用trackingIndexWriter创建document
                    trackingIndexWriter.addDocument(doc);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally{
                commit();//首次创建,提交索引,只有提交后,才会在索引片段中也将信息改变
            }    
        }
    }
    
    
    /***查询**/
    public void query() {
        IndexSearcher is = getSearcher();
        
        try {
            //通过reader可以有效的获取到文档的数量
            System.out.println("numDocs:"+is.getIndexReader().numDocs());
            System.out.println("maxDocs:"+is.getIndexReader().maxDoc());
            System.out.println("deleteDocs:"+is.getIndexReader().numDeletedDocs());
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            try {
                reMgr.release(is);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    /**删除
     * 使用trackIndexWriter进行数据删除,也不需要关闭Writer
     * **/
    public void delete(){
        try{
            trackingIndexWriter.deleteDocuments(new Term("id","2"));
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    
    
    /**修改
     * 使用trackIndexWriter进行修改,不需要关闭writer
     * **/
    public void update(){
        try{
            Document doc = new Document();
            /*
             * Lucene并没有提供更新,这里的更新操作其实是如下两个操作的合集
             * 先删除之后再添加
             */
            doc.add(new StringField("id", "21", Store.YES));
            doc.add(new TextField("email", "aa.bb@s", Store.YES));
            doc.add(new TextField("content", "update content like", Store.NO));
            doc.add(new StringField("name", "jackson", Store.YES));
            trackingIndexWriter.updateDocument(new Term("id","1"), doc);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
    /**使用单例获取IndexSearch **/
    public IndexSearcher getSearcher() {
        IndexSearcher is = null;
        try {
            if(is==null) {
                reMgr.maybeRefresh();//刷新reMgr,获取最新的IndexSearcher
                is = reMgr.acquire();
                
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        if(is == null){
            throw new RuntimeException("indexSearcher is null!!!!");
        }
        return is;
        
    }
    
    /** 查询
     *
     * 查询时search如果使用完成,需要将search释放会searchFactory中,使用reMgr。release(indexSearcher)进行释放
     * **/
    public void search(){
        IndexSearcher is = getSearcher();
        try {
            TermQuery query = new TermQuery(new Term("content","like"));
            TopDocs tds = is.search(query, 10);
            for(ScoreDoc sd:tds.scoreDocs){
                Document doc = is.doc(sd.doc);
                System.out.println(doc.get("id")+"---->"+
                        doc.get("name")+"["+doc.get("email")+"]-->"+doc.get("id")+","+
                        doc.get("attach")+","+doc.get("date")+"," +doc.getValues("email")[0]);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally{
            try {
                reMgr.release(is);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
    }
    /**
     * 关闭初始化线程的处理
     */
    public void close(){
        //stop the re-open thread
        crt.interrupt();
        crt.close();
        //close the indexWriter,commit 所有有过修改的内容
        try {
            writer.commit();
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

测试方法:

package com.johnny.lucene05.lucene_plugin.nrt;

import org.junit.Before;
import org.junit.Test;

public class NearRealTimeSearchTest {
    private static NearRealTimeSearch ns =null;
    @Before
    public void init(){
        ns = new NearRealTimeSearch();
    }
    @Test
    public void testIndex(){
        ns.index(true);
    }
    
    @Test
    public void testSearch(){
        ns.query();
        for(int i=0;i<10;i++){
            ns.search();
            System.out.println(i+"--------------------");
            ns.delete();
            if(i==3){
                ns.update();
                ns.query();
            }
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        ns.commit();
        ns.query();
    }
    @Test
    public void close(){
        ns.close();
    }
    @Test
    public void testQuery(){
        ns.query();
    }
}


  相关解决方案