📄 rendezvousserviceimpl.java
字号:
// let's remove it the removed list
if (removedRendezVous.contains(tmp)) {
removedRendezVous.remove(tmp);
}
return;
}
/**
 * Unregisters a rendezvous peer.
 *
 * <p>Does nothing when {@code peer} is {@code null} or when no matching
 * entry is found in {@code rendezVous}. Otherwise the registered connection
 * is closed and removed from {@code rendezVous}, an entry for the peer is
 * appended to {@code removedRendezVous}, and a
 * {@code RendezvousEvent.RDVDISCONNECT} event is generated.
 *
 * @param peer string identifying the rendezvous peer to remove; may be
 *             {@code null}, in which case the call is a no-op
 */
private synchronized void removeRdv(String peer) {
    if (peer == null) {
        return;
    }

    // Throw-away instance that only serves as a lookup key.
    PeerConnection key = new PeerConnection(group, peer, 0);

    int index = rendezVous.indexOf(key);
    if (index == -1) {
        // Not a registered rendezvous: nothing to remove.
        return;
    }

    // Close the connection that is actually registered. The previous code
    // called close() on the lookup key only, which left the stored
    // connection untouched.
    // NOTE(review): assumes rendezVous holds PeerConnection instances and
    // that close() acts on per-instance state -- confirm against
    // PeerConnection.
    PeerConnection registered = (PeerConnection) rendezVous.elementAt(index);
    registered.close();
    rendezVous.removeElementAt(index);

    // Let's add it to the list of removed rendezvous. As before, the entry
    // recorded there is the (closed) lookup key built with a lease of 0.
    key.close();
    removedRendezVous.addElement(key);

    generateEvent(RendezvousEvent.RDVDISCONNECT, peer);
}
/**
 * Registers a client peer, or renews its lease when it is already known.
 *
 * <p>This portion is for peers that are RendezVousService: the call is
 * ignored when {@code peer} is {@code null} or when {@code isRendezVous}
 * is false.
 *
 * <p>For a peer not yet present in {@code clients}, a new connection is
 * marked connected, stored, and a {@code RendezvousEvent.CLIENTCONNECT}
 * event is generated. For a peer already present, the stored connection
 * gets a new lease: a negative {@code lease} is stored as is ("forever"),
 * any other value is stored as current time plus {@code lease}. No event
 * is generated on renewal, and any exception raised while renewing is
 * logged at debug level and otherwise ignored.
 *
 * @param peer  string identifying the client peer
 * @param lease lease granted to the client; negative means forever
 */
private synchronized void addClient(String peer, long lease) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("addClient starts");
    }

    if (peer == null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("no peer");
        }
        return;
    }

    if (!isRendezVous) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("is not a rendezvous");
        }
        return;
    }

    PeerConnection candidate = new PeerConnection(group, peer, lease);

    if (!clients.contains(candidate)) {
        // First registration for this peer.
        candidate.connect();
        clients.addElement(candidate);
        generateEvent(RendezvousEvent.CLIENTCONNECT, peer);
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" done");
        }
        return;
    }

    // Already connected, just upgrade the lease.
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug(" already registered");
    }
    try {
        int position = clients.indexOf(candidate);
        if (position != -1) {
            PeerConnection existing = (PeerConnection) clients.elementAt(position);
            // A negative lease means "forever" and is stored untouched;
            // otherwise the lease is stored relative to the current time.
            long newLease = (lease < 0) ? lease : System.currentTimeMillis() + lease;
            existing.setLease(newLease);
            existing.connect();
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("Renewed leased for " + peer);
            }
        } else {
            // This should not happen: contains() just reported a match.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("addClient: cannot access PeerConnection for " + peer);
            }
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("addClient has failed [1] ", e);
        }
    }
}
/**
 * Unregisters a client peer.
 *
 * <p>Does nothing when {@code peer} is {@code null} or when no matching
 * entry is found in {@code clients}. Otherwise the registered connection is
 * closed and removed from {@code clients}, and a
 * {@code RendezvousEvent.CLIENTDISCONNECT} event is generated.
 *
 * @param peer string identifying the client peer to remove; may be
 *             {@code null}, in which case the call is a no-op
 */
private synchronized void removeClient(String peer) {
    if (peer == null) {
        return;
    }

    // Throw-away instance that only serves as a lookup key.
    PeerConnection key = new PeerConnection(group, peer, 0);

    int index = clients.indexOf(key);
    if (index == -1) {
        // Not a registered client: nothing to remove.
        return;
    }

    // Close the connection that is actually registered, the same way
    // addClient() fetches the stored element before renewing it. The
    // previous code called close() on the lookup key, which left the
    // stored connection untouched.
    // NOTE(review): assumes close() acts on per-instance state -- confirm
    // against PeerConnection.
    PeerConnection registered = (PeerConnection) clients.elementAt(index);
    registered.close();
    clients.removeElementAt(index);

    generateEvent(RendezvousEvent.CLIENTDISCONNECT, peer);
}
/**
 * Registers this object as an endpoint listener under the name
 * {@code pName + pParam}, unless that was already done.
 *
 * <p>This portion of the code is common to both RendezVousService and
 * Clients. It defines the protocol between both ends.
 *
 * @exception IOException if registering the listener fails with a checked
 *                        exception; runtime exceptions are rethrown as is
 */
private void createLocalEndpoint() throws IOException {
    if (!endpointCreated) {
        try {
            // Enable our EndpointService receiving end.
            endpoint.addListener(pName + pParam, this);
        } catch (RuntimeException unchecked) {
            throw unchecked;
        } catch (Exception checked) {
            // We must convert the exception because this entire
            // service assumes everything is an IOException.
            throw new IOException(checked.getMessage());
        }
        // Only reached when the listener was registered successfully.
        endpointCreated = true;
    }
}
/**
 * Removes the endpoint listener registered under {@code pName + pParam}
 * and clears the {@code endpointCreated} flag. Does nothing when the flag
 * is not set.
 */
private void deleteLocalEndpoint() {
    if (endpointCreated) {
        endpoint.removeListener(pName + pParam, this);
        endpointCreated = false;
    }
}
/**
 * Sends a connect request through the given messenger.
 *
 * <p>The local endpoint listener is created first if needed. The request
 * is a new message carrying a single {@code ConnectRequest} element whose
 * content is the local peer advertisement rendered as {@code textXml}.
 *
 * @param messenger messenger used to send the request; must not be null
 * @exception IOException if {@code messenger} is null, if the local
 *                        endpoint cannot be created, or if building or
 *                        sending the message fails
 */
private void connectToRendezVous(EndpointMessenger messenger)
        throws IOException {

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("connectToRendezVous begins");
    }

    if (messenger == null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" messenger is null");
        }
        throw new IOException(" messenger is null");
    }

    createLocalEndpoint();

    Message request = endpoint.newMessage();
    try {
        // The request simply includes the local peer advertisement.
        Document advDocument = localPeerAdv.getDocument(textXml);
        InputStream advStream = advDocument.getStream();
        request.addElement(
                request.newMessageElement(ConnectRequest, textXml, advStream));
        messenger.sendMessage(request);
    } catch (Exception failure) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("connectToRendezVous failed", failure);
        }
        throw new IOException("connectToRendezVous failed : " + failure.getMessage());
    }
}
/**
 * Sends a disconnect request through the given messenger.
 *
 * <p>The request is a new message carrying a single
 * {@code DisconnectRequest} element whose content is the local peer
 * advertisement rendered as {@code textXml}.
 *
 * @param messenger messenger used to send the request; must not be null
 * @exception IOException if {@code messenger} is null, or if building or
 *                        sending the message fails
 */
private void disconnectFromRendezVous(EndpointMessenger messenger)
        throws IOException {

    if (messenger == null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" messenger is null");
        }
        throw new IOException(" messenger is null");
    }

    Message request = endpoint.newMessage();
    try {
        // The request simply includes the local peer advertisement.
        Document advDocument = localPeerAdv.getDocument(textXml);
        InputStream advStream = advDocument.getStream();
        request.addElement(
                request.newMessageElement(DisconnectRequest, textXml, advStream));
        messenger.sendMessage(request);
    } catch (Exception failure) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("disconnectFromRendezVous failed", failure);
        }
        throw new IOException("disconnectFromRendezVous failed :" + failure.getMessage());
    }
}
/**
 * Handles an incoming connect request.
 *
 * <p>Ignored unless {@code isRendezVous} is set and {@code manager} is
 * non-null. The {@code ConnectRequest} element is taken out of the message
 * and parsed as a peer advertisement; {@code manager.requestConnection}
 * then decides the lease, which is sent back with {@code sendReply}. The
 * peer is registered through {@code addClient} only when the reply could
 * be sent and the lease is not 0.
 *
 * @param msg message expected to carry a {@code ConnectRequest} element
 */
private void processConnectRequest(Message msg) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("processConnectRequest");
    }

    if (!(isRendezVous && manager != null)) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" not a rendezvous");
        }
        return;
    }

    PeerAdvertisement adv;
    try {
        MessageElement requestElement = msg.getElement(ConnectRequest);
        msg.removeElement(ConnectRequest);
        // A missing element surfaces here as an exception and is handled
        // by the catch below, like any parsing failure.
        adv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(
                textXml, requestElement.getStream());
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" cannot retrieve advertisment from request");
        }
        return;
    }

    if (adv == null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" advertisement is null");
        }
        return;
    }

    long lease = manager.requestConnection(adv);
    String peerId = adv.getPeerID().toString();

    // Send the reply.
    boolean replied = sendReply(peerId, adv, lease);
    if (!replied) {
        // The client is not reachable. Just drop it.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("processConnectRequest. Cannot reach client "
                    + peerId);
        }
        return;
    }

    if (lease != 0) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" addClient");
        }
        addClient(peerId, lease);
    }
}
/**
 * Handles an incoming disconnect request.
 *
 * <p>Ignored unless {@code isRendezVous} is set and {@code manager} is
 * non-null. The {@code DisconnectRequest} element is parsed as a peer
 * advertisement and the corresponding peer is passed to
 * {@code removeClient}. Any failure while reading or parsing the element
 * silently ends the processing.
 *
 * @param msg message expected to carry a {@code DisconnectRequest} element
 */
private void processDisconnectRequest(Message msg) {
    if (!(isRendezVous && manager != null)) {
        return;
    }

    PeerAdvertisement adv;
    try {
        MessageElement requestElement = msg.getElement(DisconnectRequest);
        // A missing element surfaces here as an exception and is handled
        // by the catch below, like any parsing failure.
        adv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(
                textXml, requestElement.getStream());
    } catch (Exception e) {
        return;
    }

    if (adv != null) {
        removeClient(adv.getPeerID().toString());
    }
}
/**
 * Handles the reply to a connect request.
 *
 * <p>Three message elements are read:
 * <ul>
 * <li>{@code ConnectedRdvAdvReply} (optional): an advertisement that, when
 *     present and parsable, is published in the parent group and through
 *     the group's discovery service with the default expiration;</li>
 * <li>{@code ConnectedLeaseReply} (required): the lease, as a decimal
 *     number in text form;</li>
 * <li>{@code ConnectedPeerReply} (required): the peer, in text form,
 *     which is also converted to a {@code PeerID}.</li>
 * </ul>
 * Processing stops silently (debug log only) when a required element is
 * missing or cannot be decoded. A lease of 0 is reported to the monitor
 * as a disconnection; any other lease registers the peer through
 * {@code addRdv} and is reported to the monitor as a connection.
 *
 * @param msg the reply message
 */
private void processConnectedReply(Message msg) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("processConnectedReply");
    }

    // The advertisement is optional. getElement() can return null for a
    // missing element (the lease and peer handling below already guards
    // against that), so check before dereferencing instead of failing
    // with a NullPointerException.
    MessageElement advElement = msg.getElement(ConnectedRdvAdvReply);
    InputStream is = (advElement == null) ? null : advElement.getStream();

    Advertisement adv = null;
    if (is != null) {
        try {
            adv = AdvertisementFactory.newAdvertisement(textXml, is);
        } catch (Exception e) {
            // Best effort: an unparsable advertisement is treated like a
            // missing one, but leave a trace for troubleshooting.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug(" cannot parse advertisement: " + e);
            }
        }
    }

    if (adv == null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" no advertisement");
        }
    } else {
        // Make sure that EndpointService has also the document
        publishInParentGroup(adv);
        try {
            // This is not our own peer adv so we must not keep it
            // longer than its expiration time.
            group.getDiscoveryService().publish(adv,
                    DiscoveryService.PEER,
                    DiscoveryService.DEFAULT_EXPIRATION,
                    DiscoveryService.DEFAULT_EXPIRATION);
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug(" publish failed with " + e);
            }
        }
    }

    // Lease: required, transmitted as text.
    // NOTE(review): the bytes are decoded with the platform default
    // charset -- confirm this matches what the sender uses.
    long lease = 0;
    try {
        MessageElement el = msg.getElement(ConnectedLeaseReply);
        if (el == null) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("missing lease");
            }
            return;
        }
        byte[] buffer = el.getBytesOffset();
        int offset = el.getOffset();
        int len = el.getLength();
        lease = Long.parseLong(new String(buffer, offset, len));
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("lease=" + lease);
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" leased failed with " + e);
        }
        return;
    }

    // Peer: required, transmitted as text.
    String peer = null;
    try {
        MessageElement el = msg.getElement(ConnectedPeerReply);
        if (el == null) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("missing rdv peer");
            }
            return;
        }
        peer = new String(el.getBytesOffset(), el.getOffset(), el.getLength());
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("peer=" + peer);
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" getting peer failed with " + e);
        }
        return;
    }

    // The monitor is notified with a PeerID rather than the raw string.
    PeerID pId = null;
    try {
        pId = (PeerID) IDFactory.fromURL(IDFactory.jxtaURL(peer));
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" getting peer advertisement failed with " + e);
        }
        return;
    }

    if (lease == 0) {
        // A lease of 0 is reported as a disconnection; the peer is not
        // registered.
        if (monitor != null) {
            monitor.disconnected(pId);
        }
    } else {
        addRdv(peer, lease);
        if (monitor != null) {
            monitor.connected(pId, lease);
        }
    }
}
/**
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -