📄 archiveindexer.java
字号:
/**
* $Revision$
* $Date$
*
* Copyright (C) 2008 Jive Software. All rights reserved.
*
* This software is published under the terms of the GNU Public License (GPL),
* a copy of which is included in this distribution, or a commercial license
* agreement with Jive.
*/
package org.jivesoftware.openfire.archive;
import org.jivesoftware.openfire.reporting.util.TaskEngine;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.DateTools;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexModifier;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.dom4j.DocumentFactory;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.XMLWriter;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.XMLProperties;
import org.picocontainer.Startable;
import org.xmpp.packet.JID;
import java.io.*;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Indexes archived conversations. If conversation archiving is not enabled,
* this class does nothing. The search index is maintained in the <tt>monitoring/search</tt>
* directory of the Openfire home directory. It's automatically updated with the latest
* conversation content as long as conversation archiving is enabled. The index update
* interval is controlled by the Jive property "conversation.search.updateInterval" and
* the default value is 15 minutes.
*
* @see ArchiveSearcher
* @author Matt Tucker
*/
public class ArchiveIndexer implements Startable {
// Selects the ID and the isExternal flag of every row in ofConversation.
private static final String ALL_CONVERSATIONS =
"SELECT conversationID, isExternal FROM ofConversation";
// Selects the distinct IDs of conversations that have at least one archived message
// sent after the bound date parameter (updateIndex() binds lastModified here).
private static final String NEW_CONVERSATIONS =
"SELECT DISTINCT conversationID FROM ofMessageArchive WHERE sentDate > ?";
// Selects the isExternal flag of a single conversation, identified by its ID.
private static final String CONVERSATION_METADATA =
"SELECT isExternal FROM ofConversation WHERE conversationID=?";
// Selects the archived messages of a set of conversations, grouped by conversation ID.
// NOTE(review): a single '?' after IN is not something JDBC can bind a list of IDs to;
// presumably the placeholder is replaced with a literal ID list before the statement is
// prepared -- confirm at the place where this constant is used.
private static final String CONVERSATION_MESSAGES =
"SELECT conversationID, sentDate, fromJID, toJID, body FROM ofMessageArchive " +
"WHERE conversationID IN ? ORDER BY conversationID";
// Directory on disk that holds the search index; start() points it at
// <home>/monitoring/search and stop() clears it.
private File searchDir;
// Runs the initial rebuild and the periodic index update task.
private TaskEngine taskEngine;
// Consulted to find out whether conversation archiving is enabled.
private ConversationManager conversationManager;
// Index bookkeeping properties; the "lastModified" entry is read in start() and
// written in updateIndex().
private XMLProperties indexProperties;
// Lucene directory opened on searchDir.
private Directory directory;
// Cached index searcher; closed and cleared by stop().
private IndexSearcher searcher;
// Fair lock held by updateIndex() for the whole time it has an index writer open.
private Lock writerLock;
// Set to true when the service is stopped; updateIndex() and rebuildIndex() return
// immediately once it is set.
private boolean stopped = false;
// True while a full rebuild is running; updateIndex() skips its work in that case.
private boolean rebuildInProgress = false;
// Cleared by stop(). NOTE(review): presumably the progress handle returned by
// rebuildIndex() -- confirm against the rest of the class.
private RebuildFuture rebuildFuture;
// Date up to which conversations have been indexed, as bound to "sentDate > ?";
// 0 means the index has never been updated.
private long lastModified = 0;
// Periodic task that calls updateIndex(); scheduled in start(), cancelled in stop().
private TimerTask indexUpdater;
/**
 * Creates an indexer bound to the given collaborators. The constructor only stores
 * the two references; the index is opened and the update task is scheduled by
 * {@link #start()}.
 *
 * @param conversationManager the ConversationManager used to check whether archiving
 *      is enabled.
 * @param taskEngine the task engine that runs the rebuild and periodic update tasks.
 */
public ArchiveIndexer(ConversationManager conversationManager, TaskEngine taskEngine) {
    this.taskEngine = taskEngine;
    this.conversationManager = conversationManager;
}
/**
 * Starts the indexer. Opens (or creates) the search index under
 * <tt>monitoring/search</tt> in the home directory, clears a stale index lock,
 * restores the stored last-modified date, submits a full rebuild when the index is
 * new or has never been updated, and schedules the periodic update task. The first
 * update runs after five minutes; later runs are spaced by the
 * "conversation.search.updateInterval" property (minutes, default 15).
 * <p>
 * If the index directory or its properties file cannot be opened, the error is
 * logged and the indexer is marked as stopped, so that {@link #updateIndex()} and
 * {@link #rebuildIndex()} become no-ops instead of failing on missing state.
 */
public void start() {
    // Create these before any I/O so that stop() always finds an update task to
    // cancel, even when opening the index fails below.
    writerLock = new ReentrantLock(true);
    indexUpdater = new TimerTask() {
        public void run() {
            updateIndex();
        }
    };

    searchDir = new File(JiveGlobals.getHomeDirectory() +
            File.separator + "monitoring" + File.separator + "search");
    if (!searchDir.exists() && !searchDir.mkdirs()) {
        Log.error("Unable to create archive search index directory: " + searchDir);
    }

    boolean indexCreated = false;
    try {
        loadPropertiesFile(searchDir);
        // If the index already exists, use it.
        if (IndexReader.indexExists(searchDir)) {
            directory = FSDirectory.getDirectory(searchDir, false);
        }
        // Otherwise, create a new index.
        else {
            directory = FSDirectory.getDirectory(searchDir, true);
            indexCreated = true;
        }
    }
    catch (IOException ioe) {
        Log.error(ioe);
    }

    // Without a directory and its properties nothing below can work; disable the
    // indexer rather than dereferencing null state.
    if (directory == null || indexProperties == null) {
        Log.error("Archive search index could not be opened; conversation search " +
                "indexing is disabled.");
        stopped = true;
        return;
    }

    // Force the directory unlocked if it's locked (due to non-clean app shut-down,
    // for example).
    try {
        if (IndexReader.isLocked(directory)) {
            Log.warn("Archiving search index was locked, probably due to non-clean " +
                    "application shutdown.");
            IndexReader.unlock(directory);
        }
    }
    catch (IOException ioe) {
        Log.error(ioe);
    }

    String modified = indexProperties.getProperty("lastModified");
    if (modified != null) {
        try {
            lastModified = Long.parseLong(modified);
        }
        catch (NumberFormatException nfe) {
            // Ignore: lastModified stays 0, which triggers a full rebuild below.
        }
    }

    // If the index has never been updated, build it from scratch.
    if (lastModified == 0 || indexCreated) {
        taskEngine.submit(new Runnable() {
            public void run() {
                rebuildIndex();
            }
        });
    }

    int updateInterval = JiveGlobals.getIntProperty("conversation.search.updateInterval", 15);
    if (updateInterval <= 0) {
        // A non-positive period is not a usable schedule (java.util.Timer rejects it;
        // presumably the task engine does too), so fall back to the default.
        Log.warn("Invalid conversation.search.updateInterval value (" + updateInterval +
                "); using the default of 15 minutes.");
        updateInterval = 15;
    }
    taskEngine.scheduleAtFixedRate(indexUpdater, JiveConstants.MINUTE * 5,
            JiveConstants.MINUTE * updateInterval);
}
/**
 * Stops the indexer. Marks the service as stopped, cancels the periodic update task,
 * closes the cached searcher and the index directory, and drops the references held
 * by this instance. Every resource is checked for null first, so the method can be
 * called when {@link #start()} never ran or did not complete, and can be called more
 * than once.
 */
public void stop() {
    stopped = true;
    // The update task only exists once start() has created it.
    if (indexUpdater != null) {
        indexUpdater.cancel();
    }
    if (searcher != null) {
        try {
            searcher.close();
        }
        catch (Exception e) {
            Log.error(e);
        }
        searcher = null;
    }
    if (directory != null) {
        try {
            directory.close();
        }
        catch (Exception e) {
            Log.error(e);
        }
        directory = null;
    }
    indexProperties = null;
    conversationManager = null;
    searchDir = null;
    rebuildFuture = null;
}
/**
 * Returns the total size of the search index (in bytes). The size is the sum of the
 * lengths of all entries in the search directory except <tt>indexprops.xml</tt>,
 * which is bookkeeping data rather than part of the index.
 *
 * @return the total size of the search index (in bytes), or 0 if the search
 *      directory is not set (before start or after stop) or cannot be listed.
 */
public long getIndexSize() {
    // Read the field once so the null check and the listing use the same value;
    // stop() resets it to null.
    File indexDir = searchDir;
    if (indexDir == null) {
        return 0;
    }
    File[] files = indexDir.listFiles(new FilenameFilter() {
        public boolean accept(File dir, String name) {
            // Ignore the index properties file since it's not part of the index.
            return !name.equals("indexprops.xml");
        }
    });
    if (files == null) {
        // Search folder does not exist so size of index is 0
        return 0;
    }
    long size = 0;
    for (File file : files) {
        size += file.length();
    }
    return size;
}
/**
 * Updates the search index with all new conversation data since the last index update.
 * <p>
 * The call does nothing when the service has been stopped, when conversation
 * archiving is disabled, or while a full rebuild is in progress. Otherwise, under the
 * writer lock, every conversation that has a message sent after the stored
 * last-modified date is deleted from the index and indexed again, the index is
 * optimized, and -- unless the indexing pass returns -1 -- the returned date is stored
 * as the new last-modified date. Database and I/O errors are logged, not thrown.
 */
public void updateIndex() {
    // Immediately return if the service has been stopped.
    if (stopped) {
        return;
    }
    // Do nothing if archiving is disabled.
    if (!conversationManager.isArchivingEnabled()) {
        return;
    }
    // If we're currently rebuilding the index, return.
    if (rebuildInProgress) {
        return;
    }
    writerLock.lock();
    IndexModifier writer = null;
    try {
        writer = new IndexModifier(directory, new StandardAnalyzer(), false);

        // Find the conversations that received messages since the last update.
        List<Long> conversationIDs = new ArrayList<Long>();
        Connection con = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            con = DbConnectionManager.getConnection();
            pstmt = con.prepareStatement(NEW_CONVERSATIONS);
            pstmt.setLong(1, lastModified);
            rs = pstmt.executeQuery();
            while (rs.next()) {
                conversationIDs.add(rs.getLong(1));
            }
        }
        catch (SQLException sqle) {
            Log.error(sqle);
        }
        finally {
            DbConnectionManager.closeConnection(rs, pstmt, con);
        }

        // Delete any conversations found -- they may have already been indexed, but
        // updated since then.
        for (long conversationID : conversationIDs) {
            writer.deleteDocuments(new Term("conversationID", Long.toString(conversationID)));
        }

        // Load meta-data for each conversation, using one connection and one prepared
        // statement for the whole batch.
        Map<Long, Boolean> externalMetaData = new HashMap<Long, Boolean>();
        if (!conversationIDs.isEmpty()) {
            // The handles above are already released; reset them so the finally block
            // below can never close them a second time.
            con = null;
            pstmt = null;
            rs = null;
            try {
                con = DbConnectionManager.getConnection();
                pstmt = con.prepareStatement(CONVERSATION_METADATA);
                for (long conversationID : conversationIDs) {
                    try {
                        pstmt.setLong(1, conversationID);
                        rs = pstmt.executeQuery();
                        while (rs.next()) {
                            externalMetaData.put(conversationID, rs.getInt(1) == 1);
                        }
                    }
                    catch (SQLException sqle) {
                        // A failure for one conversation must not stop the others
                        // from being loaded.
                        Log.error(sqle);
                    }
                    finally {
                        // Release each result set before the statement is re-executed.
                        if (rs != null) {
                            try {
                                rs.close();
                            }
                            catch (SQLException sqle) {
                                Log.error(sqle);
                            }
                            rs = null;
                        }
                    }
                }
            }
            catch (SQLException sqle) {
                Log.error(sqle);
            }
            finally {
                DbConnectionManager.closeConnection(rs, pstmt, con);
            }
        }

        // Now index all the new conversations.
        long newestDate = indexConversations(conversationIDs, externalMetaData, writer, false);
        writer.optimize();
        // Done indexing so store a last modified date.
        if (newestDate != -1) {
            lastModified = newestDate;
            indexProperties.setProperty("lastModified", Long.toString(lastModified));
        }
    }
    catch (IOException ioe) {
        Log.error(ioe);
    }
    finally {
        if (writer != null) {
            try {
                writer.close();
            }
            catch (Exception e) {
                Log.error(e);
            }
        }
        writerLock.unlock();
    }
}
/**
* Rebuilds the search index with all archived conversation data. This method returns
* a Future that represents the status of the index rebuild process (also available
* via {@link #getIndexRebuildProgress()}). The integer value
* (values 0 through 100) represents the percentage of work done. If message archiving
* is disabled, this method will return <tt>null</tt>.
*
* @return a Future to indicate the status of rebuilding the index or <tt>null</tt> if
* rebuilding the index is not possible.
*/
public synchronized Future<Integer> rebuildIndex() {
// Immediately return if the service has been stopped.
if (stopped) {
return null;
}
// If a rebuild is already happening, return.
if (rebuildInProgress) {
return null;
}
rebuildInProgress = true;
// Do nothing if archiving is disabled.
if (!conversationManager.isArchivingEnabled()) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -