📄 discoveryserviceimpl.java
字号:
/* * Copyright (c) 2001-2006 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. 
IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. * * $Id: DiscoveryServiceImpl.java,v 1.139 2006/08/22 00:24:02 hamada Exp $ */package net.jxta.impl.discovery;import java.beans.PropertyChangeEvent;import java.beans.PropertyChangeListener;import java.io.InputStream;import java.io.StringReader;import java.net.URI;import java.util.Enumeration;import java.util.HashSet;import java.util.Hashtable;import java.util.List;import java.util.Iterator;import java.util.Map;import java.util.Set;import java.util.Vector;import java.io.IOException;import java.util.NoSuchElementException;import org.apache.log4j.Level;import org.apache.log4j.Logger;import net.jxta.credential.Credential;import net.jxta.discovery.DiscoveryEvent;import net.jxta.discovery.DiscoveryListener;import net.jxta.discovery.DiscoveryService;import net.jxta.document.Advertisement;import net.jxta.document.AdvertisementFactory;import net.jxta.document.XMLDocument;import net.jxta.document.MimeMediaType;import net.jxta.document.StructuredDocument;import net.jxta.document.StructuredDocumentFactory;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.OutgoingMessageEvent;import net.jxta.id.ID;import 
net.jxta.membership.MembershipService;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.platform.Module;import net.jxta.protocol.ConfigParams;import net.jxta.protocol.DiscoveryQueryMsg;import net.jxta.protocol.DiscoveryResponseMsg;import net.jxta.protocol.ModuleImplAdvertisement;import net.jxta.protocol.PeerAdvertisement;import net.jxta.protocol.PeerGroupAdvertisement;import net.jxta.protocol.ResolverQueryMsg;import net.jxta.protocol.ResolverResponseMsg;import net.jxta.protocol.ResolverSrdiMsg;import net.jxta.protocol.SrdiMessage;import net.jxta.rendezvous.RendezVousService;import net.jxta.rendezvous.RendezvousEvent;import net.jxta.rendezvous.RendezvousListener;import net.jxta.resolver.ResolverService;import net.jxta.resolver.SrdiHandler;import net.jxta.service.Service;import net.jxta.exception.PeerGroupException;import net.jxta.impl.cm.Cm;import net.jxta.impl.cm.Srdi;import net.jxta.impl.cm.SrdiIndex;import net.jxta.impl.peergroup.StdPeerGroup;import net.jxta.impl.protocol.DiscoveryConfigAdv;import net.jxta.impl.protocol.DiscoveryQuery;import net.jxta.impl.protocol.DiscoveryResponse;import net.jxta.impl.protocol.ResolverQuery;import net.jxta.impl.protocol.ResolverResponse;import net.jxta.impl.protocol.SrdiMessageImpl;import net.jxta.impl.resolver.InternalQueryHandler;import net.jxta.impl.util.TimeUtils;/** * This Discovery Service implementation provides a mechanism to discover * Advertisements using the Resolver service and SRDI. * * <p/>This implementation uses the standard JXTA Peer Discovery Protocol * (PDP). * * <p/>The DiscoveryService service also provides a way to obtain information * from a specified peer and request other peer advertisements, this method is * particularly useful in the case of a portal where new relationships may be * established starting from a predetermined peer (perhaps described in address * book, or through an invitation). 
*
 * @see net.jxta.discovery.DiscoveryService
 * @see net.jxta.protocol.DiscoveryQueryMsg
 * @see net.jxta.impl.protocol.DiscoveryQuery
 * @see net.jxta.protocol.DiscoveryResponseMsg
 * @see net.jxta.impl.protocol.DiscoveryResponse
 * @see net.jxta.resolver.ResolverService
 * @see <a href="http://spec.jxta.org/nonav/v1.0/docbook/JXTAProtocols.html#proto-pdp" target="_blank">JXTA Protocols Specification : Peer Discovery Protocol</a>
 */
public class DiscoveryServiceImpl implements DiscoveryService,
        InternalQueryHandler,
        RendezvousListener,
        SrdiHandler,
        Srdi.SrdiInterface {

    /**
     * Log4J Logger
     */
    private final static Logger LOG = Logger.getLogger(DiscoveryServiceImpl.class.getName());

    /**
     * Advertisement type names. The array is indexed with the integer
     * discovery type passed to the query methods of this class, so valid
     * types are 0 to 2.
     */
    protected final static String[] dirname = { "Peers", "Groups", "Adv"};

    // NOTE(review): presumably the name of the on-disk SRDI index; its use is
    // not visible in this part of the file -- confirm.
    private final static String srdiIndexerFileName = "discoverySrdi";

    /**
     * The current query ID. The next query will be issued with this id.
     */
    private static int qid = 0;

    /**
     * The maximum number of responses we will return for ANY query.
     */
    private final int MAX_RESPONSES = 50;

    /**
     * The cache manager we're going to use to cache jxta advertisements
     */
    protected Cm cm;

    /**
     * The peer group consulted for rendezvous status and for the local peer
     * ID when queries are routed.
     *
     * <p/>NOTE(review): the comment previously attached here read
     * "assignedID as a String", which does not describe a PeerGroup field;
     * it probably belonged to {@code handlerName} -- confirm.
     */
    private PeerGroup group = null;

    // Handler name stamped on outgoing resolver queries.
    private String handlerName = null;
    private ModuleImplAdvertisement implAdvertisement = null;
    private ResolverService resolver = null;
    private RendezVousService rendezvous = null;
    private MembershipService membership = null;

    // Set as the source of outgoing resolver queries.
    private String localPeerId = null;

    // When true, remote queries are not sent; only a query ID is returned.
    private boolean localonly = false;
    private boolean alwaysUseReplicaPeer = false;
    private boolean started = false;

    /**
     * The table of discovery listeners.
     *
     * <p/><ul>
     *     <li>Values are {@link net.jxta.discovery.DiscoveryListener}</li>
     * </ul>
     */
    private Set listeners = new HashSet();

    /**
     * The table of discovery query listeners.
* * <p/><ul> * <li>Keys are the query ID as an {@link java.lang.Integer}</li> * <li>Values are <@link net.jxta.discovery.DiscoveryListener}</li> * </ul> */ private Hashtable listenerTable = new Hashtable(); private final Object checkPeerAdvLock = new String("Check/Update PeerAdvertisement Lock"); private PeerAdvertisement lastPeerAdv = null; private int lastModCount = -1; private boolean isRdv = false; private SrdiIndex srdiIndex = null; private Srdi srdi = null; private Thread srdiThread = null; private CredentialListener membershipCredListener = null; private Credential credential = null; private StructuredDocument credentialDoc = null; private long initialDelay = 60 * TimeUtils.ASECOND; private long runInterval = 30 * TimeUtils.ASECOND; /** * the discovery interface object */ private DiscoveryService discoveryInterface = null; /** * Listener we use for membership property events. */ private class CredentialListener implements PropertyChangeListener { /** * {@inheritDoc} */ public void propertyChange(PropertyChangeEvent evt) { if ("defaultCredential".equals(evt.getPropertyName())) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("New default credential event"); } synchronized (DiscoveryServiceImpl.this) { credential = (Credential) evt.getNewValue(); credentialDoc = null; if (null != credential) { try { credentialDoc = credential.getDocument(MimeMediaType.XMLUTF8); } catch (Exception all) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Could not generate credential document", all); } } } } } } } /** * {@inheritDoc} */ public synchronized Service getInterface() { if (discoveryInterface == null) { discoveryInterface = new DiscoveryServiceInterface(this); } return discoveryInterface; } /** * {@inheritDoc} */ public Advertisement getImplAdvertisement() { return implAdvertisement; } /** * {@inheritDoc} */ public int getRemoteAdvertisements(String peer, int type, String attribute, String value, int threshold) { return getRemoteAdvertisements(peer, type, attribute, 
value, threshold, null);
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Builds a discovery query, wraps it in a resolver query and routes
     * it in one of three ways:
     * <ol>
     *     <li>no target peer and the SRDI index has matches: forwarded to
     *     the matching peers;</li>
     *     <li>no target peer, no SRDI match, this peer is a rendezvous and
     *     both attribute and value are given: forwarded to the replica peer,
     *     unless the replica peer is this peer;</li>
     *     <li>otherwise: handed to the resolver, addressed to {@code peer}
     *     (which may be {@code null}).</li>
     * </ol>
     *
     * @param peer      target peer ID, or {@code null} for no specific target
     * @param type      discovery type; must be a valid index into
     *                  {@code dirname}
     * @param attribute attribute name to match, may be {@code null}
     * @param value     attribute value to match, may be {@code null}
     * @param threshold requested number of responses
     * @param listener  registered under the returned query ID when not
     *                  {@code null}; not registered if the query is discarded
     * @return the query ID, also returned when the query is discarded because
     *         the service is local-only or the resolver is not yet available
     * @throws IllegalArgumentException if {@code type} is not a known
     *                                  advertisement type
     */
    public int getRemoteAdvertisements(String peer, int type, String attribute, String value, int threshold, DiscoveryListener listener) {

        // Reject unknown types up front, as getLocalAdvertisements does.
        // Without this check an invalid type either failed with an
        // ArrayIndexOutOfBoundsException or was sent out, depending on the
        // log level and on whether the SRDI index was consulted.
        if ((type < 0) || (type >= dirname.length)) {
            throw new IllegalArgumentException("Unknown Advertisement type");
        }

        int myQueryID = nextQid();

        if (localonly) {
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("localonly, no network operations performed");
            }
            return myQueryID;
        }

        if (resolver == null) {
            // warn about calling the service before it started
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("resolver has not started yet, query discarded.");
            }
            return myQueryID;
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            StringBuffer description = new StringBuffer("Sending query#" + myQueryID + " for " + threshold + " " + dirname[type] + " advs");

            if (attribute != null) {
                description.append("\n\tattr = " + attribute);
                if (value != null) {
                    description.append("\tvalue = " + value);
                }
            }
            LOG.debug(description);
        }

        long t0 = System.currentTimeMillis();

        DiscoveryQueryMsg dquery = new DiscoveryQuery();

        dquery.setDiscoveryType(type);
        dquery.setAttr(attribute);
        dquery.setValue(value);
        dquery.setThreshold(threshold);

        if (listener != null) {
            listenerTable.put(new Integer(myQueryID), listener);
        }

        ResolverQueryMsg query = new ResolverQuery();

        query.setHandlerName(handlerName);
        query.setCredential(credentialDoc);
        query.setSrc(localPeerId);
        query.setQuery(dquery.toString());
        query.setQueryId(myQueryID);

        // check srdi
        if (peer == null && srdiIndex != null) {
            Vector res = srdiIndex.query(dirname[type], attribute, value, threshold);

            if (res.size() > 0) {
                srdi.forwardQuery(res, query, threshold);
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("Srdi forward a query #" + myQueryID + " in " + (System.currentTimeMillis() - t0) + "ms.");
                }
                return myQueryID;
            } else if (group.isRendezvous() && attribute != null && value != null) {
                // nothing in srdi, get a starting point in rpv
                PeerID destPeer = srdi.getReplicaPeer(dirname[type] + attribute + value);

                if (destPeer != null && !destPeer.equals(group.getPeerID())) {
                    // forward query increments the hopcount to indicate
                    // getReplica has been invoked once
                    srdi.forwardQuery(destPeer, query);
                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("Srdi forward query #" + myQueryID + " to " + destPeer + " in " + (System.currentTimeMillis() - t0) + "ms.");
                    }
                    return myQueryID;
                }
            }
        }

        // no srdi, not a rendezvous, start the walk
        resolver.sendQuery(peer, query);

        if (LOG.isEnabledFor(Level.DEBUG)) {
            long elapsed = System.currentTimeMillis() - t0;

            if (peer == null) {
                LOG.debug("Sent a query #" + myQueryID + " in " + elapsed + "ms.");
            } else {
                LOG.debug("Sent a query #" + myQueryID + " to " + peer + " in " + elapsed + "ms.");
            }
        }

        return myQueryID;
    }

    /**
     * {@inheritDoc}
     */
    public Enumeration getLocalAdvertisements(int type, String attribute, String value) throws IOException {

        if ((type > 2) || (type < 0)) {
            throw new IllegalArgumentException("Unknown Advertisement type");
        }

        if (LOG.isEnabledFor(Level.DEBUG)) {
            StringBuffer query = new StringBuffer("Searching for " + dirname[type] + " advs");

            if (attribute != null) {
                query.append("\n\tattr = " + attribute);
            }
            if (value != null) {
                query.append("\tvalue = " + value);
            }
            LOG.debug(query);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -