📄 rendezvousserviceprovider.java
字号:
/* * * $Id: RendezVousServiceProvider.java,v 1.29 2006/06/07 17:40:23 hamada Exp $ * * Copyright (c) 2001-2006 Sun Microsystems, Inc. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in * the documentation and/or other materials provided with the * distribution. * * 3. The end-user documentation included with the redistribution, * if any, must include the following acknowledgment: * "This product includes software developed by the * Sun Microsystems, Inc. for Project JXTA." * Alternately, this acknowledgment may appear in the software itself, * if and wherever such third-party acknowledgments normally appear. * * 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA" * must not be used to endorse or promote products derived from this * software without prior written permission. For written * permission, please contact Project JXTA at http://www.jxta.org. * * 5. Products derived from this software may not be called "JXTA", * nor may "JXTA" appear in their name, without prior written * permission of Sun. * * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE * DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * ========================================================= * * This software consists of voluntary contributions made by many * individuals on behalf of Project JXTA. For more * information on Project JXTA, please see * <http://www.jxta.org/>. * * This license is based on the BSD license adopted by the Apache Foundation. */package net.jxta.impl.rendezvous;import java.net.URI;import java.util.Enumeration;import java.util.Vector;import java.io.IOException;import java.net.URISyntaxException;import org.apache.log4j.Level;import org.apache.log4j.Logger;import net.jxta.document.MimeMediaType;import net.jxta.document.StructuredDocument;import net.jxta.document.StructuredDocumentFactory;import net.jxta.document.XMLDocument;import net.jxta.document.XMLElement;import net.jxta.endpoint.EndpointAddress;import net.jxta.endpoint.EndpointListener;import net.jxta.endpoint.Message;import net.jxta.endpoint.MessageElement;import net.jxta.endpoint.TextDocumentMessageElement;import net.jxta.id.ID;import net.jxta.id.IDFactory;import net.jxta.peergroup.PeerGroup;import net.jxta.protocol.PeerAdvertisement;import net.jxta.impl.id.UUID.UUID;import net.jxta.impl.rendezvous.rendezvousMeter.RendezvousMeter;import net.jxta.impl.rendezvous.rendezvousMeter.RendezvousMeterBuildSettings;import net.jxta.impl.rendezvous.rendezvousMeter.RendezvousServiceMonitor;/** * This abstract class must be extended for all RendezVous Service providers * that are managed by RendezVousServiceImpl. * * <p/>Implementors of providers are responsible for using appropriate * synchronization. The RendezvousServiceImpl provides synchronization control * only only those methods which involve changing the active provider. */public abstract class RendezVousServiceProvider implements EndpointListener { /** * Log4J Category */ private final static Logger LOG = Logger.getLogger(RendezVousServiceProvider.class.getName()); protected static final String PropSName = "JxtaPropagate"; protected static final String MESSAGE_NAMESPACE_NAME = "jxta"; protected final String PropPName; protected final String HEADER_NAME; /** * Maximum TTL we will allow for propagation and repropagation issued by * this peer. */ protected int MAX_TTL; protected final PeerGroup group; protected final RendezVousServiceImpl rdvService; protected boolean closed = false; private PeerAdvertisement cachedPeerAdv = null; private int cachedPeerAdvModCount = -1; private XMLDocument cachedPeerAdvDoc = null; protected RendezvousServiceMonitor rendezvousServiceMonitor = null; protected RendezvousMeter rendezvousMeter = null; /** */ protected RendezVousServiceProvider(PeerGroup g, RendezVousServiceImpl rdvService) { this.group = g; this.rdvService = rdvService; PropPName = group.getPeerGroupID().getUniqueValue().toString(); HEADER_NAME = RendezVousPropagateMessage.Name + PropPName; } /** * {@inheritDoc} * * <p/>EndpointListener for the JxtaPropagate/<peergroup-unique value> */ public void processIncomingMessage(Message msg, EndpointAddress srcAddr, EndpointAddress dstAddr) { RendezVousPropagateMessage propHdr = checkPropHeader(msg); if (null != propHdr) { // Get the destination real destination of the message String sName = propHdr.getDestSName(); String sParam = propHdr.getDestSParam(); if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Processing " + msg + "(" + propHdr.getMsgId() + ") for " + sName + "/" + sParam + " from " + srcAddr); } // Check if we have a local listener for this message processReceivedMessage(msg, propHdr, srcAddr, new EndpointAddress(dstAddr, sName, sParam)); } } protected XMLDocument getPeerAdvertisementDoc() { PeerAdvertisement newPadv = null; synchronized (this) { newPadv = group.getPeerAdvertisement(); int newModCount = newPadv.getModCount(); if ((cachedPeerAdv != newPadv) || (cachedPeerAdvModCount != newModCount)) { cachedPeerAdv = newPadv; cachedPeerAdvModCount = newModCount; } else { newPadv = null; } if (null != newPadv) { cachedPeerAdvDoc = (XMLDocument) cachedPeerAdv.getDocument(MimeMediaType.XMLUTF8); } } return cachedPeerAdvDoc; } /** * Supply arguments and starts this service if it hadn't started by itself. * * <p/>Currently this service starts by itself and does not expect * arguments. */ protected int startApp(String[] arg) { // All propagated messages originated by RendezvousService.propagate are handled by the // rendezvous service before being delivered to their local recipient. // This includes: // messages delivered here via netWorkPropagation. Therefore the rdv service has a special // endpointService listener for that purpose. // messages delivered here by rdv-to-rdv walk. Therefore the rdv service also has a special // "propagateListener" to which messages propagated via walk are addressed. // in both cases the listener object is the same, the method is the same; it's just // registered at the two places through which messages might come in, with a name // appropriate for each. try { // This must stay despite the call to addPropagateListener below. // The somewhat equivalent call done inside addPropagateListener // may be removed in the future and this here would remain the only // case were both a propagate listener and an endpoint listener are connected. if (!rdvService.endpoint.addIncomingMessageListener(this, PropSName, PropPName)) { if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Cannot register the propagation listener (already registered)"); } } rdvService.addPropagateListener(PropSName + PropPName, this); } catch (Exception ez1) { // Not much we can do here. if (LOG.isEnabledFor(Level.ERROR)) { LOG.error("Failed registering the propagation listener", ez1); } } try { // Update the peeradv with our status if (rdvService.isRendezVous()) { XMLDocument params = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(MimeMediaType.XMLUTF8, "Parm"); XMLElement e = (XMLElement) params.createElement("Rdv", Boolean.TRUE.toString()); params.appendChild(e); group.getPeerAdvertisement().putServiceParam(rdvService.getAssignedID(), params); } else { group.getPeerAdvertisement().removeServiceParam(rdvService.getAssignedID()); } } catch (Exception ignored) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Could not update Rdv Params in Peer Advertisement", ignored); } } return 0; } /** * Ask this service to stop. */ protected void stopApp() { EndpointListener shouldbeMe = rdvService.endpoint.removeIncomingMessageListener(PropSName, PropPName); if (this != shouldbeMe) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Unregistered listener was not as expected." + this + " != " + shouldbeMe); } } // Update the peeradv. We are not a rdv. group.getPeerAdvertisement().removeServiceParam(rdvService.getAssignedID()); } /** * Set the RendezvousServiceMonitor, not to be confused with the RendeszousMonitor. * The RendezvousServiceMonitor is used to meter the activity of the RendezvousService * @see net.jxta.impl.meter.MonitorManager * * @param RendezvousServiceMonitor */ public void setRendezvousServiceMonitor(RendezvousServiceMonitor rendezvousServiceMonitor) { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING) { this.rendezvousServiceMonitor = rendezvousServiceMonitor;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -