⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 srdiindex.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/* *  Copyright (c) 2001 Sun Microsystems, Inc.  All rights reserved. * *  Redistribution and use in source and binary forms, with or without *  modification, are permitted provided that the following conditions *  are met: * *  1. Redistributions of source code must retain the above copyright *  notice, this list of conditions and the following disclaimer. * *  2. Redistributions in binary form must reproduce the above copyright *  notice, this list of conditions and the following disclaimer in *  the documentation and/or other materials provided with the *  distribution. * *  3. The end-user documentation included with the redistribution, *  if any, must include the following acknowledgment: *  "This product includes software developed by the *  Sun Microsystems, Inc. for Project JXTA." *  Alternately, this acknowledgment may appear in the software itself, *  if and wherever such third-party acknowledgments normally appear. * *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" *  must not be used to endorse or promote products derived from this *  software without prior written permission. For written *  permission, please contact Project JXTA at http://www.jxta.org. * *  5. Products derived from this software may not be called "JXTA", *  nor may "JXTA" appear in their name, without prior written *  permission of Sun. * *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES *  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE *  DISCLAIMED.  IN NO EVENT SHALL SUN MICROSYSTEMS OR *  ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF *  USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND *  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT *  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF *  SUCH DAMAGE. *  ==================================================================== * *  This software consists of voluntary contributions made by many *  individuals on behalf of Project JXTA.  For more *  information on Project JXTA, please see *  <http://www.jxta.org/>. * *  This license is based on the BSD license adopted by the Apache Foundation. * *  $Id: SrdiIndex.java,v 1.49 2006/05/02 17:38:23 hamada Exp $ */package net.jxta.impl.cm;import java.io.File;import java.io.InputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.ByteArrayOutputStream;import java.net.URI;import java.util.HashMap;import java.util.HashSet;import java.util.Set;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Vector;import java.util.ArrayList;import java.io.IOException;import java.io.EOFException;import java.lang.reflect.UndeclaredThrowableException;import java.net.URISyntaxException;import org.apache.log4j.Level;import org.apache.log4j.Logger;import net.jxta.id.IDFactory;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.impl.util.TimeUtils;import net.jxta.impl.xindice.core.data.Key;import net.jxta.impl.xindice.core.data.Value;import net.jxta.impl.xindice.core.data.Record;import net.jxta.impl.xindice.core.DBException;import net.jxta.impl.xindice.core.filer.BTreeFiler;import net.jxta.impl.xindice.core.filer.BTreeCallback;import net.jxta.impl.xindice.core.indexer.IndexQuery;import net.jxta.impl.xindice.core.indexer.NameIndexer;/** *  SrdiIndex */public class SrdiIndex implements Runnable {    /**     *  Log4J Logger     */    private final static Logger LOG = Logger.getLogger(SrdiIndex.class.getName());    private long interval = 1000 * 60 * 10;    private volatile boolean stop = false;    private Indexer srdiIndexer = null;    private BTreeFiler cacheDB = null;    private Thread gcThread = null;    private Set gcPeerTBL = new HashSet();    private final String indexName;        /**     *  Constructor for the SrdiIndex     *     * @param  group group     * @param indexName     */    public SrdiIndex(PeerGroup group, String indexName) {        this.indexName = indexName;        try {            String pgdir = null;            File storeHome;            if (group == null) {                pgdir = "srdi-index";                storeHome = new File(".jxta");            } else {                pgdir = group.getPeerGroupID().getUniqueValue().toString();                storeHome = new File(group.getStoreHome());            }            File rootDir = new File(new File(storeHome, "cm"), pgdir);            rootDir = new File(rootDir, "srdi");            if (!rootDir.exists()) {                // We need to create the directory                if (!rootDir.mkdirs()) {                    throw new RuntimeException("Cm cannot create directory " + rootDir);                }            }            // peerid database            // Storage            cacheDB = new BTreeFiler();            // lazy checkpoint            cacheDB.setSync(false);            cacheDB.setLocation(rootDir.getCanonicalPath(), indexName);            if (!cacheDB.open()) {                cacheDB.create();                // now open it                cacheDB.open();            }            // index            srdiIndexer = new Indexer(false);            srdiIndexer.setLocation(rootDir.getCanonicalPath(), indexName);            if (!srdiIndexer.open()) {                srdiIndexer.create();                // now open it                srdiIndexer.open();            }            if (LOG.isEnabledFor(Level.INFO)) {                LOG.info("[" + ((group == null) ? "none" : group.getPeerGroupName()) + "] : " + "Initialized " + indexName);            }        } catch (DBException de) {            if (LOG.isEnabledFor(Level.FATAL)) {                LOG.fatal("Unable to Initialize databases", de);            }            throw new UndeclaredThrowableException(de, "Unable to Initialize databases");        } catch (Throwable e) {            if (LOG.isEnabledFor(Level.ERROR)) {                LOG.error("Unable to create Cm", e);            }            if (e instanceof Error) {                throw (Error) e;            } else if (e instanceof RuntimeException) {                throw (RuntimeException) e;            } else {                throw new UndeclaredThrowableException(e, "Unable to create Cm");            }        }    }    /**     *  Construct a SrdiIndex and starts a GC thread which runs every "interval"     *  milliseconds     *     * @param  interval   the interval at which the gc will run in milliseconds     * @param  group      group context     * @param  indexName  SrdiIndex name     */    public SrdiIndex(PeerGroup group, String indexName, long interval) {        this(group, indexName);        this.interval = interval;        startGC(group, indexName, interval);    }    /** Start the GC thread */    protected synchronized void startGC(PeerGroup group, String indexName, long interval) {        if (LOG.isEnabledFor(Level.INFO)) {            LOG.info("[" + ((group == null) ? "none" : group.getPeerGroupName()) + "] : Starting SRDI GC Thread for " + indexName);        }        gcThread = new Thread(group.getHomeThreadGroup(), this, "SrdiIndex GC :" + indexName + " every " + interval + "ms");        gcThread.setDaemon(true);        gcThread.start();    }    /**     *  Returns the name of this srdi index.     *     *  @return index name.     */    public String getIndexName() {        return indexName;    }    /**     *  add an index entry     *     *@param  primaryKey  primary key     *@param  attribute   Attribute String to query on     *@param  value       value of the attribute string     *@param  expiration  expiration associated with this entry relative time in     *      milliseconds     *@param  pid         peerid reference     */    public synchronized void add(String primaryKey,            String attribute,            String value,            PeerID pid,            long expiration) {        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("[" + indexName + "] Adding " + primaryKey + "/" + attribute + " = '" + value + "' for " + pid);        }        try {            Key key = new Key(primaryKey + attribute + value);            long expiresin = TimeUtils.toAbsoluteTimeMillis(expiration);            // update the record if it exists            synchronized(cacheDB) {                // FIXME hamada 10/14/04 it is possible a peer re-appears with                // a different set of indexes since it's been marked for garbage                // collection.  will address this issue in a subsequent patch                gcPeerTBL.remove(pid);                                Record record = cacheDB.readRecord(key);                ArrayList old;                if (record != null) {                    old = readRecord(record).list;                } else {                    old = new ArrayList();                }                Entry entry = new Entry(pid, expiresin);                if (!old.contains(entry)) {                    old.add(entry);                } else {                    // entry exists, replace it (effectively updating expiration)                    old.remove(old.indexOf(entry));                    old.add(entry);                }                // no sense in keeping expired entries.                old = removeExpired(old);                long t0 = System.currentTimeMillis();                byte[] data = getData(key, old);                // if (LOG.isEnabledFor(Level.DEBUG)) {                // LOG.debug("Serialized result in : " + (System.currentTimeMillis() - t0) + "ms.");                // }                if (data == null) {                    if (LOG.isEnabledFor(Level.ERROR)) {                        LOG.error("Failed to serialize data");                    }                    return;                }                Value recordValue = new Value(data);                long pos = cacheDB.writeRecord(key, recordValue);                Map indexables = getIndexMap(primaryKey + attribute, value);                srdiIndexer.addToIndex(indexables, pos);            }        } catch (IOException de) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Failed to add SRDI", de);            }        } catch (DBException de) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Failed to add SRDI", de);            }        }    }    /**     *  retrieves a record     *     *@param  pkey  primary key     *@param  skey  secondary key     *@param  value       value     *@return             List of Entry objects     */    public List getRecord(String pkey, String skey, String value) {        Record record = null;        try {            Key key = new Key(pkey + skey + value);            synchronized (cacheDB) {                record = cacheDB.readRecord(key);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -