📄 rendezvousserviceimpl.java
字号:
/*
* $Id: RendezVousServiceImpl.java,v 1.35 2002/06/18 20:46:01 hamada Exp $
*
* Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Sun Microsystems, Inc. for Project JXTA."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
* must not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact Project JXTA at http://www.jxta.org.
*
* 5. Products derived from this software may not be called "JXTA",
* nor may "JXTA" appear in their name, without prior written
* permission of Sun.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* =========================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of Project JXTA. For more
* information on Project JXTA, please see
* <http://www.jxta.org/>.
*
* This license is based on the BSD license adopted by the Apache Foundation.
*/
package net.jxta.impl.rendezvous;
import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.log4j.Category;
import org.apache.log4j.Priority;
import net.jxta.peer.*;
import net.jxta.id.*;
import net.jxta.protocol.*;
import net.jxta.rendezvous.*;
import net.jxta.endpoint.*;
import net.jxta.discovery.*;
import net.jxta.service.*;
import net.jxta.document.*;
import net.jxta.peergroup.*;
import net.jxta.impl.peergroup.RefPeerGroup;
import net.jxta.util.*;
import net.jxta.exception.*;
import net.jxta.impl.util.*;
import java.util.Timer;
import java.util.TimerTask;
/**
* This class implements the RendezVousService service
*/
public class RendezVousServiceImpl implements RendezVousService, EndpointListener {
/**
* NOTE(review): presumably the maximum time-to-live allowed for a propagated
* message; the code that reads this constant is not in this part of the file
* - confirm against its usage.
*/
public final static int MaxTTL = 10;
// log4j category used for all logging done by this class.
private final static Category LOG = Category.getInstance(RendezVousServiceImpl.class.getName());
// Both the initial delay and the period (java.util.Timer works in milliseconds)
// of the GC task that startApp() schedules on GCTimer.
private final static long GCDelay = 60 * 1000;
// String constants of the rendezvous protocol (connect/disconnect/ping requests
// and their replies). The code that reads and writes them is outside this part
// of the file.
// NOTE(review): ConnectedRdvAdvReply holds "jxta:RdvAdvReply" while RdvAdvReply
// holds "jxta:RdvAdv" - confirm the pairing is intentional.
private final static String ConnectRequest = "jxta:Connect";
private final static String DisconnectRequest = "jxta:Disconnect";
private final static String ConnectedPeerReply = "jxta:ConnectedPeer";
private final static String ConnectedLeaseReply = "jxta:ConnectedLease";
private final static String ConnectedRdvAdvReply = "jxta:RdvAdvReply";
private final static String PingRequest = "jxta:PingRequest";
private final static String PingReply = "jxta:PingReply";
private final static String RdvAdvReply = "jxta:RdvAdv";
// String form of the service's assigned ID; set by init().
private String pName = null;
// Endpoint service of the group; set by init().
private EndpointService endpoint = null;
// ID handed to init(); also the key used to look up this service's parameters
// in the config and peer advertisements.
private ID assignedID = null;
// Peer group handed to init().
private PeerGroup group = null;
// String form of the peer group ID; set by init().
private String gId = null;
// NOTE(review): presumably the rendezvous peers this peer knows of / has
// dropped; neither vector is used in this part of the file - confirm.
private Vector rendezVous = null;
private Vector removedRendezVous = new Vector();
// Client connections; getConnectedPeers() reads the elements as PeerConnection.
private Vector clients = null;
private RendezVousManager manager = null;
// Handler installed through setMonitor().
private RendezVousMonitor monitor = null;
// Current role flag, returned by isRendezVous(); getConnectedPeers() reports
// no peers while it is false.
private boolean isRendezVous = false;
// True when the "Rdv" configuration parameter read by init() is "true";
// startApp() then calls startRendezVous().
private boolean configIsRendezvous = false;
private boolean isClient = false;
private boolean endpointCreated = false;
// private long bootTime = 0;
// String form of the peer group ID (same value as gId); set by init().
private String pParam = null;
// RendezVousPropagateMessage.Name + pParam; the element name under which the
// filter listener is registered in init() and removed in stopApp().
private String headerName = null;
// Local peer ID as a string, and "jxta://" + the peer ID's unique value; both
// set by init().
private String localPeerId = null;
private String localPeerAddr = null;
// The group's peer advertisement (shared with the group); init() adds an
// "Rdv" service parameter to it when this peer is configured as a rendezvous.
private Advertisement localPeerAdv = null;
// Timer that runs the periodic GC task scheduled by startApp().
private Timer GCTimer = new Timer();
// Implementation advertisement handed to init(); returned by
// getImplAdvertisement().
private ModuleImplAdvertisement implAdvertisement = null;
private Hashtable listeners = new Hashtable();
// Filter created and registered with the endpoint by init().
private EndpointFilterListener filterListener = null;
private Vector applisteners = new Vector();
// MIME type used by init() when building the "Parm" document.
private MimeMediaType textXml = new MimeMediaType("text/xml");
// Created by startApp() from initialCompactorParams; shut down by stopApp().
private RendAddrCompactor compactor = null;
// Text values of the "Addr" configuration entries collected by init().
private Vector initialCompactorParams = new Vector();
// Set to true by stopApp().
private boolean stopping = false;
// startApp() registers propagateListener with the endpoint under
// PropSName + PropPName; PropPName is the peer group ID string set by init().
private static final String PropSName = "JxtaPropagate";
private String PropPName = null;
private PropagateListener propagateListener = null;
// Initial capacity of msgIds.
// NOTE(review): presumably also the cap on remembered message ids - confirm.
private static final int MaxNbOfStoredIds = 1000; // 50Kbytes
private Vector msgIds = new Vector(MaxNbOfStoredIds);
/**
 * Creates a service instance. The constructor itself does no work: fields get
 * their values from their initializers and, later, from init().
 */
public RendezVousServiceImpl() {
    super();
}
/**
 * Installs the notification handler for rendezvous peers that are no longer
 * reachable, replacing whichever handler was installed before.
 *
 * @param handler the handler to install from now on
 * @return the handler that was installed before this call, or null if there
 *         was none
 */
public synchronized RendezVousMonitor setMonitor(RendezVousMonitor handler) {
    final RendezVousMonitor previous = this.monitor;
    this.monitor = handler;
    return previous;
}
/**
 * Hands out a fresh RendezVousServiceInterface wrapping this service, so that
 * callers work through the wrapper rather than on this object directly.
 *
 * @return a new interface object bound to this service
 */
public Service getInterface() {
    final Service facade = new RendezVousServiceInterface(this);
    return facade;
}
/**
 * Gives access to the implementation advertisement that was passed to init().
 *
 * @return the implementation advertisement, or null if init() has not stored
 *         one yet
 * @since JXTA 1.0
 */
public Advertisement getImplAdvertisement() {
    return this.implAdvertisement;
}
/**
 * Reports the current value of the rendezvous role flag.
 *
 * @return the value of the isRendezVous field
 */
public boolean isRendezVous() {
    return this.isRendezVous;
}
/**
 * Returns an Enumeration of the IDs of the peers that are currently
 * connected to this peer as clients.
 *
 * Each ID is rebuilt from the peer id string held by the client's
 * PeerConnection. A client whose peer id cannot be converted is left out of
 * the result and reported in the log; it does not abort the enumeration.
 *
 * @return an Enumeration of ID objects; empty when this peer is not acting as
 *         a rendezvous or has no clients
 */
public synchronized Enumeration getConnectedPeers() {
    Vector result = new Vector();
    // Sanity check: only a rendezvous has clients, and the client list may
    // not have been created yet.
    if (!isRendezVous || (clients == null)) {
        return result.elements();
    }
    for (int i = 0; i < clients.size(); ++i) {
        String peerId = null;
        try {
            PeerConnection pConn = (PeerConnection) clients.elementAt(i);
            peerId = pConn.getPeer();
            ID id = IDFactory.fromURL(IDFactory.jxtaURL(peerId));
            result.addElement(id);
        } catch (Exception e) {
            // Best effort: skip the entry, but leave a trace instead of
            // silently dropping the client.
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("Skipping connected peer with unusable id: " + peerId, e);
            }
        }
    }
    return result.elements();
}
/**
* Gets the rendezvousConnected attribute of the RendezVousServiceImpl object
*
* @return true if connected to a rendezvous, false otherwise
*/
public boolean isConnectedToRendezVous() {
Enumeration enum = getConnectedRendezVous();
if (enum == null) {
return false;
}
return enum.hasMoreElements();
}
/**
 * Starts the subsystems of this service.
 *
 * In order: starts the rendezvous role if the configuration asked for it,
 * creates the address compactor, registers the propagation listener with the
 * endpoint, and schedules the periodic GC task. A failure to register the
 * propagation listener is logged and otherwise ignored.
 *
 * The other services may not be fully functional yet, but they exist, so the
 * subsystems can be started. It does not matter if methods of this service
 * are called between init() and startApp().
 *
 * @param arg A table of strings arguments; not used.
 * @return int status indication; always 0.
 */
public int startApp(String[] arg) {
    if (configIsRendezvous) {
        startRendezVous();
    }

    compactor = new RendAddrCompactor(group, initialCompactorParams);

    // Create our listener and hook it into the endpoint.
    propagateListener = new PropagateListener(this);
    final String listenerName = PropSName + PropPName;
    try {
        endpoint.addListener(listenerName, propagateListener);
    } catch (Exception registrationFailure) {
        // Nothing more can be done here than report it.
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("Cannot register the propagation listener", registrationFailure);
        }
    }

    // Kick off the periodic GC: first run after GCDelay, then every GCDelay.
    GCTimer.scheduleAtFixedRate(new GCTask(this), GCDelay, GCDelay);
    return 0;
}
/**
 * Ask this service to stop.
 *
 * Marks the service as stopping, shuts down the address compactor if
 * startApp() created one, stops the rendezvous role, and unregisters from the
 * endpoint the propagation listener and the filter listener under both names
 * that init() registered it with.
 */
public synchronized void stopApp() {
    stopping = true;
    // The compactor only exists once startApp() has run; do not fail with a
    // NullPointerException when the service is stopped without being started.
    if (compactor != null) {
        compactor.shutdown();
    }
    stopRendezVous();
    endpoint.removeListener(PropSName + PropPName, propagateListener);
    endpoint.removeFilterListener(headerName,
                                  filterListener,
                                  true);
    // init() also registers the same filter under the legacy element name
    // (without the group id); remove that registration too so the endpoint
    // does not keep calling into a stopped service.
    endpoint.removeFilterListener(RendezVousPropagateMessage.Name,
                                  filterListener,
                                  true);
}
/**
 * Init routine.
 *
 * Stores the group, the assigned ID and the implementation advertisement,
 * derives the names used by the propagation protocol from the peer group ID,
 * registers the message filter with the endpoint, and then reads this
 * service's parameters ("Rdv" and "Addr") from the configuration
 * advertisement, if there is one. When no configuration (or no parameters for
 * this service) is available, the defaults are kept.
 *
 * @param g          the peer group this service belongs to
 * @param assignedID the ID assigned to this service; also the key under which
 *                   its parameters are looked up in the configuration
 *                   advertisement
 * @param impl       the implementation advertisement of this service; it is
 *                   cast to ModuleImplAdvertisement
 */
public void init(PeerGroup g, ID assignedID, Advertisement impl) {
    implAdvertisement = (ModuleImplAdvertisement) impl;
    PeerAdvertisement confAdv = (PeerAdvertisement)
                                g.getConfigAdvertisement();
    this.group = g;
    this.gId = g.getPeerGroupID().toString();
    this.assignedID = assignedID;
    endpoint = g.getEndpointService();
    pParam = group.getPeerGroupID().toString();
    pName = assignedID.toString();
    PropPName = pParam;
    headerName = RendezVousPropagateMessage.Name + pParam;
    localPeerId = group.getPeerID().toString();
    localPeerAddr = "jxta"
                    + "://"
                    + group.getPeerID().getUniqueValue().toString();
    // We share it with the group and on occasion we can update it
    // the occasion being our status changing from rdv peer to client.
    localPeerAdv = group.getPeerAdvertisement();

    // Plug in our filter
    this.filterListener = new FilterListener();
    try {
        endpoint.addFilterListener(headerName,
                                   this.filterListener,
                                   true);
        // XXX-lomax@jxta.or: BACK COMPATIBILITY December 14th 2001
        // We need to also associate a Message Filter with the following name
        // for backward compatibility reasons: the name of the message element
        // is changed to now include the group id. The following code allows to also
        // process messages from older versions of this protocol. It should be removed
        // when the older version will no longer be supported (probably the next release).
        endpoint.addFilterListener(RendezVousPropagateMessage.Name,
                                   this.filterListener,
                                   true);
        // END OF COMPATIBILITY FIX
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("Cannot register the filter: " + e);
        if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("This peer will not filter potentional loopbacks, expired"
                    + " or duplicate messages.");
    }

    // Get the config. If we do not have a config, or it carries nothing for
    // this service, we just keep the defaults.
    StructuredTextDocument params = null;
    if (confAdv != null) {
        params = (StructuredTextDocument) confAdv.getServiceParam(assignedID);
    }

    if (params != null) {
        Enumeration param = params.getChildren("Rdv");
        configIsRendezvous =
            param.hasMoreElements()
            && "true".equals(((TextElement) param.nextElement()).getTextValue());

        param = params.getChildren("Addr");
        while (param.hasMoreElements()) {
            initialCompactorParams.addElement(
                ((TextElement) param.nextElement()).getTextValue());
        }

        if (configIsRendezvous) {
            // Update the peeradv with that information:
            try {
                params = (StructuredTextDocument)
                         StructuredDocumentFactory.newStructuredDocument(
                             textXml, "Parm");
                Element e = params.createElement("Rdv", "true");
                params.appendChild(e);
                ((PeerAdvertisement) localPeerAdv).putServiceParam(
                    assignedID, params);
            } catch (Exception ohwell) {
                // Don't worry about it for now, it'll still work; but leave
                // a trace so the missing flag can be diagnosed.
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("Could not flag the peer advertisement as rendezvous", ohwell);
                }
            }
        }
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("RendezVousService is initialized");
}
/**
 * Publishes an advertisement, as a PEER advertisement, through the discovery
 * service of the parent group, passing DiscoveryService.DEFAULT_EXPIRATION
 * for both time arguments.
 *
 * Does nothing when the group has no parent. A missing parent discovery
 * service or a failure to publish is logged as a warning; nothing is thrown.
 *
 * @param adv the advertisement to publish in the parent group
 */
private void publishInParentGroup(Advertisement adv) {
    RefPeerGroup parent = ((RefPeerGroup) group).getParentGroup();
    if (parent == null) {
        // No parent... nothing to do.
        return;
    }
    DiscoveryService parentDiscovery = parent.getDiscoveryService();
    if (parentDiscovery == null) {
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("Cannot access parent's DiscoveryService Service");
        }
        return;
    }
    try {
        // This is not our own peer adv so we must not keep it
        // longer than its expiration time.
        parentDiscovery.publish(adv, DiscoveryService.PEER,
                                DiscoveryService.DEFAULT_EXPIRATION,
                                DiscoveryService.DEFAULT_EXPIRATION);
    } catch (IOException e) {
        // Keep the cause in the log so the failure can be diagnosed.
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("Cannot locally publish advertisement in parent group", e);
        }
    }
}
/**
* This portion of the API is for the peers that are connected
* to a RendezVousService peer.
*
* Add a peer as a new RendezVousService point.
*
* @param adv Description of Parameter
* @exception IOException Description of Exception
* @throws IOException when the RendezVousService peer is not reachable
*/
public void connectToRendezVous(PeerAdvertisement adv)
throws IOException {
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("connectToRendezVous with advertisement");
if (adv == null) {
// Sanity check
return;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -