📄 discoveryserviceimpl.java

📁 JXTA platform development kit
💻 JAVA
📖 Page 1 of 4
/*
 *  Copyright (c) 2001-2006 Sun Microsystems, Inc.  All rights reserved.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions
 *  are met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *  notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *  notice, this list of conditions and the following disclaimer in
 *  the documentation and/or other materials provided with the
 *  distribution.
 *
 *  3. The end-user documentation included with the redistribution,
 *  if any, must include the following acknowledgment:
 *  "This product includes software developed by the
 *  Sun Microsystems, Inc. for Project JXTA."
 *  Alternately, this acknowledgment may appear in the software itself,
 *  if and wherever such third-party acknowledgments normally appear.
 *
 *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must
 *  not be used to endorse or promote products derived from this
 *  software without prior written permission. For written
 *  permission, please contact Project JXTA at http://www.jxta.org.
 *
 *  5. Products derived from this software may not be called "JXTA",
 *  nor may "JXTA" appear in their name, without prior written
 *  permission of Sun.
 *
 *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 *  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 *  DISCLAIMED.
 *  IN NO EVENT SHALL SUN MICROSYSTEMS OR
 *  ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 *  USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 *  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 *  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 *  SUCH DAMAGE.
 *  ====================================================================
 *
 *  This software consists of voluntary contributions made by many
 *  individuals on behalf of Project JXTA.  For more
 *  information on Project JXTA, please see
 *  <http://www.jxta.org/>.
 *
 *  This license is based on the BSD license adopted by the Apache Foundation.
 *
 *  $Id: DiscoveryServiceImpl.java,v 1.139 2006/08/22 00:24:02 hamada Exp $
 */

package net.jxta.impl.discovery;

import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.io.InputStream;
import java.io.StringReader;
import java.net.URI;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.io.IOException;
import java.util.NoSuchElementException;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import net.jxta.credential.Credential;
import net.jxta.discovery.DiscoveryEvent;
import net.jxta.discovery.DiscoveryListener;
import net.jxta.discovery.DiscoveryService;
import net.jxta.document.Advertisement;
import net.jxta.document.AdvertisementFactory;
import net.jxta.document.XMLDocument;
import net.jxta.document.MimeMediaType;
import net.jxta.document.StructuredDocument;
import net.jxta.document.StructuredDocumentFactory;
import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.OutgoingMessageEvent;
import net.jxta.id.ID;
import net.jxta.membership.MembershipService;
import net.jxta.peer.PeerID;
import net.jxta.peergroup.PeerGroup;
import net.jxta.platform.Module;
import net.jxta.protocol.ConfigParams;
import net.jxta.protocol.DiscoveryQueryMsg;
import net.jxta.protocol.DiscoveryResponseMsg;
import net.jxta.protocol.ModuleImplAdvertisement;
import net.jxta.protocol.PeerAdvertisement;
import net.jxta.protocol.PeerGroupAdvertisement;
import net.jxta.protocol.ResolverQueryMsg;
import net.jxta.protocol.ResolverResponseMsg;
import net.jxta.protocol.ResolverSrdiMsg;
import net.jxta.protocol.SrdiMessage;
import net.jxta.rendezvous.RendezVousService;
import net.jxta.rendezvous.RendezvousEvent;
import net.jxta.rendezvous.RendezvousListener;
import net.jxta.resolver.ResolverService;
import net.jxta.resolver.SrdiHandler;
import net.jxta.service.Service;
import net.jxta.exception.PeerGroupException;

import net.jxta.impl.cm.Cm;
import net.jxta.impl.cm.Srdi;
import net.jxta.impl.cm.SrdiIndex;
import net.jxta.impl.peergroup.StdPeerGroup;
import net.jxta.impl.protocol.DiscoveryConfigAdv;
import net.jxta.impl.protocol.DiscoveryQuery;
import net.jxta.impl.protocol.DiscoveryResponse;
import net.jxta.impl.protocol.ResolverQuery;
import net.jxta.impl.protocol.ResolverResponse;
import net.jxta.impl.protocol.SrdiMessageImpl;
import net.jxta.impl.resolver.InternalQueryHandler;
import net.jxta.impl.util.TimeUtils;

/**
 * This Discovery Service implementation provides a mechanism to discover
 * Advertisements using the Resolver service and SRDI.
 *
 * <p/>This implementation uses the standard JXTA Peer Discovery Protocol
 * (PDP).
 *
 * <p/>The DiscoveryService service also provides a way to obtain information
 * from a specified peer and request other peer advertisements. This method is
 * particularly useful in the case of a portal where new relationships may be
 * established starting from a predetermined peer (perhaps described in an
 * address book, or through an invitation).
 *
 * @see net.jxta.discovery.DiscoveryService
 * @see net.jxta.protocol.DiscoveryQueryMsg
 * @see net.jxta.impl.protocol.DiscoveryQuery
 * @see net.jxta.protocol.DiscoveryResponseMsg
 * @see net.jxta.impl.protocol.DiscoveryResponse
 * @see net.jxta.resolver.ResolverService
 * @see <a href="http://spec.jxta.org/nonav/v1.0/docbook/JXTAProtocols.html#proto-pdp" target="_blank">JXTA Protocols Specification : Peer Discovery Protocol</a>
 */
public class DiscoveryServiceImpl implements DiscoveryService,
            InternalQueryHandler,
            RendezvousListener,
            SrdiHandler,
            Srdi.SrdiInterface {

    /**
     *  Log4J Logger
     */
    private final static Logger LOG = Logger.getLogger(DiscoveryServiceImpl.class.getName());

    /**
     *  adv types
     */
    protected final static String[] dirname = { "Peers", "Groups", "Adv"};

    private final static String srdiIndexerFileName = "discoverySrdi";

    /**
     *  The current query ID. The next query will be issued with this id.
     */
    private static int qid = 0;

    /**
     *  The maximum number of responses we will return for ANY query.
     */
    private final int MAX_RESPONSES = 50;

    /**
     * The cache manager we're going to use to cache jxta advertisements
     */
    protected Cm cm;

    /**
     *  assignedID as a String.
     */
    private PeerGroup group = null;
    private String handlerName = null;
    private ModuleImplAdvertisement implAdvertisement = null;
    private ResolverService resolver = null;
    private RendezVousService rendezvous = null;
    private MembershipService membership = null;
    private String localPeerId = null;
    private boolean localonly = false;
    private boolean alwaysUseReplicaPeer = false;
    private boolean started = false;

    /**
     *  The table of discovery listeners.
     *
     *  <p/><ul>
     *      <li>Values are {@link net.jxta.discovery.DiscoveryListener}</li>
     *  </ul>
     */
    private Set listeners = new HashSet();

    /**
     *  The table of discovery query listeners.
     *
     *  <p/><ul>
     *      <li>Keys are the query ID as an {@link java.lang.Integer}</li>
     *      <li>Values are {@link net.jxta.discovery.DiscoveryListener}</li>
     *  </ul>
     */
    private Hashtable listenerTable = new Hashtable();

    private final Object checkPeerAdvLock = new String("Check/Update PeerAdvertisement Lock");
    private PeerAdvertisement lastPeerAdv = null;
    private int lastModCount = -1;
    private boolean isRdv = false;
    private SrdiIndex srdiIndex = null;
    private Srdi srdi = null;
    private Thread srdiThread = null;
    private CredentialListener membershipCredListener = null;
    private Credential credential = null;
    private StructuredDocument credentialDoc = null;
    private long initialDelay = 60 * TimeUtils.ASECOND;
    private long runInterval = 30 * TimeUtils.ASECOND;

    /**
     *  the discovery interface object
     */
    private DiscoveryService discoveryInterface = null;

    /**
     *  Listener we use for membership property events.
     */
    private class CredentialListener implements PropertyChangeListener {

        /**
         *  {@inheritDoc}
         */
        public void propertyChange(PropertyChangeEvent evt) {
            if ("defaultCredential".equals(evt.getPropertyName())) {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("New default credential event");
                }

                synchronized (DiscoveryServiceImpl.this) {
                    credential = (Credential) evt.getNewValue();
                    credentialDoc = null;
                    if (null != credential) {
                        try {
                            credentialDoc = credential.getDocument(MimeMediaType.XMLUTF8);
                        } catch (Exception all) {
                            if (LOG.isEnabledFor(Level.WARN)) {
                                LOG.warn("Could not generate credential document", all);
                            }
                        }
                    }
                }
            }
        }
    }

    /**
     *  {@inheritDoc}
     */
    public synchronized Service getInterface() {
        if (discoveryInterface == null) {
            discoveryInterface = new DiscoveryServiceInterface(this);
        }
        return discoveryInterface;
    }

    /**
     *  {@inheritDoc}
     */
    public Advertisement getImplAdvertisement() {
        return implAdvertisement;
    }

    /**
     *  {@inheritDoc}
     */
    public int getRemoteAdvertisements(String peer,
                                       int type,
                                       String attribute,
                                       String value,
                                       int threshold) {
        return getRemoteAdvertisements(peer, type, attribute, value, threshold, null);
    }

    /**
     *  {@inheritDoc}
     */
    public int getRemoteAdvertisements(String peer,
                                       int type,
                                       String attribute,
                                       String value,
                                       int threshold,
                                       DiscoveryListener listener) {
        int myQueryID = nextQid();

        if (localonly) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("localonly, no network operations performed");
            }
            return myQueryID;
        }

        if (resolver == null) {
            // warn about calling the service before it started
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("resolver has not started yet, query discarded.");
            }
            return myQueryID;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            StringBuffer query = new StringBuffer("Sending query#" + myQueryID + " for " + threshold + " " + dirname[type] + " advs");
            if (attribute != null) {
                query.append("\n\tattr = " + attribute);
                if (value != null) {
                    query.append("\tvalue = " + value);
                }
            }
            LOG.debug(query);
        }

        long t0 = System.currentTimeMillis();

        DiscoveryQueryMsg dquery = new DiscoveryQuery();
        dquery.setDiscoveryType(type);
        dquery.setAttr(attribute);
        dquery.setValue(value);
        dquery.setThreshold(threshold);

        if (listener != null) {
            listenerTable.put(new Integer(myQueryID), listener);
        }

        ResolverQueryMsg query = new ResolverQuery();
        query.setHandlerName(handlerName);
        query.setCredential(credentialDoc);
        query.setSrc(localPeerId);
        query.setQuery(dquery.toString());
        query.setQueryId(myQueryID);

        // check srdi
        if (peer == null && srdiIndex != null) {
            Vector res = srdiIndex.query(dirname[type], attribute, value, threshold);
            if (res.size() > 0) {
                srdi.forwardQuery(res, query, threshold);
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("Srdi forward a query #" + myQueryID + " in " + (System.currentTimeMillis() - t0) + "ms.");
                }
                return myQueryID;
            } else if (group.isRendezvous() && attribute != null && value != null) {
                // nothing in srdi, get a starting point in rpv
                PeerID destPeer = srdi.getReplicaPeer(dirname[type] + attribute + value);
                if (destPeer != null) {
                    if (!destPeer.equals(group.getPeerID())) {
                        // forward query increments the hopcount to indicate getReplica
                        // has been invoked once
                        srdi.forwardQuery(destPeer, query);
                        if (LOG.isEnabledFor(Level.DEBUG)) {
                            LOG.debug("Srdi forward query #" + myQueryID + " to " + destPeer + " in " + (System.currentTimeMillis() - t0) + "ms.");
                        }
                        return myQueryID;
                    }
                }
            }
        }

        // no srdi, not a rendezvous, start the walk
        resolver.sendQuery(peer, query);
        if (LOG.isEnabledFor(Level.DEBUG)) {
            if (peer == null) {
                LOG.debug("Sent a query #" + myQueryID + " in " + (System.currentTimeMillis() - t0) + "ms.");
            } else {
                LOG.debug("Sent a query #" + myQueryID + " to " + peer + " in " + (System.currentTimeMillis() - t0) + "ms.");
            }
        }
        return myQueryID;
    }

    /**
     *  {@inheritDoc}
     */
    public Enumeration getLocalAdvertisements(int type,
            String attribute,
            String value) throws IOException {
        if ((type > 2) || (type < 0)) {
            throw new IllegalArgumentException("Unknown Advertisement type");
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            StringBuffer query = new StringBuffer("Searching for " + dirname[type] + " advs");
            if (attribute != null) {
                query.append("\n\tattr = " + attribute);
            }
            if (value != null) {
                query.append("\tvalue = " + value);
            }
            LOG.debug(query);
