📄 piperesolver.java
字号:
/* * Copyright (c) 2001-2007 Sun Microsystems, Inc. All rights reserved. * * The Sun Project JXTA(TM) Software License * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * 3. The end-user documentation included with the redistribution, if any, must * include the following acknowledgment: "This product includes software * developed by Sun Microsystems, Inc. for JXTA(TM) technology." * Alternately, this acknowledgment may appear in the software itself, if * and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" must * not be used to endorse or promote products derived from this software * without prior written permission. For written permission, please contact * Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", nor may * "JXTA" appear in their name, without prior written permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED WARRANTIES, * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SUN * MICROSYSTEMS OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * JXTA is a registered trademark of Sun Microsystems, Inc. in the United * States and other countries. * * Please see the license information page at : * <http://www.jxta.org/project/www/license.html> for instructions on use of * the license in source files. * * ==================================================================== * * This software consists of voluntary contributions made by many individuals * on behalf of Project JXTA. For more information on Project JXTA, please see * http://www.jxta.org. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.pipe;import net.jxta.credential.Credential;import net.jxta.discovery.DiscoveryService;import net.jxta.document.MimeMediaType;import net.jxta.document.StructuredDocumentFactory;import net.jxta.document.StructuredTextDocument;import net.jxta.document.XMLDocument;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.EndpointListener;import net.jxta.endpoint.EndpointService;import net.jxta.endpoint.OutgoingMessageEvent;import net.jxta.id.ID;import net.jxta.impl.cm.Srdi;import net.jxta.impl.cm.Srdi.SrdiInterface;import net.jxta.impl.cm.SrdiIndex;import net.jxta.impl.protocol.PipeResolverMsg;import net.jxta.impl.protocol.ResolverQuery;import net.jxta.impl.protocol.SrdiMessageImpl;import net.jxta.impl.resolver.InternalQueryHandler;import net.jxta.impl.util.TimeUtils;import net.jxta.logging.Logging;import net.jxta.membership.MembershipService;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.pipe.InputPipe;import net.jxta.pipe.PipeID;import net.jxta.pipe.PipeService;import net.jxta.protocol.PeerAdvertisement;import net.jxta.protocol.PipeAdvertisement;import net.jxta.protocol.PipeResolverMessage;import net.jxta.protocol.PipeResolverMessage.MessageType;import net.jxta.protocol.ResolverQueryMsg;import net.jxta.protocol.ResolverResponseMsg;import net.jxta.protocol.ResolverSrdiMsg;import net.jxta.protocol.SrdiMessage;import net.jxta.protocol.SrdiMessage.Entry;import net.jxta.rendezvous.RendezVousService;import net.jxta.rendezvous.RendezVousStatus;import net.jxta.resolver.ResolverService;import net.jxta.resolver.SrdiHandler;import java.beans.PropertyChangeEvent;import java.beans.PropertyChangeListener;import java.io.IOException;import java.io.Reader;import java.io.StringReader;import java.util.ArrayList;import java.util.Collection;import java.util.EventListener;import java.util.EventObject;import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Set;import java.util.logging.Level;import java.util.logging.Logger;/** * This class implements the Resolver interfaces for a PipeServiceImpl. */class PipeResolver implements SrdiInterface, InternalQueryHandler, SrdiHandler, PipeRegistrar { /** * Logger */ private final static transient Logger LOG = Logger.getLogger(PipeResolver.class.getName()); private final static String PipeResolverName = "JxtaPipeResolver"; private final static String srdiIndexerFileName = "pipeResolverSrdi"; /** * Local SRDI GC Interval */ private final static long GcDelay = 1 * TimeUtils.AMINUTE; /** * Constant for pipe event listeners to signify any query id. */ final static int ANYQUERY = 0; /** * The current query ID. The next value returned by {@link #getNextQueryID()} * will be one greater than this value. */ private static int currentQueryID = 1; /** * Group we are working for */ private PeerGroup myGroup = null; /** * Group we are working for */ private EndpointService endpoint = null; /** * Resolver Service we will register with */ private ResolverService resolver = null; /** * The discovery service we will use */ private DiscoveryService discovery = null; /** * Membership Service we will use */ private MembershipService membership = null; private Srdi srdi = null; private Thread srdiThread = null; private SrdiIndex srdiIndex = null; private RendezVousService rendezvous = null; /** * The locally registered {@link net.jxta.pipe.InputPipe}s */ private final Map<ID, InputPipe> localInputPipes = new HashMap<ID, InputPipe>(); /** * Registered listeners for pipe events. */ private final Map<ID, Map<Integer, Listener>> outputpipeListeners = new HashMap<ID, Map<Integer, Listener>>(); /** * Encapsulates current Membership Service credential. */ final static class CurrentCredential { /** * The current default credential */ final Credential credential; /** * The current default credential in serialized XML form. */ final XMLDocument credentialDoc; CurrentCredential(Credential credential, XMLDocument credentialDoc) { this.credential = credential; this.credentialDoc = credentialDoc; } } /** * The current Membership service default credential. */ CurrentCredential currentCredential; /** * Listener we use for membership property events. */ private class CredentialListener implements PropertyChangeListener { /** * Standard Constructor */ CredentialListener() {} /** * {@inheritDoc} */ public void propertyChange(PropertyChangeEvent evt) { if (MembershipService.DEFAULT_CREDENTIAL_PROPERTY.equals(evt.getPropertyName())) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("New default credential event"); } synchronized (PipeResolver.this) { Credential cred = (Credential) evt.getNewValue(); XMLDocument credentialDoc; if (null != cred) { try { credentialDoc = (XMLDocument) cred.getDocument(MimeMediaType.XMLUTF8); currentCredential = new CurrentCredential(cred, credentialDoc); } catch (Exception all) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "Could not generate credential document", all); } currentCredential = null; } } else { currentCredential = null; } } } } } final CredentialListener membershipCredListener = new CredentialListener(); /** * A pipe resolver event. */ static class Event extends EventObject { private final ID peerid; private final ID pipeid; private final String type; private final int queryID; /** * Creates a new pipe resolution event * * @param source The PipeResolver generating the event. * @param peerid The peer on which the pipe was found * @param pipeid the pipe which was found * @param type the type of pipe which was found * @param queryid The query id associated with the response returned in this event */ public Event(PipeResolver source, ID peerid, ID pipeid, String type, int queryid) { super(source); this.peerid = peerid; this.pipeid = pipeid; this.type = type; this.queryID = queryid; } /** * Returns the peer associated with the event * * @return peerid */ public ID getPeerID() { return peerid; } /** * Returns the pipe associated with the event * * @return pipeid */ public ID getPipeID() { return pipeid; } /** * Returns the type of the pipe that is associated with the event * * @return type */ public String getType() { return type; } /** * Returns The query id associated with the response returned in this event * * @return query id associated with the response */ public int getQueryID() { return queryID; } } /** * Pipe Resolver Event Listener. Implement this interface is you wish to * Receive Pipe Resolver events. */ interface Listener extends EventListener { /** * Pipe Resolve event * * @param event event the PipeResolver Event * @return true if the event was handled otherwise false */ boolean pipeResolveEvent(Event event); /** * A NAK Event was received for this pipe * * @param event event the PipeResolver Event * @return true if the event was handled otherwise false */ boolean pipeNAKEvent(Event event); } /** * return the next query id. * * @return the next eligible query id. */ static synchronized int getNextQueryID() { currentQueryID++; if (currentQueryID == Integer.MAX_VALUE) { currentQueryID = 1; } return currentQueryID; } /** * Constructor for the PipeResolver object * * @param peerGroup group for which this PipeResolver operates in */ PipeResolver(PeerGroup peerGroup) { myGroup = peerGroup; resolver = myGroup.getResolverService(); membership = myGroup.getMembershipService(); rendezvous = myGroup.getRendezVousService(); endpoint = myGroup.getEndpointService(); // Register to the Generic ResolverServiceImpl resolver.registerHandler(PipeResolverName, this); // start srdi srdiIndex = new SrdiIndex(myGroup, srdiIndexerFileName, GcDelay); srdi = new Srdi(myGroup, PipeResolverName, this, srdiIndex, 2 * TimeUtils.AMINUTE, 1 * TimeUtils.AYEAR); srdiThread = new Thread(myGroup.getHomeThreadGroup(), srdi, "Pipe Resolver Srdi Thread"); srdiThread.setDaemon(true); srdiThread.start(); resolver.registerSrdiHandler(PipeResolverName, this); synchronized (this) { // register our credential listener. membership.addPropertyChangeListener(MembershipService.DEFAULT_CREDENTIAL_PROPERTY, membershipCredListener); try { // set the initial version of the default credential. currentCredential = null; Credential credential = membership.getDefaultCredential(); XMLDocument credentialDoc; if (null != credential) { credentialDoc = (XMLDocument) credential.getDocument(MimeMediaType.XMLUTF8); currentCredential = new CurrentCredential(credential, credentialDoc); } } catch (Exception all) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "could not get default credential", all); } } } } private boolean isRendezvous() { if (rendezvous == null) { rendezvous = myGroup.getRendezVousService(); } RendezVousStatus mode = rendezvous.getRendezVousStatus(); return (mode == RendezVousStatus.RENDEZVOUS ||mode == RendezVousStatus.AUTO_RENDEZVOUS); } /** * {@inheritDoc} */ public int processQuery(ResolverQueryMsg query) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -