📄 stdrendezvousservice.java
字号:
} } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received " + message + " (" + propHdr.getMsgId() + ") from " + pve); } } } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received " + message + " (" + propHdr.getMsgId() + ") from " + pConn); } } } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received " + message + " (" + propHdr.getMsgId() + ") from loopback."); } } } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Received " + message + " (" + propHdr.getMsgId() + ") from network -- repropagating with TTL 2"); } propHdr.setTTL(Math.min(propHdr.getTTL(), 3)); // will be reduced during repropagate stage. } super.processReceivedMessage(message, propHdr, srcAddr, dstAddr); } /** * @inheritDoc **/ public void propagate(Enumeration destPeerIDs, Message msg, String serviceName, String serviceParam, int ttl) { ttl = Math.min(ttl, MAX_TTL); RendezVousPropagateMessage propHdr = updatePropHeader(msg, getPropHeader(msg), serviceName, serviceParam, ttl); if (null != propHdr) { int numPeers = 0; try { while (destPeerIDs.hasMoreElements()) { ID dest = (ID) destPeerIDs.nextElement(); try { PeerConnection pConn = (PeerConnection) getPeerConnection(dest); //TODO: make use of PeerView connections as well if (null == pConn) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending " + msg + " (" + propHdr.getMsgId() + ") to " + dest); } EndpointAddress addr = mkAddress((PeerID) dest, PropSName, PropPName); Messenger messenger = rdvService.endpoint.getMessengerImmediate(addr, null); if (null != messenger) { try { messenger.sendMessage(msg); } catch (IOException ignored) { continue; } } else { continue; } } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending " + msg + " (" + propHdr.getMsgId() + ") to " + pConn); } if (pConn.isConnected()) { pConn.sendMessage((Message) msg.clone(), PropSName, PropPName); } else { continue; } } numPeers++; } catch (Exception failed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed to send " + msg + " (" + propHdr.getMsgId() + ") to " + dest); } } } } finally { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.propagateToPeers(numPeers); } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Propagated " + msg + " (" + propHdr.getMsgId() + ") to " + numPeers + " peers."); } } } else { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Declined to send " + msg + " ( no propHdr )"); } } } /** * {@inheritDoc} **/ public void propagateToNeighbors(Message msg, String serviceName, String serviceParam, int ttl) throws IOException { ttl = Math.min(MAX_TTL, ttl); RendezVousPropagateMessage propHdr = updatePropHeader(msg, getPropHeader(msg), serviceName, serviceParam, ttl); if (null != propHdr) { try { sendToNetwork(msg, propHdr); if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.propagateToNeighbors(); } } catch (IOException failed) { if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) { rendezvousMeter.propagateToNeighborsFailed(); } throw failed; } } } /** * Returns the peer connection or null if not present. * * @return PeerConnection the peer connection or null if not present. **/ public abstract PeerConnection getPeerConnection(ID id); /** * Returns a list of the current peer connections. * * @return PeerConnection the peer connection or null if not present. **/ protected abstract PeerConnection[] getPeerConnections(); /** * Sends to all connected peers. * * <p/>Note: The original msg is not modified and may be reused upon return. * * @param msg is the message to propagate. * @param serviceName is the name of the service * @param serviceParam is the parameter of the service */ protected int sendToEachConnection(Message msg, RendezVousPropagateMessage propHdr) { int sentToPeers = 0; List peers = Arrays.asList(getPeerConnections()); Iterator eachClient = peers.iterator(); while (eachClient.hasNext()) { PeerConnection pConn = (PeerConnection) eachClient.next(); // Check if this rendezvous has already processed this propagated message. if (!pConn.isConnected()) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Skipping " + pConn + " for " + msg + "(" + propHdr.getMsgId() + ") -- disconnected."); } // next! continue; } if (propHdr.isVisited(pConn.getPeerID().toURI())) { if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Skipping " + pConn + " for " + msg + "(" + propHdr.getMsgId() + ") -- already visited."); } // next! continue; } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sending " + msg + "(" + propHdr.getMsgId() + ") to " + pConn); } if( pConn.sendMessage((Message) msg.clone(), PropSName, PropPName) ) {; sentToPeers++; } } if (LOG.isEnabledFor(Level.DEBUG)) { LOG.debug("Sent " + msg + "(" + propHdr.getMsgId() + ") to " + sentToPeers + " of " + peers.size() + " peers."); } return sentToPeers; } /** * Sends a disconnect message to the specified peer. * * @param id the peer which we will disconnect from. * @param Messenger the messenger to use in sending the disconnect. **/ protected void sendDisconnect( PeerID peerid, PeerAdvertisement padv ) { Message msg = new Message(); // The request simply includes the local peer advertisement. try { msg.replaceMessageElement("jxta", new TextDocumentMessageElement(DisconnectRequest, getPeerAdvertisementDoc(), null)); EndpointAddress addr = mkAddress(peerid, null, null); RouteAdvertisement hint = null; if (null != padv) { hint = RendezVousServiceImpl.extractRouteAdv(padv); } Messenger messenger = rdvService.endpoint.getMessenger(addr, hint); if (null == messenger) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Could not get messenger for " + peerid); } return; } messenger.sendMessage(msg, pName, pParam); } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("sendDisconnect failed", e); } } } /** * Sends a disconnect message to the specified peer. * * @param id the peer which we will disconnect from. * @param Messenger the messenger to use in sending the disconnect. **/ protected void sendDisconnect( PeerConnection pConn ) { Message msg = new Message(); // The request simply includes the local peer advertisement. try { msg.replaceMessageElement("jxta", new TextDocumentMessageElement(DisconnectRequest, getPeerAdvertisementDoc(), null)); pConn.sendMessage(msg, pName, pParam); } catch (Exception e) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("sendDisconnect failed", e); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -