rendezvousserviceimpl.java
来自「jxme的一些相关程序,主要是手机上程序开发以及手机和计算机通信的一些程序资料,」· Java 代码 · 共 2,201 行 · 第 1/5 页
JAVA
2,201 行
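* Parses the advertisement carried in a rendezvous advertisement reply and
* reports it to the monitor, if one is set.
*
* @param msg the message holding the RdvAdvReply element
*/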
private void processRdvAdvReply(Message msg) {
    // Without a monitor there is nobody to report the advertisement to.
    if (monitor == null) {
        return;
    }

    final Advertisement parsed;
    try {
        parsed = AdvertisementFactory.newAdvertisement(
                textXml,
                msg.getElement(RdvAdvReply).getStream());
    } catch (Exception e) {
        // A missing or unparsable element is ignored.
        return;
    }

    if (parsed != null) {
        monitor.discovered(parsed);
    }
}
/**
 * Publishes the given peer advertisement and sends a connection reply to
 * that peer.
 *
 * The reply message carries this peer's advertisement (element
 * ConnectedRdvAdvReply), this peer's id (ConnectedPeerReply) and the lease
 * (ConnectedLeaseReply, as a decimal string).
 *
 * Side effect: refreshes the localPeerAdv field from the group.
 *
 * @param peer  id of the peer the reply is addressed to
 * @param adv   advertisement to publish through the discovery service and
 *              to pass to publishInParentGroup before replying
 * @param lease lease value to report to the peer
 * @return true if the reply was handed to the messenger without an
 *         exception; false if the address could not be built, the peer
 *         could not be pinged, no messenger was available, or building or
 *         sending the message failed
 */
private boolean sendReply(String peer, Advertisement adv, long lease) {
    localPeerAdv = group.getPeerAdvertisement();
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("sendReply");
    }

    // First publish the advertisement. This is best effort: a failure is
    // logged and the reply is still attempted.
    try {
        // This is not our own peer adv so we must not keep it
        // longer than its expiration time.
        group.getDiscoveryService().publish(adv,
                DiscoveryService.PEER,
                DiscoveryService.DEFAULT_EXPIRATION,
                DiscoveryService.DEFAULT_EXPIRATION);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" publish failed with " + e);
        }
    }

    // Make sure that EndpointService has also the document
    publishInParentGroup(adv);

    EndpointAddress addr;
    try {
        addr = mkAddress(peer, pName, pParam);
    } catch (Exception caught) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" malformed peer id " + peer);
        }
        return false;
    }

    // Check that the EndpointService can talk to that peer
    if (!endpoint.ping(addr)) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" cannot get route to peer " + addr);
        }
        return false;
    }

    EndpointMessenger messenger = null;
    try {
        messenger = endpoint.getMessenger(addr);
        if (messenger == null) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug(" no messenger");
            }
            return false;
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" getting messenger failed with ", e);
        }
        return false;
    }

    Message msg = endpoint.newMessage();
    try {
        // The peer id and lease are sent as plain strings, so no
        // intermediate byte streams are needed for them.
        Document doc = localPeerAdv.getDocument(textXml);
        InputStream ip = doc.getStream();
        msg.addElement(msg.newMessageElement(ConnectedRdvAdvReply,
                textXml,
                ip));
        msg.setString(ConnectedPeerReply, localPeerId);
        msg.setString(ConnectedLeaseReply, Long.toString(lease));
        messenger.sendMessage(msg);
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" message sent");
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" failed with ", e);
        }
        return false;
    }
    return true;
}
/**
 * Propagates a message through the endpoint service.
 * This is the internal version. See also the public one that adds
 * ttl/loop control.
 *
 * Any exception raised by the propagation is logged at debug level and
 * otherwise ignored.
 *
 * NOTE(review): earlier documentation stated that msg is not modified and
 * may be reused upon return; this method passes msg through without
 * cloning it, so confirm that against the endpoint's propagate().
 *
 * @param msg         the message to propagate
 * @param serviceName the name of the destination service
 * @param queueName   forwarded unchanged as the third argument of
 *                    propagate()
 */
private void sendToNetwork(Message msg, String serviceName, String queueName) {
    try {
        endpoint.propagate(msg, serviceName, queueName);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(e.toString());
        }
    }
}
/**
 * Sends a message to every rendezvous client.
 * This is the internal version. See also the public one that adds
 * ttl/loop control.
 *
 * Nothing is sent when this peer is not a rendezvous or has no clients.
 * Clients for which isPeerInPropHeader() reports true are skipped. Every
 * other client receives its own clone of msg. A failure for one client is
 * logged at debug level and does not stop delivery to the others.
 *
 * @param msg          the message to propagate
 * @param serviceName  the name of the service
 * @param serviceParam the parameter of the service
 */
private void sendToEachClient(Message msg,
        String serviceName,
        String serviceParam) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("sendToEachClient");
    }
    if ((!isRendezVous) || (clients == null) || (clients.size() == 0)) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" no clients");
        }
        return;
    }

    // Walk the list backwards so that the size is read only once.
    for (int i = clients.size() - 1; i >= 0; --i) {
        PeerConnection pConn = null;
        try {
            pConn = (PeerConnection) clients.elementAt(i);
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("to: " + pConn.getPeer());
            }
            // Check if this client has already processed this propagated
            // message.
            if (isPeerInPropHeader(msg, pConn.getPeer())) {
                // Discard.
                continue;
            }
            Message tmpMsg = (Message) msg.clone();
            pConn.sendMessage(tmpMsg, serviceName, serviceParam);
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug(" sent to " + pConn.getPeer());
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                // pConn is still null when fetching the entry itself failed;
                // dereferencing it here would throw out of the handler and
                // abort delivery to the remaining clients.
                if (pConn == null) {
                    LOG.debug(" failed sent to <client at index " + i + ">", e);
                } else {
                    LOG.debug(" failed sent to " + pConn.getPeer(), e);
                }
            }
        }
    }
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug(" done");
    }
}
/**
 * Sends a message to every rendezvous this peer is connected to.
 * This is the internal version. See also the public one that adds
 * ttl/loop control.
 *
 * Nothing is sent when this peer is not a client or knows no rendezvous.
 * Rendezvous for which isPeerInPropHeader() reports true are skipped.
 * Every other rendezvous receives its own clone of msg. A failure for one
 * rendezvous is logged at debug level and the loop carries on.
 *
 * @param msg          the message to propagate
 * @param serviceName  the name of the service
 * @param serviceParam the parameter of the service
 */
private void sendToEachRendezVous(Message msg,
        String serviceName,
        String serviceParam) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("sendToEachRendezVous");
    }

    boolean haveTargets =
            isClient && (rendezVous != null) && (rendezVous.size() != 0);
    if (!haveTargets) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" no rendezvous");
        }
        return;
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug(" Sending to all rendezvous");
    }

    // Walk from the last entry down to the first; the size is read once.
    int idx = rendezVous.size();
    while (--idx >= 0) {
        try {
            PeerConnection rdv = (PeerConnection) rendezVous.elementAt(idx);
            // Only send when this rendezvous has not already processed
            // this propagated message.
            if (!isPeerInPropHeader(msg, rdv.getPeer())) {
                Message copy = (Message) msg.clone();
                rdv.sendMessage(copy, serviceName, serviceParam);
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug(" failed with " + e);
            }
        }
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug(" done");
    }
}
/**
 * Sends a ping request carrying this peer's id to the given peer.
 *
 * Failures (no messenger, exception while obtaining the messenger or
 * while sending) are logged at debug level and otherwise ignored.
 *
 * @param peer id of the peer to ping
 */
private void ping(String peer) {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("ping " + peer);
    }

    EndpointMessenger target = null;
    try {
        target = endpoint.getMessenger(mkAddress(peer, pName, pParam));
        if (target == null) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug(" no messenger");
            }
            return;
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" getting messenger failed with " + e);
        }
        return;
    }

    Message request = endpoint.newMessage();
    try {
        request.setString(PingRequest, localPeerId);
        target.sendMessage(request);
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" message sent");
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" failed with " + e);
        }
    }
}
/**
 * Removes and closes every client connection that is no longer connected
 * or whose lease is earlier than the current time.
 *
 * A failure while handling one entry is logged at debug level and the
 * scan continues with the next entry.
 */
private synchronized void gcClients() {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("GC CLIENTS starts");
    }
    if ((clients == null) || (clients.size() == 0)) {
        // Nothing to do.
        return;
    }

    for (int i = 0; i < clients.size(); ++i) {
        try {
            PeerConnection pConn = (PeerConnection) clients.elementAt(i);
            long time = System.currentTimeMillis();
            if ((!pConn.isConnected()) || (pConn.getLease() < time)) {
                // This client has dropped out or its lease is over.
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("GC CLIENT: drop " + pConn.getPeer());
                    LOG.debug(" lease= " + pConn.getLease());
                    LOG.debug(" current time= " + time);
                    LOG.debug(" connected= " + pConn.isConnected());
                }
                clients.removeElementAt(i);
                // Step the index back right after the removal: the entries
                // behind it have shifted down by one, and doing this before
                // close() ensures a failing close() cannot make the loop
                // skip the next entry.
                --i;
                pConn.close();
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("gcClient failed " + e);
            }
        }
    }
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("GC CLIENTS done");
    }
}
/**
 * Removes and closes every rendezvous connection that is no longer
 * connected or whose lease is earlier than the current time, and records
 * each removed connection in removedRendezVous.
 *
 * A failure while handling one entry is logged at debug level and the
 * scan continues with the next entry.
 */
private synchronized void gcRdvs() {
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("GC Rendezvous starts");
    }
    if (rendezVous == null) {
        return;
    }

    for (int i = 0; i < rendezVous.size(); ++i) {
        try {
            PeerConnection pConn = (PeerConnection) rendezVous.elementAt(i);
            long time = System.currentTimeMillis();
            if ((!pConn.isConnected()) || (pConn.getLease() < time)) {
                // This rendezvous has dropped out or its lease is over.
                if (LOG.isEnabledFor(Priority.DEBUG)) {
                    LOG.debug("GC RDV: drop " + pConn.getPeer());
                    LOG.debug(" lease= " + pConn.getLease());
                    LOG.debug(" current time= " + time);
                    LOG.debug(" connected= " + pConn.isConnected());
                }
                rendezVous.removeElementAt(i);
                // Step the index back right after the removal so that a
                // failing close() cannot make the loop skip the next entry.
                --i;
                try {
                    pConn.close();
                } finally {
                    // The connection has left the list either way, so it is
                    // recorded even when close() throws.
                    removedRendezVous.add(pConn);
                }
            }
        } catch (Exception e) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("gcRdvs failed " + e);
            }
        }
    }
    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug("GC RENDEZVOUS done");
    }
}
/**
 * Runs both garbage-collection passes: first over the client
 * connections, then over the rendezvous connections.
 */
private void gc() {
    gcClients();
    gcRdvs();
}
/**
 * Calls connect() on the first client connection whose peer equals the
 * given peer id. Does nothing when there is no client list or no entry
 * matches.
 *
 * A failure while examining one entry is logged at debug level and the
 * search continues with the next entry.
 *
 * @param peer id of the client peer to look up
 */
private synchronized void clientIsConnected(String peer) {
    if (clients == null) {
        return;
    }
    for (int i = 0; i < clients.size(); ++i) {
        try {
            PeerConnection pConn = (PeerConnection) clients.elementAt(i);
            if (pConn.getPeer().equals(peer)) {
                pConn.connect();
                return;
            }
        } catch (Exception e) {
            // Best effort: keep scanning, but leave a trace instead of
            // swallowing the failure silently.
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug("clientIsConnected failed " + e);
            }
        }
    }
}
/**
 * Handles a ping reply: reads the peer id stored under PingReply and
 * passes it to clientIsConnected().
 *
 * @param msg the received ping reply message
 */
private void processPingReply(Message msg) {
    clientIsConnected(msg.getString(PingReply));
}
/**
*Description of the Method
*
* @param msg Description of Parameter
*/
private void processPingRequest(Message msg) {
EndpointMesseng
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?