📄 discoveryserviceimpl.java
字号:
/* * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.discovery;import net.jxta.credential.Credential;import net.jxta.discovery.DiscoveryEvent;import net.jxta.discovery.DiscoveryListener;import net.jxta.discovery.DiscoveryService;import net.jxta.document.Advertisement;import net.jxta.document.AdvertisementFactory;import net.jxta.document.MimeMediaType;import net.jxta.document.StructuredDocument;import net.jxta.document.StructuredDocumentFactory;import net.jxta.document.XMLDocument;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.OutgoingMessageEvent;import net.jxta.exception.PeerGroupException;import net.jxta.id.ID;import net.jxta.impl.cm.Cm;import net.jxta.impl.cm.Srdi;import net.jxta.impl.cm.SrdiIndex;import net.jxta.impl.peergroup.StdPeerGroup;import net.jxta.impl.protocol.DiscoveryConfigAdv;import net.jxta.impl.protocol.DiscoveryQuery;import net.jxta.impl.protocol.DiscoveryResponse;import net.jxta.impl.protocol.ResolverQuery;import net.jxta.impl.protocol.ResolverResponse;import net.jxta.impl.protocol.SrdiMessageImpl;import net.jxta.impl.resolver.InternalQueryHandler;import net.jxta.impl.util.TimeUtils;import net.jxta.logging.Logging;import net.jxta.membership.MembershipService;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.platform.Module;import net.jxta.protocol.ConfigParams;import net.jxta.protocol.DiscoveryQueryMsg;import net.jxta.protocol.DiscoveryResponseMsg;import net.jxta.protocol.ModuleImplAdvertisement;import net.jxta.protocol.PeerAdvertisement;import net.jxta.protocol.PeerGroupAdvertisement;import net.jxta.protocol.ResolverQueryMsg;import net.jxta.protocol.ResolverResponseMsg;import net.jxta.protocol.ResolverSrdiMsg;import net.jxta.protocol.SrdiMessage;import net.jxta.rendezvous.RendezVousService;import net.jxta.rendezvous.RendezvousEvent;import net.jxta.rendezvous.RendezvousListener;import net.jxta.resolver.ResolverService;import net.jxta.resolver.SrdiHandler;import net.jxta.service.Service;import java.beans.PropertyChangeEvent;import java.beans.PropertyChangeListener;import java.io.IOException;import java.io.InputStream;import java.io.StringReader;import java.net.URI;import java.text.MessageFormat;import java.util.ArrayList;import java.util.Collections;import java.util.Enumeration;import java.util.HashSet;import java.util.Hashtable;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.NoSuchElementException;import java.util.Set;import java.util.concurrent.atomic.AtomicInteger;import java.util.logging.Level;import java.util.logging.Logger;/** * This Discovery Service implementation provides a mechanism to discover * Advertisements using the Resolver service and SRDI. * <p/> * <p/>This implementation uses the standard JXTA Peer Discovery Protocol * (PDP). * <p/> * <p/>The DiscoveryService service also provides a way to obtain information * from a specified peer and request other peer advertisements, this method is * particularly useful in the case of a portal where new relationships may be * established starting from a predetermined peer (perhaps described in address * book, or through an invitation). * * @see net.jxta.discovery.DiscoveryService * @see net.jxta.protocol.DiscoveryQueryMsg * @see net.jxta.impl.protocol.DiscoveryQuery * @see net.jxta.protocol.DiscoveryResponseMsg * @see net.jxta.impl.protocol.DiscoveryResponse * @see net.jxta.resolver.ResolverService * @see <a href="https://jxta-spec.dev.java.net/nonav/JXTAProtocols.html#proto-pdp" target="_blank">JXTA Protocols Specification : Peer Discovery Protocol</a> */public class DiscoveryServiceImpl implements DiscoveryService, InternalQueryHandler, RendezvousListener, SrdiHandler, Srdi.SrdiInterface { /** * Logger */ private final static Logger LOG = Logger.getLogger(DiscoveryServiceImpl.class.getName()); /** * adv types */ final static String[] dirname = {"Peers", "Groups", "Adv"}; /** * The Query ID which will be associated with remote publish operations. */ private final static int REMOTE_PUBLISH_QUERYID = 0; private final static String srdiIndexerFileName = "discoverySrdi"; /** * The current resolver query ID. static to make debugging easier. */ private final static AtomicInteger qid = new AtomicInteger(0); /** * The maximum number of responses we will return for ANY query. */ private final static int MAX_RESPONSES = 50; /** * The cache manager we're going to use to cache jxta advertisements */ protected Cm cm; /** * assignedID as a String. */ private PeerGroup group = null; private String handlerName = null; private ModuleImplAdvertisement implAdvertisement = null; private ResolverService resolver = null; private RendezVousService rendezvous = null; private MembershipService membership = null; private PeerID localPeerId = null; private boolean localonly = false; private boolean alwaysUseReplicaPeer = false; private boolean stopped = true; /** * The table of discovery listeners. */ private Set<DiscoveryListener> listeners = new HashSet<DiscoveryListener>(); /** * The table of discovery query listeners. */ private Hashtable<Integer, DiscoveryListener> listenerTable = new Hashtable<Integer, DiscoveryListener>(); private final String checkPeerAdvLock = new String("Check/Update PeerAdvertisement Lock"); private PeerAdvertisement lastPeerAdv = null; private int lastModCount = -1; private boolean isRdv = false; private SrdiIndex srdiIndex = null; private Srdi srdi = null; private Thread srdiThread = null; private CredentialListener membershipCredListener = null; private Credential credential = null; private StructuredDocument credentialDoc = null; private long initialDelay = 60 * TimeUtils.ASECOND; private long runInterval = 30 * TimeUtils.ASECOND; /** * the discovery interface object */ private DiscoveryService discoveryInterface = null; /** * Listener we use for membership property events. */ private class CredentialListener implements PropertyChangeListener { /** * {@inheritDoc} */ public void propertyChange(PropertyChangeEvent evt) { if ("defaultCredential".equals(evt.getPropertyName())) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("New default credential event"); } synchronized (DiscoveryServiceImpl.this) { credential = (Credential) evt.getNewValue(); credentialDoc = null; if (null != credential) { try { credentialDoc = credential.getDocument(MimeMediaType.XMLUTF8); } catch (Exception all) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not generate credential document", all); } } } } } } } /** * {@inheritDoc} */ public synchronized Service getInterface() { if (discoveryInterface == null) { discoveryInterface = new DiscoveryServiceInterface(this); } return discoveryInterface; } /** * {@inheritDoc} */ public Advertisement getImplAdvertisement() { return implAdvertisement; } /** * {@inheritDoc} */ public int getRemoteAdvertisements(String peer, int type, String attribute, String value, int threshold) { return getRemoteAdvertisements(peer, type, attribute, value, threshold, null); } /** * {@inheritDoc} */ public int getRemoteAdvertisements(String peer, int type, String attribute, String value, int threshold, DiscoveryListener listener) { int myQueryID = qid.incrementAndGet(); if (localonly || stopped) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("localonly, no network operations performed"); } return myQueryID; } if (resolver == null) { // warn about calling the service before it started if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("resolver has not started yet, query discarded."); } return myQueryID; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { StringBuilder query = new StringBuilder( "Sending query#" + myQueryID + " for " + threshold + " " + dirname[type] + " advs"); if (attribute != null) { query.append("\n\tattr = ").append(attribute); if (value != null) { query.append("\tvalue = ").append(value); } } LOG.fine(query.toString()); } long t0 = System.currentTimeMillis(); DiscoveryQueryMsg dquery = new DiscoveryQuery(); dquery.setDiscoveryType(type); dquery.setAttr(attribute); dquery.setValue(value); dquery.setThreshold(threshold); if (listener != null) { listenerTable.put(myQueryID, listener); } ResolverQueryMsg query = new ResolverQuery(); query.setHandlerName(handlerName); query.setCredential(credentialDoc); query.setSrcPeer(localPeerId); query.setQuery(dquery.toString()); query.setQueryId(myQueryID); // check srdi if (peer == null && srdiIndex != null) { List<PeerID> res = srdiIndex.query(dirname[type], attribute, value, threshold); if (!res.isEmpty()) { srdi.forwardQuery(res, query, threshold); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Srdi forward a query #" + myQueryID + " in " + (System.currentTimeMillis() - t0) + "ms."); } return myQueryID; // nothing in srdi, get a starting point in rpv } else if (group.isRendezvous() && attribute != null && value != null) { PeerID destPeer = srdi.getReplicaPeer(dirname[type] + attribute + value); if (destPeer != null) { if (!destPeer.equals(group.getPeerID())) { // forward query increments the hopcount to indicate getReplica // has been invoked once srdi.forwardQuery(destPeer, query); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine( "Srdi forward query #" + myQueryID + " to " + destPeer + " in " + (System.currentTimeMillis() - t0) + "ms."); } return myQueryID; } } } } // no srdi, not a rendezvous, start the walk resolver.sendQuery(peer, query); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { if (peer == null) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sent a query #" + myQueryID + " in " + (System.currentTimeMillis() - t0) + "ms."); } } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sent a query #" + myQueryID + " to " + peer + " in " + (System.currentTimeMillis() - t0) + "ms."); } } } return myQueryID; } /** * {@inheritDoc} */ public Enumeration<Advertisement> getLocalAdvertisements(int type, String attribute, String value) throws IOException { if ((type > 2) || (type < 0)) { throw new IllegalArgumentException("Unknown Advertisement type"); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { StringBuilder query = new StringBuilder("Searching for " + dirname[type] + " advs"); if (attribute != null) { query.append("\n\tattr = ").append(attribute); } if (value != null) { query.append("\tvalue = ").append(value); } LOG.fine(query.toString()); } return Collections.enumeration(search(type, attribute, value, Integer.MAX_VALUE, false, null)); } /** * {@inheritDoc} */ public void init(PeerGroup pg, ID assignedID, Advertisement impl) throws PeerGroupException { group = pg; handlerName = assignedID.toString(); implAdvertisement = (ModuleImplAdvertisement) impl; localPeerId = group.getPeerID(); ConfigParams confAdv = pg.getConfigAdvertisement();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -