📄 rendezvousserviceimpl.java
字号:
// is changed to now include the group id. The following code allows to also
// process messages from older versions of this protocol. It should be removed
// when the older version will no longer be supported (probably the next release).
// Try to see if we have the old version Propagate message.
el = message.getElement(RendezVousPropagateMessage.Name);
// END OF COMPATIBILITY FIX
if (el == null) {
// There is really no header. Discard message
return;
}
}
propHdr = new RendezVousPropagateMessage(el.getStream());
} catch(Exception neverMind) {
// No header. Discard the message
return;
}
// The propagate header must carry a message id.
// This is a safety guard: a message without one is dropped below.
if (propHdr.getMsgId() == null) {
// Improper message. Drop it
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("Received a message without msgid. Drop it.");
}
return;
}
// Get the real destination of the message
String sName = propHdr.getDestSName();
String sParam = propHdr.getDestSParam();
// Check if we have a local listener for this message
rdv.processReceivedMessage(message,
propHdr.getMsgId(),
sName,
sParam,
srcAddr,
dstAddr);
}
}
/**
 * Handles a propagated message once the fields of its propagation header
 * have been extracted by the caller.
 *
 * <p>The listener table is consulted under the key {@code sName + sParam}.
 * If a listener is registered under that key, the message is handed to it,
 * unless the message id has already been recorded, in which case the
 * message is discarded. If no listener is registered, the message id is
 * recorded and the message is re-propagated in the group.
 *
 * @param message the received message
 * @param msgId   the id of the message
 * @param sName   the destination service name
 * @param sParam  the destination service parameter
 * @param srcAddr the source address, passed through to the listener
 * @param dstAddr the destination address; the listener receives a new
 *                address built from its string form, with the service
 *                name and parameter set to {@code sName} and {@code sParam}
 */
protected void processReceivedMessage(Message message,
        String msgId,
        String sName,
        String sParam,
        EndpointAddress srcAddr,
        EndpointAddress dstAddr) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("processReceivedMessage starts");
    }

    EndpointListener target = (EndpointListener) listeners.get(sName + sParam);

    if (target == null) {
        // Nobody local is registered for this destination:
        // remember the id and pass the message along.
        addMsgId(msgId);
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("No local listener - repropagate");
        }
        rePropagateInGroup(message);
        return;
    }

    // A local listener exists for this destination.
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("Local listener is called");
    }

    // Rebuild the destination address with the real service name/parameter.
    EndpointAddress deliveryAddr = endpoint.newEndpointAddress(dstAddr.toString());
    deliveryAddr.setServiceName(sName);
    deliveryAddr.setServiceParameter(sParam);

    // The listener may want to repropagate the message, so its id has to be
    // remembered before delivery. An id that is already recorded means the
    // message has been here before and is dropped.
    if (!isMsgIdRecorded(msgId)) {
        addMsgId(msgId);
        target.processIncomingMessage(message, srcAddr, deliveryAddr);
    }
}
/**
 * Registers a listener that receives raw propagated messages, as an
 * alternative to crawling query/responses.
 *
 * <p>The listener is stored in the local listener table under
 * {@code name} and is then also registered with the endpoint under the
 * same name. A failure of the endpoint registration is logged at WARN
 * level and not rethrown; the entry in the local table is kept.
 *
 * @param name     the name the listener is registered under
 * @param listener the EndpointListener that will process the messages
 * @exception IOException declared by the signature; the endpoint
 *            registration failure path is caught and logged instead
 */
public synchronized void addPropagateListener(String name,
        EndpointListener listener) throws IOException {
    // Local table first, so the entry exists whatever the endpoint does.
    listeners.put(name, listener);

    try {
        endpoint.addListener(name, listener);
    } catch (Exception registrationFailure) {
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("Cannot connect listener " + name, registrationFailure);
        }
    }
}
/**
 * Removes the listener registered under {@code name} from the local
 * listener table.
 *
 * <p>Only the name is used for the lookup; the {@code listener} argument
 * is not consulted. This method changes the local table only.
 *
 * @param name     the name the listener was registered under
 * @param listener the listener being removed (not used by this method)
 * @exception IOException if no listener is registered under {@code name}
 */
public synchronized void removePropagateListener(String name,
        EndpointListener listener) throws IOException {
    EndpointListener registered = (EndpointListener) listeners.get(name);
    if (registered != null) {
        listeners.remove(name);
        return;
    }
    throw new IOException("No propagate listener named \"" + name + "\"");
}
/**
 * Registers a listener for rendezvous events by appending it to the
 * application listener list.
 *
 * @param listener the RendezvousListener to add
 */
public void addListener(RendezvousListener listener) {
    this.applisteners.addElement(listener);
}
/**
 * Removes a listener previously added with {@code addListener} from the
 * application listener list.
 *
 * @param listener the RendezvousListener to remove
 * @return the result reported by the list's {@code removeElement} call
 */
public boolean removeListener(RendezvousListener listener) {
    final boolean removed = applisteners.removeElement(listener);
    return removed;
}
/**
 * Propagates a message, selecting the scope from the TTL.
 *
 * <p>A {@code defaultTTL} of exactly 1 sends the message to the
 * neighbors only; any other value propagates it in the whole group.
 * In both cases no peer is pruned (a null prune peer is passed on).
 *
 * @param msg          the message to propagate
 * @param serviceName  the name of the destination service
 * @param serviceParam the parameter of the destination service
 * @param defaultTTL   the TTL handed to the selected propagation method
 * @exception IOException if the selected propagation method throws it
 */
public void propagate(Message msg,
        String serviceName,
        String serviceParam,
        int defaultTTL)
        throws IOException {
    if (defaultTTL != 1) {
        propagateInGroup(msg, serviceName, serviceParam, defaultTTL, null);
        return;
    }
    propagateToNeighbors(msg, serviceName, serviceParam, defaultTTL, null);
}
/*
* Propagation with TTL.
* The methods that follow provide an abstract propagation that may
* use point-to-point communication through rendezvous or direct
* propagation on the network, or both.
* The caller only specifies which scope is desired (neighbors or all
* group). The strategy actually used is not specified at the API level,
* only whether an effort is made at reaching farther, or not.
*
*/
/**
 * Propagates a message one hop onto the local network.
 *
 * <p>This API is provided to the Core implementation. Typically the
 * message goes to all the peers that at least one endpoint transport can
 * address without using the router. Only a single hop is performed at a
 * time: messages are always delivered to the destination handler on
 * arrival, and that handler is responsible for repropagating further if
 * it deems it appropriate.
 *
 * <p>A message may be propagated for the first time, or a message that
 * came in via propagation may be re-propagated. In the latter case the
 * TTL and loop detection parameters cannot be re-initialized; to
 * propagate with a new TTL and a blank gateways list, a completely new
 * message must be generated. This limits the risk of accidental
 * propagation storms.
 * NOTE(review): TTL and loop control are expected to happen in
 * {@code updatePropHeader}, whose implementation is not visible here.
 *
 * <p>The original {@code msg} is not modified: it is cloned and only the
 * clone is updated and sent, so the caller may reuse it upon return.
 *
 * @param msg          the message to propagate
 * @param serviceName  the name of the destination service
 * @param serviceParam the parameter of the destination service
 * @param defaultTTL   the TTL passed to the header update
 * @param prunePeer    a peer to prune in the propagation; not used by
 *                     this method's body
 * @exception IOException declared by the signature
 */
public void propagateToNeighbors(Message msg,
        String serviceName,
        String serviceParam,
        int defaultTTL,
        String prunePeer) throws IOException {
    // Work on a copy; the caller's message stays untouched.
    Message outgoing = (Message) msg.clone();

    // A failure while updating the header is fatal and is left to
    // propagate to the caller. A false result means: do not send.
    if (!updatePropHeader(outgoing, defaultTTL, serviceName, serviceParam)) {
        return;
    }

    // The copy is ready. Sending may fail silently; propagation is never
    // guaranteed.
    sendToNetwork(outgoing, PropSName, PropPName);
}
/**
 * Propagates a message onto as many peers in the group as possible.
 *
 * <p>This API is provided to the Core implementation. The message is
 * sent to the network, to each rendezvous and to each client, in that
 * order. Only a single hop is performed at a time: messages are always
 * delivered to the destination handler on arrival, and that handler is
 * responsible for repropagating further if it deems it appropriate.
 *
 * <p>A message may be propagated for the first time, or a message that
 * came in via propagation may be re-propagated. In the latter case the
 * TTL and loop detection parameters cannot be re-initialized; to
 * propagate with a new TTL and a blank gateways list, a completely new
 * message must be generated. This limits the risk of accidental
 * propagation storms.
 * NOTE(review): TTL and loop control are expected to happen in
 * {@code updatePropHeader}, whose implementation is not visible here.
 *
 * <p>The original {@code msg} is not modified: it is cloned and only the
 * clone is updated and sent, so the caller may reuse it upon return.
 *
 * @param msg          the message to propagate
 * @param serviceName  the name of the destination service
 * @param serviceParam the parameter of the destination service
 * @param defaultTTL   the TTL passed to the header update
 * @param prunePeer    a peer to prune in the propagation; not used by
 *                     this method's body
 * @exception IOException declared by the signature
 */
public void propagateInGroup(Message msg,
        String serviceName,
        String serviceParam,
        int defaultTTL,
        String prunePeer) throws IOException {
    // Work on a copy; the caller's message stays untouched.
    Message outgoing = (Message) msg.clone();

    // A failure while updating the header is fatal and is left to
    // propagate to the caller. A false result means: do not send.
    if (!updatePropHeader(outgoing, defaultTTL, serviceName, serviceParam)) {
        return;
    }

    // The copy is ready. Each send may fail silently; propagation is
    // never guaranteed.
    sendToNetwork(outgoing, PropSName, PropPName);
    sendToEachRendezVous(outgoing, PropSName, PropPName);
    sendToEachClient(outgoing, PropSName, PropPName);
}
/**
 * Re-propagates a received message in the group.
 *
 * <p>The message is cloned, the clone's propagation header is updated
 * with {@code MaxTTL} and no service name or parameter, and if that
 * update returns true the clone is sent to the network, to each
 * rendezvous and to each client, in that order. Any exception raised
 * along the way is logged at DEBUG level and swallowed.
 *
 * @param msg the message to re-propagate; it is not modified
 */
protected void rePropagateInGroup(Message msg) {
    try {
        // Work on a copy; the received message stays untouched.
        Message outgoing = (Message) msg.clone();

        // A false result from the header update means: do not send.
        if (!updatePropHeader(outgoing, MaxTTL, null, null)) {
            return;
        }

        // The copy is ready. Each send may fail silently; propagation is
        // never guaranteed.
        sendToNetwork(outgoing, PropSName, PropPName);
        sendToEachRendezVous(outgoing, PropSName, PropPName);
        sendToEachClient(outgoing, PropSName, PropPName);
    } catch (Exception failure) {
        // Not much can be done about it here.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Cannot repropagate the message ", failure);
        }
    }
}
/**
 * Checks whether a peer appears in the path of the propagation header
 * embedded in a message.
 *
 * <p>The header is looked up under {@code headerName} first and, for
 * backward compatibility, under {@code RendezVousPropagateMessage.Name}
 * second.
 *
 * @param msg  the message whose propagation header is inspected
 * @param peer the peer to look for in the header's path
 * @return true if the header was found, could be read and reports the
 *         peer in its path; false if there is no header, it has no
 *         stream, the peer is not in the path, or reading the header
 *         raised an exception
 */
private boolean isPeerInPropHeader(Message msg,
        String peer) {
    MessageElement header = msg.getElement(headerName);

    if (header == null) {
        // XXX-lomax@jxta.or: BACK COMPATIBILITY December 14th 2001
        // The name of the message element was changed to include the
        // group id. Messages from older versions of the protocol still
        // carry the header under the old name, so try that one as well.
        // To be removed when the older version is no longer supported
        // (probably the next release).
        header = msg.getElement(RendezVousPropagateMessage.Name);
        // END OF COMPATIBILITY FIX
    }

    if (header == null) {
        // There is really no header.
        return false;
    }

    try {
        InputStream in = header.getStream();
        if (in == null) {
            return false;
        }
        return new RendezVousPropagateMessage(in).hasInPath(peer);
    } catch (Exception unreadable) {
        // A header that cannot be read is treated like a missing one.
        return false;
    }
}
/**
 * Builds an endpoint address of the form {@code jxta://<unique value>}
 * for a peer.
 *
 * <p>The peer string is converted to a URL with
 * {@code IDFactory.jxtaURL}, turned into an ID with
 * {@code IDFactory.fromURL} and cast to {@code PeerID}; the string form
 * of that ID's unique value is appended to {@code "jxta://"}.
 *
 * @param destPeer the peer ID string of the destination peer
 * @return the new endpoint address, or null if any step raised an
 *         exception (the failure is logged at WARN level)
 */
private EndpointAddress mkAddress(String destPeer) {
    try {
        PeerID peerId = (PeerID) IDFactory.fromURL(IDFactory.jxtaURL(destPeer));
        return endpoint.newEndpointAddress(
                "jxta://" + peerId.getUniqueValue().toString());
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("Invalid peerID string : " + destPeer, e );
        }
        return null;
    }
}
/**
 * Builds an endpoint address for a peer and sets its service name and
 * service parameter.
 *
 * <p>The base address comes from {@code mkAddress(String)}, which logs
 * the problem and returns null when the peer ID string cannot be turned
 * into an address. That null is passed on to the caller instead of being
 * dereferenced here.
 *
 * @param destPeer the peer ID string of the destination peer
 * @param serv     the service name to set on the address
 * @param parm     the service parameter to set on the address
 * @return the completed endpoint address, or null if no address could be
 *         built for {@code destPeer}
 */
private EndpointAddress mkAddress(String destPeer,
        String serv,
        String parm) {
    EndpointAddress addr = mkAddress(destPeer);
    if (addr == null) {
        // Invalid peer ID; already reported by mkAddress(String).
        return null;
    }
    addr.setServiceName(serv);
    addr.setServiceParameter(parm);
    return addr;
}
/**
*Adds a feature to the Rdv attribute of the RendezVousServiceImpl object
*
* @param peer The feature to be added to the Rdv attribute
* @param lease The feature to be added to the Rdv attribute
*/
private synchronized void addRdv(String peer, long lease) {
if (peer == null) {
return;
}
if (!isClient) {
// This is the first rendezVous
rendezVous = new Vector();
isClient = true;
}
PeerConnection tmp = new PeerConnection(group, peer, lease);
// Check if the peer is already registered.
if (rendezVous.contains(tmp)) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug(" already registered");
}
// Already connected, just upgrade the lease
try {
int index = rendezVous.indexOf(tmp);
if (index == -1) {
// This should not happen
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("addRdv: cannot access PeerConnection for " + peer);
} else {
tmp = (PeerConnection) rendezVous.elementAt(index);
long time = System.currentTimeMillis();
if (lease < 0) {
// Forever
tmp.setLease(lease);
} else {
tmp.setLease(time + lease);
}
tmp.connect();
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("Got renewed leased for " + peer);
}
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("addRdv has failed [1] ", e);
}
return;
}
tmp.connect();
rendezVous.addElement(tmp);
generateEvent(RendezvousEvent.RDVCONNECT, peer);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -