📄 piperesolver.java
字号:
/* * $Id: PipeResolver.java,v 1.134 2006/07/04 14:17:02 vwilliams Exp $ * * Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. 
IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * ==================================================================== * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. * */package net.jxta.impl.pipe;import java.beans.PropertyChangeEvent;import java.beans.PropertyChangeListener;import java.io.IOException;import java.io.Reader;import java.io.StringReader;import java.util.Arrays;import java.util.Collection;import java.util.EventListener;import java.util.EventObject;import java.util.HashMap;import java.util.Iterator;import java.util.List;import java.util.Map;import java.util.Set;import java.util.Vector;import org.apache.log4j.Level;import org.apache.log4j.Logger;import net.jxta.credential.Credential;import net.jxta.discovery.DiscoveryService;import net.jxta.document.MimeMediaType;import net.jxta.document.StructuredDocument;import net.jxta.document.StructuredDocumentFactory;import net.jxta.document.StructuredTextDocument;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.EndpointListener;import net.jxta.endpoint.OutgoingMessageEvent;import net.jxta.id.ID;import net.jxta.membership.MembershipService;import net.jxta.peer.PeerID;import net.jxta.peergroup.PeerGroup;import net.jxta.pipe.InputPipe;import net.jxta.pipe.PipeID;import 
net.jxta.pipe.PipeService;import net.jxta.protocol.PeerAdvertisement;import net.jxta.protocol.PipeAdvertisement;import net.jxta.protocol.PipeResolverMessage;import net.jxta.protocol.PipeResolverMessage.MessageType;import net.jxta.protocol.ResolverQueryMsg;import net.jxta.protocol.ResolverResponseMsg;import net.jxta.protocol.ResolverSrdiMsg;import net.jxta.protocol.SrdiMessage;import net.jxta.protocol.SrdiMessage.Entry;import net.jxta.resolver.ResolverService;import net.jxta.resolver.SrdiHandler;import net.jxta.impl.cm.Srdi;import net.jxta.impl.cm.Srdi.SrdiInterface;import net.jxta.impl.cm.SrdiIndex;import net.jxta.impl.protocol.PipeResolverMsg;import net.jxta.impl.protocol.ResolverQuery;import net.jxta.impl.protocol.SrdiMessageImpl;import net.jxta.impl.resolver.InternalQueryHandler;import net.jxta.impl.util.TimeUtils;/** * This class implements the Resolver interfaces for a PipeServiceImpl. * **/class PipeResolver implements SrdiInterface, InternalQueryHandler, SrdiHandler, PipeRegistrar { /** * Log4J Logger **/ private final static Logger LOG = Logger.getLogger(PipeResolver.class.getName()); private final static String PipeResolverName = "JxtaPipeResolver"; private final static String srdiIndexerFileName = "pipeResolverSrdi"; /** * Local SRDI GC Interval **/ private final static long GcDelay = 1 * TimeUtils.AMINUTE; /** * Constant for pipe event listeners to signify any query id. **/ final static int ANYQUERY = 0; /** * The current query ID. The next value returned by {@link #getNextQueryID()} * will be one greater than this value. 
**/
    private static int currentQueryID = 1;

    /**
     * Group we are working for
     **/
    private PeerGroup myGroup = null;

    /**
     * Resolver Service we will register with
     **/
    private ResolverService resolver = null;

    /**
     * The discovery service we will use
     *
     * <p/>NOTE(review): the constructor does not assign this field; presumably
     * it is looked up later -- confirm against the rest of the class.
     **/
    private DiscoveryService discovery = null;

    /**
     * Membership Service we will use
     **/
    private MembershipService membership = null;

    /**
     * The SRDI instance built in the constructor for this resolver's handler
     * name and index; it is the Runnable executed by {@link #srdiThread}.
     **/
    private Srdi srdi = null;

    /**
     * Daemon thread, started by the constructor, that runs {@link #srdi}.
     **/
    private Thread srdiThread = null;

    /**
     * The SRDI index handed to {@link #srdi}; created in the constructor with
     * the <code>GcDelay</code> interval.
     **/
    private SrdiIndex srdiIndex = null;

    /**
     * Listener we register for credential changes.
     **/
    private CredentialListener membershipCredListener = null;

    /**
     * The credential we will include in queries and responses.
     *
     * <p/>The constructor and the credential listener both write this field
     * while holding this PipeResolver's monitor.
     **/
    private Credential credential = null;

    /**
     * The credential as a document.
     *
     * <p/>Stays <code>null</code> when there is no credential or when the
     * document could not be generated.
     **/
    private StructuredDocument credentialDoc = null;

    /**
     * The locally registered {@link net.jxta.pipe.InputPipe}s
     *
     * <p/><ul>
     *     <li>Keys are {@link net.jxta.pipe.PipeID}s</li>
     *     <li>Values are {@link net.jxta.pipe.InputPipe}.</li>
     * </ul>
     **/
    private Map localInputPipes = new HashMap();

    /**
     * Registered listeners for pipe events.
     *
     * <p/><ul>
     *     <li>Keys are {@link net.jxta.pipe.PipeID}s</li>
     *     <li>Values are {@link java.util.HashMap}.
     *         <ul>
     *             <li>Keys are query ids as {@link java.lang.Integer}s</li>
     *             <li>Values are {@link Listener}.</li>
     *         </ul>
     *     </li>
     * </ul>
     **/
    private Map outputpipeListeners = new HashMap();

    /**
     * Listener we use for membership property events.
*/
    private class CredentialListener implements PropertyChangeListener {

        /**
         * {@inheritDoc}
         *
         * <p/>Reacts only to <code>"defaultCredential"</code> events: the new
         * credential replaces the current one and its XML document form is
         * regenerated. Both fields are updated while holding the enclosing
         * PipeResolver's monitor. If the document cannot be generated the
         * failure is logged and the document is left <code>null</code>.
         **/
        public void propertyChange(PropertyChangeEvent evt) {
            // Any other property is of no interest to the resolver.
            if (!"defaultCredential".equals(evt.getPropertyName())) {
                return;
            }

            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("New default credential event");
            }

            synchronized (PipeResolver.this) {
                credential = (Credential) evt.getNewValue();
                // Drop the old document first so a failed regeneration never
                // leaves a document that belongs to the previous credential.
                credentialDoc = null;

                if (null == credential) {
                    return;
                }

                try {
                    credentialDoc = credential.getDocument(MimeMediaType.XMLUTF8);
                } catch (Exception failure) {
                    if (LOG.isEnabledFor(Level.WARN)) {
                        LOG.warn("Could not generate credential document", failure);
                    }
                }
            }
        }
    }

    /**
     * A pipe resolver event. Carries the peer, the pipe, the pipe type and the
     * query id that were supplied when the event was created.
     **/
    static class Event extends EventObject {

        /** Peer on which the pipe was found. */
        private PeerID peerid;

        /** Pipe which was found. */
        private PipeID pipeid;

        /** Type of the pipe which was found. */
        private String type;

        /** Query id associated with the response. */
        private int queryID = -1;

        /**
         * Creates a new pipe resolution event
         *
         * @param source The PipeResolver generating the event.
         * @param peerid The peer on which the pipe was found
         * @param pipeid the pipe which was found
         * @param type the type of pipe which was found
         * @param queryid The query id associated with the response returned in this event
         **/
        public Event(Object source, PeerID peerid, PipeID pipeid, String type, int queryid) {
            super(source);

            this.peerid = peerid;
            this.pipeid = pipeid;
            this.type = type;
            queryID = queryid;
        }

        /**
         * Returns the peer associated with the event
         *
         * @return peerid
         **/
        public PeerID getPeerID() {
            return this.peerid;
        }

        /**
         * Returns the pipe associated with the event
         *
         * @return pipeid
         **/
        public PipeID getPipeID() {
            return this.pipeid;
        }

        /**
         * Returns the type of the pipe that is associated with the event
         *
         * @return type
         **/
        public String getType() {
            return this.type;
        }

        /**
         * Returns the query id associated with the response returned in this event
         *
         * @return query id associated with the response
         **/
        public int getQueryID() {
            return this.queryID;
        }
    }

    /**
     * Pipe Resolver Event Listener. Implement this interface if you wish to
     * receive Pipe Resolver events.
**/ interface Listener extends EventListener { /** * Pipe Resolve Event * * @param PipeResolverEvent event the PipeResolver Event * @return true if the event was handled otherwise false **/ boolean pipeResolveEvent(Event event); /** * A NAK Event was received for this pipe * * @param PipeResolverEvent event the PipeResolver Event * @return true if the event was handled otherwise false **/ boolean pipeNAKEvent(Event event); } /** * return the next query id. * * @return the next eligible query id. **/ static synchronized int getNextQueryID() { currentQueryID++; if (currentQueryID == Integer.MAX_VALUE) currentQueryID = 1; return currentQueryID; } /** * Constructor for the PipeResolver object * * @param g group for which this PipeResolver operates in **/ PipeResolver(PeerGroup g) { myGroup = g; resolver = myGroup.getResolverService(); membership = myGroup.getMembershipService(); // Register to the Generic ResolverServiceImpl resolver.registerHandler(PipeResolverName, this); // start srdi srdiIndex = new SrdiIndex(myGroup, srdiIndexerFileName, GcDelay); srdi = new Srdi(myGroup, PipeResolverName, this, srdiIndex, 2 * TimeUtils.AMINUTE, 1 * TimeUtils.AYEAR); srdiThread = new Thread(myGroup.getHomeThreadGroup(), srdi, "Pipe Resolver Srdi Thread"); srdiThread.setDaemon(true); srdiThread.start(); resolver.registerSrdiHandler(PipeResolverName, this); synchronized (this) { try { credential = membership.getDefaultCredential(); if (null != credential) { credentialDoc = credential.getDocument(MimeMediaType.XMLUTF8); } else { credentialDoc = null; } } catch (Exception all) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("could not get credential", all); } } membershipCredListener = new CredentialListener(); membership.addPropertyChangeListener("defaultCredential", membershipCredListener); } } /** * {@inheritDoc} **/ public int processQuery(ResolverQueryMsg query) { return processQuery(query, null); } /** * {@inheritDoc} **/ public int processQuery(ResolverQueryMsg query, 
EndpointAddress srcAddr) { String queryFrom; if (null != srcAddr) { if ("jxta".equals(srcAddr.getProtocolName())) { queryFrom = ID.URIEncodingName + ":" + ID.URNNamespace + ":" + srcAddr.getProtocolAddress(); } else { // we don't know who routed us the query. Assume it came from the source. queryFrom = query.getSrc(); } } else { // we don't know who routed us the query. Assume it came from the source. queryFrom = query.getSrc(); } String responseDest = query.getSrc(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Starting for :" + query.getQueryId() + " from " + srcAddr); } Reader queryReader = new StringReader(query.getQuery()); StructuredTextDocument doc = null; try {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -