📄 stdrendezvousservice.java
字号:
try { while (destPeerIDs.hasMoreElements()) { ID dest = destPeerIDs.nextElement(); try { PeerConnection pConn = getPeerConnection(dest); // TODO: make use of PeerView connections as well if (null == pConn) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending " + msg + " (" + propHdr.getMsgId() + ") to " + dest); } EndpointAddress addr = mkAddress(dest, PropSName, PropPName); Messenger messenger = rdvService.endpoint.getMessengerImmediate(addr, null); if (null != messenger) { try { messenger.sendMessage(msg); } catch (IOException ignored) { continue; } } else { continue; } } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending " + msg + " (" + propHdr.getMsgId() + ") to " + pConn); } if (pConn.isConnected()) { pConn.sendMessage(msg.clone(), PropSName, PropPName); } else { continue; } } numPeers++; } catch (Exception failed) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Failed to send " + msg + " (" + propHdr.getMsgId() + ") to " + dest); } } } } finally { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.propagateToPeers(numPeers); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Propagated " + msg + " (" + propHdr.getMsgId() + ") to " + numPeers + " peers."); } } } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Declined to send " + msg + " ( no propHdr )"); } } } /** * {@inheritDoc} */ @Override public void propagateToNeighbors(Message msg, String serviceName, String serviceParam, int initialTTL) throws IOException { msg = msg.clone(); int useTTL = Math.min(initialTTL, MAX_TTL); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Propagating " + msg + "(TTL=" + useTTL + ") to neighbors to :" + "\n\tsvc name:" + serviceName+ "\tsvc params:" + serviceParam); } RendezVousPropagateMessage propHdr = updatePropHeader(msg, getPropHeader(msg), serviceName, serviceParam, useTTL); if (null != propHdr) { try { sendToNetwork(msg, propHdr); if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.propagateToNeighbors(); } } catch (IOException failed) { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.propagateToNeighborsFailed(); } throw failed; } } } /** * {@inheritDoc} */ @Override protected void repropagate(Message msg, RendezVousPropagateMessage propHdr, String serviceName, String serviceParam) { msg = msg.clone(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Repropagating " + msg + " (" + propHdr.getMsgId() + ")"); } if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.receivedMessageRepropagatedInGroup(); } try { propHdr = updatePropHeader(msg, propHdr, serviceName, serviceParam, MAX_TTL); if (null != propHdr) { // Note (hamada): This is an unnecessary operation, and serves // no purpose other than the additional loads it imposes on the // rendezvous. Local subnet network operations should be (and are) // sufficient to achieve the goal. // sendToEachConnection(msg, propHdr); sendToNetwork(msg, propHdr); } else { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("No propagate header, declining to repropagate " + msg + ")"); } } } catch (Exception ez1) { // Not much we can do if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { if (propHdr != null) { LOG.log(Level.WARNING, "Failed to repropagate " + msg + " (" + propHdr.getMsgId() + ")", ez1); } else { LOG.log(Level.WARNING, "Could to repropagate " + msg, ez1); } } } } /** * Returns the peer connection or null if not present. * * @param id the node ID * @return PeerConnection the peer connection or null if not present. */ public abstract PeerConnection getPeerConnection(ID id); /** * Returns an array of the current peer connections. * * @return An array of the current peer connections. */ protected abstract PeerConnection[] getPeerConnections(); /** * Sends to all connected peers. * <p/> * Note: The original msg is not modified and may be reused upon return. * * @param msg The message to be sent. * @param propHdr The propagation header associated with the message. * @return the number of nodes the message was sent to */ protected int sendToEachConnection(Message msg, RendezVousPropagateMessage propHdr) { List<PeerConnection> peers = Arrays.asList(getPeerConnections()); int sentToPeers = 0; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending " + msg + "(" + propHdr.getMsgId() + ") to " + peers.size() + " peers."); } for (PeerConnection pConn : peers) { // Check if this rendezvous has already processed this propagated message. if (!pConn.isConnected()) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Skipping " + pConn + " for " + msg + "(" + propHdr.getMsgId() + ") -- disconnected."); } // next! continue; } if (propHdr.isVisited(pConn.getPeerID().toURI())) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Skipping " + pConn + " for " + msg + "(" + propHdr.getMsgId() + ") -- already visited."); } // next! continue; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sending " + msg + "(" + propHdr.getMsgId() + ") to " + pConn); } if (pConn.sendMessage(msg.clone(), PropSName, PropPName)) { sentToPeers++; } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Sent " + msg + "(" + propHdr.getMsgId() + ") to " + sentToPeers + " of " + peers.size() + " peers."); } return sentToPeers; } /** * Sends a disconnect message to the specified peer. * * @param peerid The peer to be disconnected. * @param padv The peer to be disconnected. */ protected void sendDisconnect(ID peerid, PeerAdvertisement padv) { Message msg = new Message(); // The request simply includes the local peer advertisement. try { msg.replaceMessageElement("jxta", new TextDocumentMessageElement(DisconnectRequest, getPeerAdvertisementDoc(), null)); EndpointAddress addr = mkAddress(peerid, null, null); RouteAdvertisement hint = null; if (null != padv) { hint = EndpointUtils.extractRouteAdv(padv); } Messenger messenger = rdvService.endpoint.getMessengerImmediate(addr, hint); if (null == messenger) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.warning("Could not get messenger for " + peerid); } return; } messenger.sendMessage(msg, pName, pParam); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "sendDisconnect failed", e); } } } /** * Sends a disconnect message to the specified peer. * * @param pConn The peer to be disconnected. */ protected void sendDisconnect(PeerConnection pConn) { Message msg = new Message(); // The request simply includes the local peer advertisement. try { msg.replaceMessageElement("jxta", new TextDocumentMessageElement(DisconnectRequest, getPeerAdvertisementDoc(), null)); pConn.sendMessage(msg, pName, pParam); } catch (Exception e) { if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) { LOG.log(Level.WARNING, "sendDisconnect failed", e); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -