📄 rendezvousserviceimpl.java
字号:
String peerId = adv.getPeerID().toString();
if (peerId.equals(localPeerId)) {
// No connection to itself
return;
}
// Try to get an EndpointMessenger to that peer.
// First publish the advertisement
try {
// This is not our own peer adv so we must not keep it
// longer than its expiration time.
group.getDiscoveryService().publish(adv,
DiscoveryService.PEER,
DiscoveryService.DEFAULT_EXPIRATION,
DiscoveryService.DEFAULT_EXPIRATION);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug(" publish failed with " + e);
}
// Make sure that EndpointService has also the document
// This is not necessary, especially since the parent group is very likely
// to have the peer advertisement already, and if it doesn't, it will query for it,
// but this optimization makes sure that the local peer will not have to query.
publishInParentGroup(adv);
// Check that the EndpointService can talk to that peer
EndpointAddress endpointAddress = mkAddress(peerId);
if (!endpoint.ping(endpointAddress)) {
throw new IOException("Unable to ping address " + endpointAddress);
}
endpointAddress = mkAddress(peerId, pName, pParam);
EndpointMessenger messenger = endpoint.getMessenger(endpointAddress);
if (messenger == null) {
throw new IOException("Unable to create endpoint messenger to " + endpointAddress);
}
connectToRendezVous(messenger);
return;
}
/**
 * Connects to a rendezvous described by an endpoint address.
 *
 * <p>NB: the supplied address is modified in place: its service name and
 * service parameter are overwritten with this service's {@code pName} and
 * {@code pParam} before a messenger is requested.
 *
 * <p>A {@code null} address is logged at WARN level and ignored. An address
 * whose protocol is "jxta" and which matches {@code localPeerAddr} is ignored
 * as well (no connection to itself). Addresses using any other protocol are
 * pinged first.
 *
 * @param addr the address of the rendezvous; modified by this call.
 * @exception IOException if a non-"jxta" address cannot be pinged, or if
 *            obtaining the messenger or connecting through it fails. In the
 *            latter case the original failure is attached as the cause.
 */
public void connectToRendezVous(EndpointAddress addr)
    throws IOException {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("connectToRendezVous with EndpointAddress");
    }
    if (addr == null) {
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("bad endpoint address");
        }
        return;
    }
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        try {
            LOG.debug(" to = " + addr.getProtocolName() + "://" + addr.getProtocolAddress());
        } catch (Exception ignored) {
            // Purely diagnostic output: a failure to format the address
            // must not prevent the connection attempt below.
        }
    }
    String proto = addr.getProtocolName();
    if (!proto.equals("jxta")) {
        // Check if this address is reachable
        if (!endpoint.ping(addr)) {
            throw new IOException("Cannot ping rendezvous at " + addr);
        }
    } else {
        String peerId = proto + "://" + addr.getProtocolAddress();
        if (peerId.equals(localPeerAddr)) {
            // No connection to itself
            return;
        }
    }
    // Redirect the address to the rendezvous service before asking for a messenger.
    addr.setServiceName(pName);
    addr.setServiceParameter(pParam);
    try {
        EndpointMessenger messenger = endpoint.getMessenger(addr);
        if (messenger == null) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("no messenger");
            }
            throw new IOException("no messenger for address " + addr);
        }
        connectToRendezVous(messenger);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("connectToRendezVous failed", e);
        }
        // Keep the original failure reachable for callers instead of
        // discarding it when debug logging is off.
        IOException failure = new IOException("connectToRendezVous at " + addr + " failed");
        failure.initCause(e);
        throw failure;
    }
    return;
}
/**
 * Attempts to re-establish the connection to a rendezvous peer.
 *
 * <p>This is a best-effort operation: every failure is logged and none is
 * propagated to the caller. Nothing is attempted when {@code peer} is
 * {@code null} or equals {@code localPeerAddr}.
 *
 * @param peer the peer string used to build the endpoint address of the
 *             rendezvous; may be {@code null}, in which case the call is a
 *             logged no-op.
 */
public void reconnectToRendezVous(String peer) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("reconnectToRendezVous to " + peer);
    }
    if (peer == null) {
        // Mirror connectToRendezVous(EndpointAddress): a missing target is
        // reported and ignored rather than surfacing as a NullPointerException.
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("reconnectToRendezVous: null peer");
        }
        return;
    }
    if (peer.equals(localPeerAddr)) {
        // No connection to itself
        return;
    }
    try {
        EndpointAddress endpointAddress = mkAddress(peer, pName, pParam);
        EndpointMessenger messenger = endpoint.getMessenger(endpointAddress);
        if (messenger == null) {
            if (LOG.isEnabledFor(Priority.WARN)) {
                LOG.warn("Could not get messenger for " + endpointAddress + "; aborting");
            }
            throw new IOException("Could not get messenger for " + endpointAddress + "; aborting");
        }
        connectToRendezVous(messenger);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("reconnecteRendezVous failed ", e);
        }
    }
    return;
}
/**
 * Disconnects from a rendezvous peer and forgets it.
 *
 * <p>Does nothing when this peer is not a rendezvous client or when
 * {@code rdv} is {@code null}. Otherwise a messenger to the peer is looked
 * up and, if one is available, used to disconnect; failures at that stage
 * are only logged. The peer is then removed through {@code removeRdv}, and
 * when no rendezvous remains the client state is cleared.
 *
 * @param rdv the ID of the rendezvous peer to disconnect from.
 */
public void disconnectFromRendezVous(PeerID rdv) {
    if (!isClient || (rdv == null)) {
        // Sanity check
        return;
    }
    String peerId = rdv.toString();
    // First try to get an EndpointMessenger to that peer.
    try {
        EndpointMessenger messenger =
            endpoint.getMessenger(mkAddress(peerId, pName, pParam));
        if (messenger != null) {
            disconnectFromRendezVous(messenger);
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("Error getting messenger to disconnect from rdv " +peerId, e);
        }
    }
    removeRdv(peerId);
    // getConnectedRendezVous() treats a null list as "no rendezvous"; do the
    // same here instead of dereferencing it unconditionally.
    if ((rendezVous == null) || (rendezVous.size() == 0)) {
        // No more rendezVous
        rendezVous = null;
        isClient = false;
    }
    return;
}
/**
 * Lists the rendezvous peers this peer is currently connected to.
 *
 * <p>Each entry of the {@code rendezVous} list is converted into an ID built
 * from the peer string of its {@code PeerConnection}. Entries for which the
 * ID cannot be built are skipped and reported at debug level.
 *
 * @return an Enumeration of the IDs of the connected rendezvous; empty when
 *         this peer is not a client or has no rendezvous.
 */
public synchronized Enumeration getConnectedRendezVous() {
    Vector connected = new Vector();
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("getConnectedRendezVous");
    }
    // Only walk the list when we are a client and actually hold connections.
    boolean hasConnections =
        isClient && (rendezVous != null) && (rendezVous.size() != 0);
    if (hasConnections) {
        for (int idx = 0; idx < rendezVous.size(); idx++) {
            try {
                PeerConnection connection = (PeerConnection) rendezVous.elementAt(idx);
                String rdvPeer = connection.getPeer();
                net.jxta.id.ID rdvId = IDFactory.fromURL(IDFactory.jxtaURL(rdvPeer));
                connected.addElement(rdvId);
            } catch (Exception failure) {
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("getConnectedRendezVous failed to construct peerid ", failure);
                }
            }
        }
    }
    return connected.elements();
}
/**
 * Lists the rendezvous peers recorded in {@code removedRendezVous}, i.e. the
 * rendezvous this peer failed to connect to or is no longer connected to.
 *
 * <p>Each entry is converted into an ID built from the peer string of its
 * {@code PeerConnection}. Entries for which the ID cannot be built are
 * skipped and reported at debug level.
 *
 * @return an Enumeration of the IDs of the disconnected rendezvous; empty
 *         when this peer is not a client or no rendezvous was removed.
 */
public synchronized Enumeration getDisconnectedRendezVous() {
    Vector result = new Vector();
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("getDisconnectedRendezVous");
    }
    if (!isClient) {
        // Sanity check
        return result.elements();
    }
    if (removedRendezVous.size() == 0) {
        return result.elements();
    }
    for (int i = 0; i < removedRendezVous.size(); ++i) {
        try {
            PeerConnection pConn = (PeerConnection) removedRendezVous.elementAt(i);
            String peerId = pConn.getPeer();
            net.jxta.id.ID id = IDFactory.fromURL(IDFactory.jxtaURL(peerId));
            result.addElement(id);
        } catch (Exception ignored) {
            // Skip the entry, but leave a trace the same way
            // getConnectedRendezVous() does instead of failing silently.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("getDisconnectedRendezVous failed to construct peerid ", ignored);
            }
        }
    }
    return result.elements();
}
/**
 * Sends the advertisement of a rendezvous to a given peer.
 *
 * <p>The destination peer's advertisement is first published through the
 * group's discovery service and through {@code publishInParentGroup}. If the
 * destination can be pinged and a messenger to it can be obtained, a message
 * carrying {@code rdv} as an {@code RdvAdvReply} element is sent. Publishing
 * and sending failures are logged at debug level and otherwise ignored.
 *
 * @param destPeer the advertisement of the peer to which the rendezvous
 *                 advertisement is sent.
 * @param rdv      the rendezvous advertisement to send.
 */
public void sendRendezVousAdv(PeerAdvertisement destPeer,
                              PeerAdvertisement rdv) {
    // Publish the destination's advertisement first. It is not our own peer
    // adv, so it must not be kept longer than its expiration time.
    try {
        group.getDiscoveryService().publish(
            destPeer,
            DiscoveryService.PEER,
            DiscoveryService.DEFAULT_EXPIRATION,
            DiscoveryService.DEFAULT_EXPIRATION);
    } catch (Exception publishFailure) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("publish failed with ", publishFailure);
        }
    }

    // Make sure that EndpointService has also the document
    publishInParentGroup(destPeer);

    String destination = destPeer.getPeerID().toString();

    // Only go on when the EndpointService can talk to that peer.
    boolean reachable = endpoint.ping(endpoint.newEndpointAddress(destination));
    if (reachable) {
        try {
            EndpointAddress target = mkAddress(destination, pName, pParam);
            EndpointMessenger messenger = endpoint.getMessenger(target);
            if (messenger != null) {
                Message reply = endpoint.newMessage();
                reply.addElement(
                    reply.newMessageElement(RdvAdvReply,
                                            textXml,
                                            rdv.getDocument(textXml).getStream()));
                messenger.sendMessage(reply);
            }
        } catch (Exception sendFailure) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("Error during send to rdv " , sendFailure);
            }
        }
    }
}
/**
 * Starts the local peer as a rendezvous peer using the supplied manager.
 *
 * <p>On success the manager is recorded, a fresh client list and the local
 * endpoint are created, the local peer advertisement is flagged with an
 * {@code Rdv=true} service parameter (best effort), and the monitor is asked
 * to start the rendezvous.
 *
 * @param handler the RendezVousManager to use; must not be {@code null}.
 * @throws IOException when the local peer is already a rendezvous, or when
 *                     {@code handler} is {@code null}.
 */
public synchronized void startRendezVous(RendezVousManager handler)
    throws IOException {
    if (isRendezVous) {
        // We are already a rendez vous. Throw an exception
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("startRendezvous: already a rendezvous");
        }
        throw new IOException("I'm already a rendezvous.");
    }
    if (handler == null) {
        // Report the real problem instead of claiming to be a rendezvous already.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("startRendezvous: no rendezvous manager provided");
        }
        throw new IOException("Cannot start rendezvous without a rendezvous manager.");
    }
    manager = handler;
    clients = new Vector();
    createLocalEndpoint();
    isRendezVous = true;
    // Update the peeradv with that information:
    try {
        StructuredTextDocument params = (StructuredTextDocument)
            StructuredDocumentFactory.newStructuredDocument(textXml, "Parm");
        Element e = params.createElement("Rdv", "true");
        params.appendChild(e);
        ((PeerAdvertisement) localPeerAdv).putServiceParam(assignedID, params);
    } catch (Exception noSweat) {
        // Flagging the advertisement is best effort: the rendezvous still
        // starts without it, so only leave a trace.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("startRendezvous: could not update peer advertisement", noSweat);
        }
    }
    // Get an rdv adv published.
    ((RdvMonitor) monitor).startRdv();
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("RendezVousService is started");
    }
    return;
}
/**
 * Starts the local peer as a rendezvous peer with the default rendezvous
 * manager ({@code RdvManager} built on this service's group).
 *
 * <p>Does nothing when the local peer already is a rendezvous. Any failure
 * while starting is logged at WARN level and not propagated.
 */
public synchronized void startRendezVous() {
    if (isRendezVous) {
        return;
    }
    try {
        startRendezVous(new RdvManager(group));
    } catch (Exception unexpected) {
        // The "already a rendezvous" case is excluded above, but endpoint
        // creation or the monitor may still fail; do not hide that.
        if (LOG.isEnabledFor(Priority.WARN)) {
            LOG.warn("startRendezVous with default manager failed", unexpected);
        }
    }
}
/**
 * Stops the rendezvous function on the local peer.
 *
 * <p>When the local peer is a rendezvous, its service parameter is removed
 * from the local peer advertisement, the rendezvous flag is cleared, the
 * local endpoint is deleted and the client list is dropped. When the local
 * peer is not a rendezvous the call has no effect.
 */
public synchronized void stopRendezVous() {
    if (isRendezVous) {
        // The rendezvous flag is the only thing stored under our service
        // parameter, so the whole parameter is removed from the peer adv.
        PeerAdvertisement ownAdv = (PeerAdvertisement) localPeerAdv;
        ownAdv.removeServiceParam(assignedID);

        isRendezVous = false;
        deleteLocalEndpoint();
        clients = null;
    }
}
/**
 * Dispatches an incoming rendezvous message to the matching handlers.
 *
 * <p>The message is checked for each known element in turn; every element
 * that is present triggers its handler, so a single message may be handled
 * by several of them.
 *
 * @param msg     the incoming message.
 * @param srcAddr the source address of the message (not used here).
 * @param dstAddr the destination address of the message (not used here).
 */
public void processIncomingMessage(Message msg,
                                   EndpointAddress srcAddr,
                                   EndpointAddress dstAddr ) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("demux");
    }

    // Connection management requests.
    if (msg.hasElement(ConnectRequest)) { processConnectRequest(msg); }
    if (msg.hasElement(DisconnectRequest)) { processDisconnectRequest(msg); }

    // Replies carrying rendezvous advertisements or connection results.
    if (msg.hasElement(RdvAdvReply)) { processRdvAdvReply(msg); }
    if (msg.hasElement(ConnectedPeerReply)) { processConnectedReply(msg); }

    // Ping traffic.
    if (msg.hasElement(PingRequest)) { processPingRequest(msg); }
    if (msg.hasElement(PingReply)) { processPingReply(msg); }
}
// This class is a wrapper handler used to do all propagation
// control on messages incoming through the rendezvous service
private class PropagateListener implements EndpointListener {
private RendezVousServiceImpl rdv = null;
public PropagateListener(RendezVousServiceImpl rdv) {
this.rdv = rdv;
}
public void processIncomingMessage(Message message,
EndpointAddress srcAddr,
EndpointAddress dstAddr) {
MessageElement el = null;
RendezVousPropagateMessage propHdr = null;
try {
el = message.getElement(headerName);
if (el == null) {
// XXX-lomax@jxta.or: BACK COMPATIBILITY December 14th 2001
// We need to also associate a Message Filter with the following name
// for backward compatibility reasons: the name of the message element
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -