⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 archiveindexer.java

📁 openfire 服务器源码下载
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/**
 * $Revision$
 * $Date$
 *
 * Copyright (C) 2008 Jive Software. All rights reserved.
 *
 * This software is published under the terms of the GNU Public License (GPL),
 * a copy of which is included in this distribution, or a commercial license
 * agreement with Jive.
 */

package org.jivesoftware.openfire.archive;

import org.jivesoftware.openfire.reporting.util.TaskEngine;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.DateTools;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexModifier;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.dom4j.DocumentFactory;
import org.dom4j.io.OutputFormat;
import org.dom4j.io.XMLWriter;
import org.jivesoftware.database.DbConnectionManager;
import org.jivesoftware.util.JiveConstants;
import org.jivesoftware.util.JiveGlobals;
import org.jivesoftware.util.Log;
import org.jivesoftware.util.XMLProperties;
import org.picocontainer.Startable;
import org.xmpp.packet.JID;

import java.io.*;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Indexes archived conversations. If conversation archiving is not enabled,
 * this class does nothing. The search index is maintained in the <tt>monitoring/search</tt>
 * directory of the Openfire home directory. It's automatically updated with the latest
 * conversation content as long as conversation archiving is enabled. The index update
 * interval is controlled by the Jive property "conversation.search.updateInterval" and
 * the default value is 15 minutes.
 *
 * @see ArchiveSearcher
 * @author Matt Tucker
 */
public class ArchiveIndexer implements Startable {

    // Selects the ID and isExternal flag of every row in ofConversation.
    private static final String ALL_CONVERSATIONS =
            "SELECT conversationID, isExternal FROM ofConversation";
    // Selects the distinct IDs of conversations that have at least one archived message
    // whose sentDate is greater than the bound parameter.
    private static final String NEW_CONVERSATIONS =
            "SELECT DISTINCT conversationID FROM ofMessageArchive WHERE sentDate > ?";
    // Selects the isExternal flag of a single conversation, by ID.
    private static final String CONVERSATION_METADATA =
            "SELECT isExternal FROM ofConversation WHERE conversationID=?";
    // Selects archived messages for a set of conversations, ordered by conversation ID.
    // NOTE(review): "IN ?" cannot be bound as a list through a plain JDBC parameter;
    // presumably the "?" is replaced textually before the query is prepared -- confirm
    // at the place where this constant is used.
    private static final String CONVERSATION_MESSAGES =
            "SELECT conversationID, sentDate, fromJID, toJID, body FROM ofMessageArchive " +
            "WHERE conversationID IN ? ORDER BY conversationID";

    // Directory that holds the search index files; assigned in start(), cleared in stop().
    private File searchDir;
    // Used in start() to submit the initial rebuild and to schedule the periodic update.
    private TaskEngine taskEngine;
    // Consulted to find out whether archiving is enabled; cleared in stop().
    private ConversationManager conversationManager;
    // Properties from which the "lastModified" value is read and to which it is written.
    // Presumably populated by loadPropertiesFile() -- confirm.
    private XMLProperties indexProperties;
    // Lucene directory opened over searchDir in start() and closed in stop().
    private Directory directory;
    // Closed and cleared in stop() when non-null.
    private IndexSearcher searcher;
    // Fair lock created in start(); updateIndex() holds it while it writes to the index.
    private Lock writerLock;
    // Set to true by stop(); updateIndex() and rebuildIndex() return immediately once set.
    // NOTE(review): updateIndex() reads this flag without synchronization and the field
    // is not volatile -- confirm whether cross-thread visibility is guaranteed elsewhere.
    private boolean stopped = false;

    // True while a rebuild is running; updateIndex() does nothing while it is set.
    private boolean rebuildInProgress = false;
    // Cleared in stop(); presumably the progress handle of the current rebuild -- confirm.
    private RebuildFuture rebuildFuture;

    // Value of the "lastModified" index property. updateIndex() uses it as the sentDate
    // lower bound and replaces it with the newest date reported by indexConversations().
    // A value of 0 makes start() schedule a full rebuild.
    private long lastModified = 0;

    // Timer task that calls updateIndex(); scheduled in start() and cancelled in stop().
    private TimerTask indexUpdater;

    /**
     * Creates an archive indexer. The constructor only stores its collaborators;
     * no index is opened and nothing is scheduled until {@link #start()} is called.
     *
     * @param conversationManager the manager asked whether conversation archiving
     *      is enabled.
     * @param taskEngine the engine used to run the index rebuild and to schedule
     *      periodic index updates.
     */
    public ArchiveIndexer(ConversationManager conversationManager, TaskEngine taskEngine) {
        this.taskEngine = taskEngine;
        this.conversationManager = conversationManager;
    }

    /**
     * Starts the indexer. Opens the search index under
     * <tt>monitoring/search</tt> in the Openfire home directory (creating it when
     * no index exists yet), force-unlocks an index that was left locked, submits a
     * full rebuild when the index is new or has never been updated, and schedules
     * the periodic index update.
     *
     * If the index directory or the index properties cannot be initialized, the
     * problem is logged and the indexer is put in the stopped state, so that
     * {@link #updateIndex()} and {@link #rebuildIndex()} become no-ops instead of
     * failing on missing state.
     */
    public void start() {
        // Create the lock and the updater task before anything that can fail, so they
        // are never null once start() has been called (stop() cancels the task).
        writerLock = new ReentrantLock(true);
        indexUpdater = new TimerTask() {
            public void run() {
                updateIndex();
            }
        };

        searchDir = new File(JiveGlobals.getHomeDirectory() +
                    File.separator + "monitoring" + File.separator + "search");
        if (!searchDir.exists() && !searchDir.mkdirs()) {
            Log.warn("Unable to create archiving search index directory: " + searchDir);
        }
        boolean indexCreated = false;
        try {
            loadPropertiesFile(searchDir);
            // If the index already exists, use it.
            if (IndexReader.indexExists(searchDir)) {
                directory = FSDirectory.getDirectory(searchDir, false);
            }
            // Otherwise, create a new index.
            else {
                directory = FSDirectory.getDirectory(searchDir, true);
                indexCreated = true;
            }
        }
        catch (IOException ioe) {
            Log.error(ioe);
        }

        // Everything below dereferences both of these; without them there is nothing
        // to index into, so disable the service rather than fail with a
        // NullPointerException.
        if (directory == null || indexProperties == null) {
            Log.warn("Archiving search index could not be initialized; " +
                    "conversation indexing is disabled.");
            stopped = true;
            return;
        }

        // Force the directory unlocked if it's locked (due to non-clean app shut-down,
        // for example).
        try {
            if (IndexReader.isLocked(directory)) {
                Log.warn("Archiving search index was locked, probably due to non-clean " +
                        "application shutdown.");
                IndexReader.unlock(directory);
            }
        }
        catch (IOException ioe) {
            Log.error(ioe);
        }

        String modified = indexProperties.getProperty("lastModified");
        if (modified != null) {
            try {
                lastModified = Long.parseLong(modified);
            }
            catch (NumberFormatException nfe) {
                // Ignore: lastModified keeps its current value. When that value is 0 the
                // check below schedules a full rebuild.
            }
        }
        // If the index has never been updated, build it from scratch.
        if (lastModified == 0 || indexCreated) {
            taskEngine.submit(new Runnable() {
                public void run() {
                    rebuildIndex();
                }
            });
        }

        int updateInterval = JiveGlobals.getIntProperty("conversation.search.updateInterval", 15);
        if (updateInterval <= 0) {
            // A fixed-rate schedule needs a positive period; fall back to the default.
            Log.warn("Invalid conversation.search.updateInterval value " + updateInterval +
                    "; using the default of 15.");
            updateInterval = 15;
        }
        taskEngine.scheduleAtFixedRate(indexUpdater, JiveConstants.MINUTE * 5,
                JiveConstants.MINUTE * updateInterval);
    }

    /**
     * Stops the indexer. Marks the service as stopped, cancels the periodic update
     * task, closes the index searcher and the index directory (logging, not
     * propagating, any failure) and releases the references held by this object.
     * Safe to call even if {@link #start()} was never called or did not complete.
     */
    public void stop() {
        stopped = true;
        // The task only exists once start() has run.
        if (indexUpdater != null) {
            indexUpdater.cancel();
        }
        if (searcher != null) {
            try {
                searcher.close();
            }
            catch (Exception e) {
                Log.error(e);
            }
            searcher = null;
        }
        // The directory is absent when the index could not be opened.
        if (directory != null) {
            try {
                directory.close();
            }
            catch (Exception e) {
                Log.error(e);
            }
        }
        directory = null;
        indexProperties = null;
        conversationManager = null;
        searchDir = null;
        rebuildFuture = null;
    }

    /**
     * Returns the total size of the search index (in bytes). The size is the sum of
     * the lengths of all entries in the search directory except the
     * <tt>indexprops.xml</tt> properties file.
     *
     * @return the total size of the search index (in bytes), or 0 if the indexer has
     *      not been started, has been stopped, or the search directory cannot be listed.
     */
    public long getIndexSize() {
        // Work on a local copy: the field is null before start() and after stop().
        File indexDir = searchDir;
        if (indexDir == null) {
            return 0;
        }
        File [] files = indexDir.listFiles(new FilenameFilter() {
            public boolean accept(File dir, String name) {
                // Ignore the index properties file since it's not part of the index.
                return !name.equals("indexprops.xml");
            }
        });
        if (files == null) {
            // Search folder does not exist so size of index is 0
            return 0;
        }
        long size = 0;
        for (File file : files) {
            size += file.length();
        }
        return size;
    }

    /**
     * Updates the search index with all new conversation data since the last index update.
     * Does nothing when the service is stopped, archiving is disabled, or an index
     * rebuild is in progress.
     *
     * While holding the writer lock, the method looks up every conversation that has
     * a message newer than <tt>lastModified</tt>, removes those conversations from
     * the index, re-indexes them, optimizes the index and, if indexing reported a
     * newest date, stores it as the new "lastModified" value. Database errors are
     * logged and the update continues with whatever data was read; index I/O errors
     * are logged and end the update.
     */
    public void updateIndex() {
        // Immediately return if the service has been stopped.
        if (stopped) {
            return;
        }
        // Do nothing if archiving is disabled.
        if (!conversationManager.isArchivingEnabled()) {
            return;
        }
        // If we're currently rebuilding the index, return.
        if (rebuildInProgress) {
            return;
        }
        writerLock.lock();
        IndexModifier writer = null;
        try {
            writer = new IndexModifier(directory, new StandardAnalyzer(), false);
            List<Long> conversationIDs = loadNewConversationIDs(lastModified);

            // Delete any conversations found -- they may have already been indexed, but
            // updated since then.
            for (long conversationID : conversationIDs) {
                writer.deleteDocuments(new Term("conversationID", Long.toString(conversationID)));
            }

            // Load meta-data for each conversation.
            Map<Long, Boolean> externalMetaData = loadExternalMetaData(conversationIDs);

            // Now index all the new conversations.
            long newestDate = indexConversations(conversationIDs, externalMetaData, writer, false);

            writer.optimize();

            // Done indexing so store a last modified date.
            if (newestDate != -1) {
                lastModified = newestDate;
                indexProperties.setProperty("lastModified", Long.toString(lastModified));
            }
        }
        catch (IOException ioe) {
            Log.error(ioe);
        }
        finally {
            if (writer != null) {
                try {
                    writer.close();
                }
                catch (Exception e) {
                    Log.error(e);
                }
            }
            writerLock.unlock();
        }
    }

    /**
     * Returns the IDs of all conversations that have an archived message sent after
     * the given date. A database error is logged and the IDs read so far are returned.
     *
     * @param since only messages with a sentDate greater than this value are considered.
     * @return the matching conversation IDs; possibly empty, never <tt>null</tt>.
     */
    private List<Long> loadNewConversationIDs(long since) {
        List<Long> conversationIDs = new ArrayList<Long>();
        Connection con = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            con = DbConnectionManager.getConnection();
            pstmt = con.prepareStatement(NEW_CONVERSATIONS);
            pstmt.setLong(1, since);
            rs = pstmt.executeQuery();
            while (rs.next()) {
                conversationIDs.add(rs.getLong(1));
            }
        }
        catch (SQLException sqle) {
            Log.error(sqle);
        }
        finally {
            DbConnectionManager.closeConnection(rs, pstmt, con);
        }
        return conversationIDs;
    }

    /**
     * Loads the isExternal flag of each given conversation. One connection and one
     * prepared statement are shared by all lookups. A failed lookup for a single
     * conversation is logged and skipped, so that conversation has no map entry.
     *
     * @param conversationIDs the conversations to look up.
     * @return a map from conversation ID to <tt>true</tt> when the stored isExternal
     *      value is 1; conversations without a row, or whose lookup failed, are absent.
     */
    private Map<Long, Boolean> loadExternalMetaData(List<Long> conversationIDs) {
        Map<Long, Boolean> externalMetaData = new HashMap<Long, Boolean>();
        if (conversationIDs.isEmpty()) {
            // Nothing to look up, so don't take a connection from the pool.
            return externalMetaData;
        }
        Connection con = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        try {
            con = DbConnectionManager.getConnection();
            pstmt = con.prepareStatement(CONVERSATION_METADATA);
            for (long conversationID : conversationIDs) {
                try {
                    pstmt.setLong(1, conversationID);
                    rs = pstmt.executeQuery();
                    while (rs.next()) {
                        externalMetaData.put(conversationID, rs.getInt(1) == 1);
                    }
                }
                catch (SQLException sqle) {
                    Log.error(sqle);
                }
                finally {
                    // Release each result set before the statement is executed again.
                    if (rs != null) {
                        try {
                            rs.close();
                        }
                        catch (SQLException sqle) {
                            Log.error(sqle);
                        }
                        rs = null;
                    }
                }
            }
        }
        catch (SQLException sqle) {
            Log.error(sqle);
        }
        finally {
            DbConnectionManager.closeConnection(rs, pstmt, con);
        }
        return externalMetaData;
    }

    /**
     * Rebuilds the search index with all archived conversation data. This method returns
     * a Future that represents the status of the index rebuild process (also available
     * via {@link #getIndexRebuildProgress()}). The integer value
     * (values 0 through 100) represents the percentage of work done. If message archiving
     * is disabled, this method will return <tt>null</tt>.
     *
     * @return a Future to indicate the status of rebuilding the index or <tt>null</tt> if
     *      rebuilding the index is not possible.
     */
    public synchronized Future<Integer> rebuildIndex() {
        // Immediately return if the service has been stopped.
        if (stopped) {
            return null;
        }
        // If a rebuild is already happening, return.
        if (rebuildInProgress) {
            return null;
        }
        rebuildInProgress = true;
        // Do nothing if archiving is disabled.
        if (!conversationManager.isArchivingEnabled()) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -