📄 gdataindexer.java
字号:
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.gdata.search.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.gdata.search.config.IndexSchema;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.TermDocs;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;

/**
 * A GDataIndexer encapsulates every write access to the search index.
 * <p>
 * Inserts, updates and deletes to the index happen inside this class. All
 * modifications are based on an instance of
 * {@link org.apache.lucene.gdata.search.index.IndexDocument}, which contains
 * all information and the command for the indexer.<br>
 * Although this class provides methods to add, remove and update documents in
 * the index, all {@code IndexDocument} instances should be added to the task
 * queue via the {@link GDataIndexer#addIndexableDocumentTask(Future)} method.
 * Inside this class runs an instance of
 * {@link org.apache.lucene.gdata.search.index.IndexTask} listening on this
 * queue. The analysis of the actual documents happens inside the
 * {@link java.util.concurrent.Future} object added to the queue. This enables
 * the indexer to do its actual work: documents are built / analyzed
 * concurrently while already finished tasks can be added to the index.
 * </p>
 *
 * @author Simon Willnauer
 */
public class GDataIndexer {
    private static final Log LOG = LogFactory.getLog(GDataIndexer.class);

    /*
     * Index writer and searcher used by this indexer. They are opened and
     * closed by helper methods that are not visible in this part of the file.
     */
    protected IndexWriter writer;

    protected IndexSearcher searcher;

    /* Incremented by commit(boolean) each time a commit actually runs. */
    protected AtomicInteger committed = new AtomicInteger(0);

    /* Incremented by commit(boolean) each time a commit runs with optimize. */
    protected AtomicInteger optimized = new AtomicInteger(0);

    /*
     * Checked by addIndexableDocumentTask to reject new tasks. Where this flag
     * is set is not visible in this part of the file.
     */
    private AtomicBoolean isDestroyed = new AtomicBoolean(false);

    /*
     * Counters of add / update / delete calls since the last commit; all three
     * are reset to 0 at the end of commit(boolean).
     */
    protected AtomicInteger docsAdded = new AtomicInteger();

    protected AtomicInteger docsUpdated = new AtomicInteger();

    protected AtomicInteger docsDeleted = new AtomicInteger();

    /* The index directory handed to the constructor. */
    private final Directory dir;

    /* Listeners added via registerIndexEventListener. */
    private final List<IndexEventListener> listeners = new ArrayList<IndexEventListener>();

    /* Bounded task queue (capacity 100) handed to the IndexTask in init(). */
    protected final BlockingQueue<Future<IndexDocument>> futurQueue = new LinkedBlockingQueue<Future<IndexDocument>>(
            100);

    /* The schema handed to the constructor; its name is reported on commit. */
    private final IndexSchema serviceConfiguration;

    /* Single-threaded executor that runs the index task. */
    private final ExecutorService indexTaskExecutor;

    protected IndexTask indexTask;

    /*
     * Values stored in the action map: the number of documents to keep for an
     * IndexDocument (see setAction).
     * NOTE(review): the Integer(int) constructor is deprecated in newer JDKs;
     * Integer.valueOf would be the replacement.
     */
    private static final Integer ZERO = new Integer(0);

    private static final Integer ONE = new Integer(1);

    /* Maps an IndexDocument to the number of documents to keep for it. */
    private final Map<IndexDocument, Integer> action;

    /**
     * Creates a new indexer for the given schema and directory and opens the
     * writer via <code>openWriter(create)</code>.
     *
     * @param schema -
     *            the index schema, must not be <code>null</code>
     * @param dir -
     *            the index directory, must not be <code>null</code>
     * @param create -
     *            passed through to <code>openWriter</code>; presumably
     *            whether a new index should be created (confirm against
     *            <code>openWriter</code>, which is not visible here)
     * @throws IOException -
     *             propagated from <code>openWriter(create)</code>
     * @throws IllegalArgumentException -
     *             if the schema or the directory is <code>null</code>
     */
    protected GDataIndexer(final IndexSchema schema, Directory dir,
            boolean create) throws IOException {
        if (schema == null)
            throw new IllegalArgumentException(
                    "IndexServiceConfiguration must not be null");
        if (dir == null)
            throw new IllegalArgumentException(
                    "IndexDirectory must not be null");

        this.serviceConfiguration = schema;
        this.dir = dir;
        openWriter(create);
        this.indexTaskExecutor = Executors.newSingleThreadExecutor();
        this.action = new HashMap<IndexDocument, Integer>(128);
    }

    /**
     * Sets the index task to run in {@link #init()}. The given task is only
     * accepted if it is not <code>null</code> and no task has been set yet;
     * otherwise the call has no effect.
     *
     * @param task -
     *            the index task to use
     */
    protected void setIndexTask(final IndexTask task) {
        if (task != null && this.indexTask == null)
            this.indexTask = task;
    }

    /**
     * Starts the index task on the single-threaded executor. If no task has
     * been set, a new {@link IndexTask} bound to this indexer and its task
     * queue is created first.
     */
    protected void init() {
        if (this.indexTask == null)
            this.indexTask = new IndexTask(this, this.futurQueue);
        this.indexTaskExecutor.execute(this.indexTask);
    }

    /**
     * Adds the given future task to the queue, and waits if the queue is full.
     * The queue capacity is 100.
     *
     * @param task -
     *            the task to be scheduled
     * @throws InterruptedException -
     *             if the calling thread is interrupted while waiting for
     *             space in the queue
     * @throws IllegalStateException -
     *             if the indexer has already been destroyed
     */
    public void addIndexableDocumentTask(final Future<IndexDocument> task)
            throws InterruptedException {
        if (this.isDestroyed.get())
            throw new IllegalStateException(
                    "Indexer has already been destroyed");
        this.futurQueue.put(task);
    }

    /*
     * Records the action for the given insert document, writes it via doWrite
     * and counts it as added. An added document should not already be in the
     * index; the recorded action is there so that possible duplicates get
     * deleted (the deletion itself happens in a helper not visible here).
     * Throws a GdataIndexerException if the document is not flagged as insert.
     */
    protected synchronized void addDocument(IndexDocument indexable)
            throws IOException {
        if (!indexable.isInsert())
            throw new GdataIndexerException(
                    "Index action must be set to insert");
        setAction(indexable);
        doWrite(indexable);
        this.docsAdded.incrementAndGet();
    }

    /*
     * Records in the action map how many documents should be kept for the
     * given IndexDocument: ONE for a non-delete document, ZERO for a delete.
     * The entry is only written when it is missing or currently holds the
     * opposite value.
     */
    private void setAction(IndexDocument doc) {
        Integer docCountToKeep = this.action.get(doc);
        if (!doc.isDelete() && (docCountToKeep == null || docCountToKeep == 0)) {
            /*
             * add a ONE for ONE document to keep for this IndexDocument when
             * doDelete runs. doDelete will keep the latest added document and
             * delete all other documents for this IndexDocument, e.g. all
             * duplicates
             */
            this.action.put(doc, ONE);
        } else if (doc.isDelete()
                && (docCountToKeep == null || docCountToKeep > 0)) {
            /*
             * add a ZERO for zero documents to keep for this IndexDocument
             * when doDelete runs
             */
            this.action.put(doc, ZERO);
        }
    }

    /*
     * Records the action for the given update document, writes it via doWrite
     * and counts it as updated. Throws a GdataIndexerException if the
     * document is not flagged as update.
     */
    protected synchronized void updateDocument(IndexDocument indexable)
            throws IOException {
        if (!indexable.isUpdate())
            throw new GdataIndexerException(
                    "Index action must be set to update");
        setAction(indexable);
        doWrite(indexable);
        this.docsUpdated.incrementAndGet();
    }

    /*
     * Records the delete action for the given document and counts it as
     * deleted. Nothing is written here; only the action map and the counter
     * change. Throws a GdataIndexerException if the document is not flagged
     * as delete.
     */
    protected synchronized void deleteDocument(IndexDocument indexable) {
        if (!indexable.isDelete())
            throw new GdataIndexerException(
                    "Index action must be set to delete");
        setAction(indexable);
        this.docsDeleted.incrementAndGet();
    }

    /**
     * Commits all changes to the index and closes the searcher and the
     * writer. If no documents were added, updated or deleted since the last
     * commit, this method only logs the call and returns without touching the
     * index or calling <code>notifyCommitListeners</code>. Otherwise the
     * change counters are reset and <code>notifyCommitListeners</code> is
     * invoked with the name of the schema.
     *
     * @param optimize -
     *            <code>true</code> if the index should be optimized on this
     *            commit
     * @throws IOException -
     *             if an IOException occurs
     */
    protected synchronized void commit(boolean optimize) throws IOException {
        if (LOG.isInfoEnabled())
            LOG.info("Commit called with optimize = " + optimize);

        int changes = this.docsAdded.intValue() + this.docsDeleted.intValue()
                + this.docsUpdated.intValue();
        /*
         * don't call listeners to prevent unnecessary close / open of searchers
         */
        if (changes == 0)
            return;
        this.committed.incrementAndGet();
        if(optimize)
            this.optimized.incrementAndGet();
        /*
         * NOTE(review): "doDeltete" looks like a misspelling of "doDelete";
         * the helper is declared outside this part of the file, so the name
         * is left as is.
         */
        doDeltete();
        if (optimize) {
            /* the writer is re-opened before optimizing */
            closeSearcher();
            openWriter();
            this.writer.optimize();
        }
        closeSearcher();
        closeWriter();
        /* start counting changes afresh for the next commit */
        this.docsAdded.set(0);
        this.docsDeleted.set(0);
        this.docsUpdated.set(0);
        notifyCommitListeners(this.serviceConfiguration.getName());
    }

    /**
     * Registers a new IndexEventListener. A <code>null</code> listener or a
     * listener that is already registered is ignored. Registered listeners
     * are presumably the ones notified by <code>notifyCommitListeners</code>
     * when the index has been committed (that helper is not visible here).
     *
     * @param listener -
     *            the listener to register
     *
     */
    public void registerIndexEventListener(IndexEventListener listener) {
        if (listener == null || this.listeners.contains(listener))
            return;
        this.listeners.add(listener);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -