📄 rendezvousserviceimpl.java
字号:
new MimeMediaType("text", "xml"), "Parm");
Element e = params.createElement("Rdv", "true");
params.appendChild(e);
((PeerAdvertisement) localPeerAdv).putServiceParam(
assignedID, params);
} catch(Exception noSweat) {
// the name says it all
}
// Get an rdv adv published.
((RdvMonitor) monitor).startRdv();
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("RendezVousService is started");
return;
}
/**
 * Starts the local peer as a RendezVousService peer using the default
 * rendezvous manager ({@code new RdvManager(group)}).
 *
 * <p>Does nothing when the peer is already acting as a rendezvous. A failure
 * raised while starting is not propagated to the caller; it is logged at
 * WARN level so that it does not disappear silently.
 */
public synchronized void startRendezVous() {
    if (isRendezVous) {
        return;
    }
    try {
        startRendezVous(new RdvManager(group));
    } catch (Exception e) {
        // Not expected with the default manager, but never swallow it silently.
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("Failed to start rendezvous with the default manager", e);
        }
    }
}
/**
 * Stops the rendezvous role of the local peer.
 *
 * <p>When the peer is not currently a rendezvous this is a no-op and no
 * exception is thrown. Otherwise the service parameter is removed from the
 * local peer advertisement, the rendezvous flag is cleared, the local
 * endpoint is deleted and the {@code clients} reference is set to
 * {@code null}.
 */
public synchronized void stopRendezVous() {
    if (isRendezVous) {
        // The rendezvous flag is the only thing kept under our service
        // parameter, so withdrawing it means removing the whole parameter
        // from the peer advertisement.
        ((PeerAdvertisement) localPeerAdv).removeServiceParam(assignedID);

        isRendezVous = false;
        deleteLocalEndpoint();
        clients = null;
    }
}
/**
 * Dispatches an incoming message to the handler of each rendezvous protocol
 * element it carries.
 *
 * <p>The checks are independent of each other: a message carrying several of
 * the known elements is passed to every matching handler, in the order
 * tested below. An exception raised by a handler aborts the remaining
 * checks; it is logged at WARN level and is not rethrown.
 *
 * @param msg     the received message
 * @param srcAddr source endpoint address of the message (not used here)
 * @param dstAddr destination endpoint address of the message (not used here)
 */
public void processIncomingMessage(
        Message msg,
        EndpointAddress srcAddr,
        EndpointAddress dstAddr
) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("demux");
    }
    try {
        if (msg.hasElement(ConnectRequest)) {
            processConnectRequest(msg);
        }
        if (msg.hasElement(DisconnectRequest)) {
            processDisconnectRequest(msg);
        }
        if (msg.hasElement(RdvAdvReply)) {
            processRdvAdvReply(msg);
        }
        if (msg.hasElement(ConnectedPeerReply)) {
            processConnectedReply(msg);
        }
        if (msg.hasElement(PingRequest)) {
            processPingRequest(msg);
        }
        if (msg.hasElement(PingReply)) {
            processPingReply(msg);
        }
    } catch (Exception e) {
        // A failing handler must not take the caller down, but the failure
        // has to leave a trace.
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("Failure while processing incoming rendezvous message", e);
        }
    }
}
/**
 * Wrapper handler that performs the propagation control on messages coming
 * in through the rendezvous service.
 *
 * <p>It reads the propagate header from the message, drops messages that
 * have no readable header or no message id, and hands everything else to
 * {@code processReceivedMessage} of the rendezvous service it was built with.
 */
private class PropagateListener implements EndpointListener {

    // Service that receives the messages accepted by this listener.
    private RendezVousServiceImpl rdv = null;

    /**
     * @param rdv the rendezvous service accepted messages are handed to
     */
    public PropagateListener(RendezVousServiceImpl rdv) {
        this.rdv = rdv;
    }

    /**
     * Validates the propagate header of an incoming message and forwards the
     * message to the rendezvous service.
     *
     * @param message the received message
     * @param srcAddr source endpoint address, passed through unchanged
     * @param dstAddr destination endpoint address, passed through unchanged
     */
    public void processIncomingMessage(Message message,
                                       EndpointAddress srcAddr,
                                       EndpointAddress dstAddr) {
        MessageElement el = null;
        RendezVousPropagateMessage propHdr = null;
        try {
            el = message.getElement(headerName);
            if (el == null) {
                // XXX-lomax@jxta.or: BACK COMPATIBILITY December 14th 2001
                // The name of the header element was changed to include the
                // group id. Older versions of the protocol still use the plain
                // name, so look for that one as well. This fallback should be
                // removed once the older version is no longer supported
                // (probably the next release).
                el = message.getElement(RendezVousPropagateMessage.Name);
                // END OF COMPATIBILITY FIX
                if (el == null) {
                    // There is really no header. Discard the message.
                    return;
                }
            }
            propHdr = new RendezVousPropagateMessage(el.getStream());
        } catch (Exception e) {
            // The header could not be read. Discard the message, but leave a
            // trace so that malformed traffic can be diagnosed.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("Cannot read propagate header. Drop the message.", e);
            }
            return;
        }

        // A message without a message id cannot take part in loop detection:
        // it is improper and is dropped (no id is generated for it).
        if (propHdr.getMsgId() == null) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("Received a message without msgid. Drop it.");
            }
            return;
        }

        // Real destination of the message, as carried by the header.
        String sName = propHdr.getDestSName();
        String sParam = propHdr.getDestSParam();

        // Let the service deliver it locally or repropagate it.
        rdv.processReceivedMessage(message,
                                   propHdr.getMsgId(),
                                   sName,
                                   sParam,
                                   srcAddr,
                                   dstAddr);
    }
}
/**
 * Delivers a propagated message to the local listener registered for its
 * destination, or repropagates it in the group when there is none.
 *
 * <p>The listener is looked up in {@code listeners} under the key
 * {@code sName + sParam}. When one is found, the destination address is
 * rebuilt with the real service name and parameter; a message whose id is
 * already recorded is discarded, otherwise the id is recorded and the
 * listener is invoked. When no listener is found, the id is recorded and the
 * message is handed to {@code rePropagateInGroup}.
 *
 * @param message the propagated message
 * @param msgId   the message id taken from the propagate header
 * @param sName   destination service name
 * @param sParam  destination service parameter
 * @param srcAddr source endpoint address
 * @param dstAddr destination endpoint address the message arrived on
 */
protected void processReceivedMessage(Message message,
                                      String msgId,
                                      String sName,
                                      String sParam,
                                      EndpointAddress srcAddr,
                                      EndpointAddress dstAddr) {
    if (LOG.isDebugEnabled() && LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("processReceivedMessage starts");
    }

    EndpointListener localListener = (EndpointListener) listeners.get(sName + sParam);

    if (localListener == null) {
        // Nobody here is interested: remember the id and pass the message on.
        addMsgId(msgId);
        if (LOG.isDebugEnabled() && LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("No local listener - repropagate");
        }
        rePropagateInGroup(message);
        return;
    }

    // A local listener exists for this destination: deliver to it.
    if (LOG.isDebugEnabled() && LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("Local listener is called");
    }

    // Rebuild the address with the real destination service.
    EndpointAddress localDst = endpoint.newEndpointAddress(dstAddr.toString());
    localDst.setServiceName(sName);
    localDst.setServiceParameter(sParam);

    // The service receiving the message may want to repropagate it, so the
    // id must be remembered; an id seen before means the message is a
    // duplicate and is discarded.
    if (isMsgIdRecorded(msgId)) {
        return;
    }
    addMsgId(msgId);
    localListener.processIncomingMessage(message, srcAddr, localDst);
}
/**
 * Registers a listener for raw propagated messages.
 *
 * <p>Clients of the rendezvous service can use this to receive raw
 * propagation rather than crawling query/responses; TTL counting and loop
 * detection are taken care of by the service.
 *
 * <p>The listener is stored in {@code listeners} under {@code name} and is
 * also added to the endpoint under the same name. A failure of the endpoint
 * registration is logged at WARN level and is not reported to the caller;
 * the entry in {@code listeners} is kept in that case.
 *
 * @param name     the name under which the listener is registered
 * @param listener the EndpointListener that processes the messages
 * @exception IOException declared by the signature; no statement of this
 *            method throws it explicitly
 */
public synchronized void addPropagateListener(String name,
                                              EndpointListener listener)
        throws IOException {
    listeners.put(name, listener);
    try {
        endpoint.addListener(name, listener);
    } catch (Exception registrationFailure) {
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("Cannot connect listener " + name, registrationFailure);
        }
    }
}
/**
 * Removes a listener previously added with addPropagateListener.
 *
 * <p>Only the entry stored in {@code listeners} under {@code name} is
 * removed. The {@code listener} argument is not compared with the registered
 * listener; the lookup is done by name alone.
 *
 * @param name     the name the listener was registered under
 * @param listener the listener to remove (not inspected by this method)
 * @exception IOException if no listener is registered under {@code name}
 */
public synchronized void removePropagateListener(String name,
                                                 EndpointListener listener)
        throws IOException {
    EndpointListener registered = (EndpointListener) listeners.get(name);
    if (registered == null) {
        // Say which name was missing so that the caller can diagnose it.
        throw new IOException("No propagate listener registered under name: " + name);
    }
    listeners.remove(name);
    // NOTE(review): addPropagateListener also calls endpoint.addListener(name, ...);
    // nothing here undoes that endpoint registration - confirm whether it should.
}
/**
 * Adds a listener for rendezvous events.
 *
 * <p>The listener is handed to {@code applisteners.addElement}; this method
 * itself performs no check for {@code null} or for a listener that was
 * already added.
 *
 * @param listener the RendezvousListener that will process the events
 */
public void addListener(RendezvousListener listener) {
applisteners.addElement(listener);
}
/**
 * Removes a listener previously added with addListener.
 *
 * @param listener the RendezvousListener to remove
 * @return the value returned by {@code applisteners.removeElement(listener)}
 *         (presumably {@code true} when the listener was registered -
 *         confirm against the type of {@code applisteners})
 */
public boolean removeListener(RendezvousListener listener) {
    boolean removed = applisteners.removeElement(listener);
    return removed;
}
/**
 * Propagates a message, choosing the propagation scope from the TTL.
 *
 * <p>A TTL of exactly 1 is sent with {@code propagateToNeighbors}; every
 * other value is sent with {@code propagateInGroup}. In both cases
 * {@code null} is passed as the peer to prune.
 *
 * @param msg          the message to propagate
 * @param serviceName  the name of the destination service
 * @param serviceParam the parameter of the destination service
 * @param defaultTTL   the TTL, which also selects the scope as described above
 * @exception IOException if the selected propagation method throws it
 */
public void propagate(Message msg,
                      String serviceName,
                      String serviceParam,
                      int defaultTTL)
        throws IOException {
    boolean neighborsOnly = (defaultTTL == 1);
    if (neighborsOnly) {
        propagateToNeighbors(msg, serviceName, serviceParam, defaultTTL, null);
    } else {
        propagateInGroup(msg, serviceName, serviceParam, defaultTTL, null);
    }
}
/*
* Propagation with TTL.
* The methods that follow provide an abstract propagation that may
* use point-to-point communication through rendezvous or direct
* propagation on the network, or both.
* The caller only specifies which scope is desired (neighbors or all
* group). The strategy actually used is not specified at the API level,
* only whether an effort is made at reaching farther, or not.
*
*/
/**
 * This API is provided to the Core implementation in order to propagate a
 * message onto as many peers on the local network as possible. Typically
 * the message will go to all the peers that at least one endpoint transport
 * can address without using the router.
 *
 * <p>Only a single hop at a time is performed. Messages are always delivered
 * to the destination handler on arrival; that handler is responsible for
 * repropagating further, if deemed appropriate.
 *
 * <p>Loop and TTL control are performed automatically.
 *
 * <p>A message can be propagated through this method for the first time, or
 * re-propagated by re-using a message that came in via propagation. In the
 * latter case the TTL and loop detection parameters CANNOT be
 * re-initialized: to "re-propagate" with a new TTL and a blank gateways
 * list, a completely new message must be generated. This limits the risk of
 * accidental propagation storms, although they can always be engineered
 * deliberately.
 *
 * <p>Note: the original msg is not modified and may be reused upon return;
 * the method works on a clone.
 *
 * @param msg          the message to propagate
 * @param serviceName  the name of the destination service
 * @param serviceParam the parameter of the destination service
 * @param defaultTTL   the TTL handed to {@code updatePropHeader}
 * @param prunePeer    a peer to prune in the propagation; not referenced by
 *                     the body of this method
 * @exception IOException declared by the signature; any exception thrown
 *            while updating the header or sending is left to reach the caller
 */
public void propagateToNeighbors(Message msg,
                                 String serviceName,
                                 String serviceParam,
                                 int defaultTTL,
                                 String prunePeer)
        throws IOException {
    // Work on a copy so that the caller's message stays untouched.
    Message copy = (Message) msg.clone();

    // A failure thrown here is not caught: it bails out to the caller.
    // When the header update returns false, nothing is sent.
    if (!updatePropHeader(copy, defaultTTL, serviceName, serviceParam)) {
        return;
    }

    // The copy now carries the propagate header; try to send it.
    // Sending may fail silently: propagation is never guaranteed.
    sendToNetwork(copy, PropSName, PropPName);
}
/**
* This API is provided to the Core implementation in order to
* propagate a message onto as many peers in the group as possible.
*
* Only a single HOP at a time is performed. Messages are always
* delivered to the destination handler on arrival. This handler
* is responsible for repropagating further, if deemed appropriate.
*
* Loop and TTL control are performed automatically.
*
* Messages can be propagated via this method for the first time or
* can be re-propagated by re-using a message that came in via propagation.
* In the latter case, the TTL and loop detection parameters CANNOT be
* re-initialized. If one wants to "re-propagate" a message with a new TTL
* and blank gateways list one must generate a completely new message.
* This limits the risk of accidental propagation storms, although they
* can always be engineered deliberately.
*
* Note: The original msg is not modified and may be reused upon return.
*
* @param msg is the message to propagate.
* @param serviceName is the name of the service
* @param serviceParam is the parameter of the service
* @param defaultTTL Description of Parameter
* @exception IOException Description of Exception
*/
public void propagateInGroup( Message msg, String serviceName,
String serviceParam, int defaultTTL,
String prunePeer)
throws IOException {
// Do not touch the original.
Message dupMsg = (Message) msg.clone();
// If this fails it's fatal. We just let it bail out.
if (updatePropHeader(dupMsg, defaultTTL, serviceName, serviceParam)) {
// Now that the message is safe we'll try to propagate it.
// It may fail silently. Propagation is never garanteed.
sendToNetwork(dupMsg, PropSName, PropPName);
sendToEachRendezVous(dupMsg, PropSName, PropPName);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -