⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 srdiindex.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*
 * Copyright (c) 2001-2007 Sun Microsystems, Inc.  All rights reserved.
 *
 *  The Sun Project JXTA(TM) Software License
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions are met:
 *
 *  1. Redistributions of source code must retain the above copyright notice,
 *     this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright notice,
 *     this list of conditions and the following disclaimer in the documentation
 *     and/or other materials provided with the distribution.
 *
 *  3. The end-user documentation included with the redistribution, if any, must
 *     include the following acknowledgment: "This product includes software
 *     developed by Sun Microsystems, Inc. for JXTA(TM) technology."
 *     Alternately, this acknowledgment may appear in the software itself, if
 *     and wherever such third-party acknowledgments normally appear.
 *
 *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
 *     not be used to endorse or promote products derived from this software
 *     without prior written permission. For written permission, please contact
 *     Project JXTA at http://www.jxta.org.
 *
 *  5. Products derived from this software may not be called "JXTA", nor may
 *     "JXTA" appear in their name, without prior written permission of Sun.
 *
 *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES,
 *  INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
 *  FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN
 *  MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 *  INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
 *  OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 *  LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 *  NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
 *  EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 *  JXTA is a registered trademark of Sun Microsystems, Inc. in the United
 *  States and other countries.
 *
 *  Please see the license information page at :
 *  <http://www.jxta.org/project/www/license.html> for instructions on use of
 *  the license in source files.
 *
 *  ====================================================================
 *
 *  This software consists of voluntary contributions made by many individuals
 *  on behalf of Project JXTA. For more information on Project JXTA, please see
 *  http://www.jxta.org.
 *
 *  This license is based on the BSD license adopted by the Apache Foundation.
*/package net.jxta.impl.cm;import net.jxta.id.IDFactory;import net.jxta.impl.util.TimeUtils;import net.jxta.impl.xindice.core.DBException;import net.jxta.impl.xindice.core.data.Key;import net.jxta.impl.xindice.core.data.Record;import net.jxta.impl.xindice.core.data.Value;import net.jxta.impl.xindice.core.filer.BTreeCallback;import net.jxta.impl.xindice.core.filer.BTreeFiler;import net.jxta.impl.xindice.core.indexer.IndexQuery;import net.jxta.impl.xindice.core.indexer.NameIndexer;import net.jxta.logging.Logging;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import java.io.ByteArrayOutputStream;import java.io.DataInputStream;import java.io.DataOutputStream;import java.io.EOFException;import java.io.File;import java.io.IOException;import java.io.InputStream;import java.lang.reflect.UndeclaredThrowableException;import java.net.URI;import java.net.URISyntaxException;import java.util.ArrayList;import java.util.Collections;import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import java.util.logging.Level;import java.util.logging.Logger;/** * SrdiIndex */public class SrdiIndex implements Runnable {        /**     * Logger     */    private final static transient Logger LOG = Logger.getLogger(SrdiIndex.class.getName());        private long interval = 1000 * 60 * 10;    private volatile boolean stop = false;    private final Indexer srdiIndexer;    private final BTreeFiler cacheDB;    private Thread gcThread = null;    private final Set<PeerID> gcPeerTBL = new HashSet<PeerID>();        private final String indexName;        /**     * Constructor for the SrdiIndex     *     * @param group     group     * @param indexName the index name     */    public SrdiIndex(PeerGroup group, String indexName) {        this.indexName = indexName;                try {            String pgdir = null;            File storeHome;                        if (group == null) {                
pgdir = "srdi-index";                storeHome = new File(".jxta");            } else {                pgdir = group.getPeerGroupID().getUniqueValue().toString();                storeHome = new File(group.getStoreHome());            }                        File rootDir = new File(new File(storeHome, "cm"), pgdir);                        rootDir = new File(rootDir, "srdi");            if (!rootDir.exists()) {                // We need to create the directory                if (!rootDir.mkdirs()) {                    throw new RuntimeException("Cm cannot create directory " + rootDir);                }            }            // peerid database            // Storage            cacheDB = new BTreeFiler();            // lazy checkpoint            cacheDB.setSync(false);            cacheDB.setLocation(rootDir.getCanonicalPath(), indexName);                        if (!cacheDB.open()) {                cacheDB.create();                // now open it                cacheDB.open();            }                        // index            srdiIndexer = new Indexer(false);            srdiIndexer.setLocation(rootDir.getCanonicalPath(), indexName);            if (!srdiIndexer.open()) {                srdiIndexer.create();                // now open it                srdiIndexer.open();            }                        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {                LOG.info("[" + ((group == null) ? 
"none" : group.toString()) + "] : Initialized " + indexName);            }        } catch (DBException de) {            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.log(Level.SEVERE, "Unable to Initialize databases", de);            }                        throw new UndeclaredThrowableException(de, "Unable to Initialize databases");        } catch (Throwable e) {            if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                LOG.log(Level.SEVERE, "Unable to create Cm", e);            }                        if (e instanceof Error) {                throw (Error) e;            } else if (e instanceof RuntimeException) {                throw (RuntimeException) e;            } else {                throw new UndeclaredThrowableException(e, "Unable to create Cm");            }        }    }        /**     * Construct a SrdiIndex and starts a GC thread which runs every "interval"     * milliseconds     *     * @param interval  the interval at which the gc will run in milliseconds     * @param group     group context     * @param indexName SrdiIndex name     */        public SrdiIndex(PeerGroup group, String indexName, long interval) {        this(group, indexName);        this.interval = interval;        startGC(group, indexName, interval);    }        /**     * Start the GC thread     *     * @param group the PeerGroup     * @param indexName index name     * @param interval interval in milliseconds     */    protected void startGC(PeerGroup group, String indexName, long interval) {        if (Logging.SHOW_INFO && LOG.isLoggable(Level.INFO)) {            LOG.info("[" + ((group == null) ? 
"none" : group.toString()) + "] : Starting SRDI GC Thread for " + indexName);        }                gcThread = new Thread(group.getHomeThreadGroup(), this, "SrdiIndex GC :" + indexName + " every " + interval + "ms");        gcThread.setDaemon(true);        gcThread.start();    }        /**     * Returns the name of this srdi index.     *     * @return index name.     */    public String getIndexName() {        return indexName;    }        /**     * add an index entry     *     * @param primaryKey primary key     * @param attribute  Attribute String to query on     * @param value      value of the attribute string     * @param expiration expiration associated with this entry relative time in     *                   milliseconds     * @param pid        peerid reference     */    public synchronized void add(String primaryKey, String attribute, String value, PeerID pid, long expiration) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("[" + indexName + "] Adding " + primaryKey + "/" + attribute + " = \'" + value + "\' for " + pid);        }                try {            Key key = new Key(primaryKey + attribute + value);            long expiresin = TimeUtils.toAbsoluteTimeMillis(expiration);                        // update the record if it exists            synchronized (cacheDB) {                // FIXME hamada 10/14/04 it is possible a peer re-appears with                // a different set of indexes since it's been marked for garbage                // collection.  
will address this issue in a subsequent patch                gcPeerTBL.remove(pid);                                Record record = cacheDB.readRecord(key);                List<Entry> old;                                if (record != null) {                    old = readRecord(record).list;                } else {                    old = new ArrayList<Entry>();                }                Entry entry = new Entry(pid, expiresin);                                if (!old.contains(entry)) {                    old.add(entry);                } else {                    // entry exists, replace it (effectively updating expiration)                    old.remove(old.indexOf(entry));                    old.add(entry);                }                // no sense in keeping expired entries.                old = removeExpired(old);                    long t0 = TimeUtils.timeNow();                byte[] data = getData(key, old);                                // if (LOG.isLoggable(Level.FINE)) {                // LOG.fine("Serialized result in : " + (TimeUtils.timeNow() - t0) + "ms.");                // }                if (data == null) {                    if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                        LOG.severe("Failed to serialize data");                    }                    return;                }                Value recordValue = new Value(data);                long pos = cacheDB.writeRecord(key, recordValue);                Map<String, String> indexables = getIndexMap(primaryKey + attribute, value);                                srdiIndexer.addToIndex(indexables, pos);            }        } catch (IOException de) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "Failed to add SRDI", de);            }        } catch (DBException de) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "Failed to add SRDI", 
de);            }        }    }        /**     * retrieves a record     *     * @param pkey  primary key     * @param skey  secondary key     * @param value value     * @return List of Entry objects     */    public List<Entry> getRecord(String pkey, String skey, String value) {        Record record = null;                try {            Key key = new Key(pkey + skey + value);                        synchronized (cacheDB) {                record = cacheDB.readRecord(key);            }        } catch (DBException de) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "Failed to retrieve SrdiIndex record", de);            }        }        // if record is null, readRecord returns an empty list        return readRecord(record).list;            }    

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -