⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 stdrendezvousservice.java

📁 JXTA&#8482 is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
            try {                while (destPeerIDs.hasMoreElements()) {                    ID dest = destPeerIDs.nextElement();                    try {                        PeerConnection pConn = getPeerConnection(dest);                        // TODO: make use of PeerView connections as well                        if (null == pConn) {                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                LOG.fine("Sending " + msg + " (" + propHdr.getMsgId() + ") to " + dest);                            }                            EndpointAddress addr = mkAddress(dest, PropSName, PropPName);                            Messenger messenger = rdvService.endpoint.getMessengerImmediate(addr, null);                            if (null != messenger) {                                try {                                    messenger.sendMessage(msg);                                } catch (IOException ignored) {                                    continue;                                }                            } else {                                continue;                            }                        } else {                            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                                LOG.fine("Sending " + msg + " (" + propHdr.getMsgId() + ") to " + pConn);                            }                            if (pConn.isConnected()) {                                pConn.sendMessage(msg.clone(), PropSName, PropPName);                            } else {                                continue;                            }                        }                        numPeers++;                    } catch (Exception failed) {                        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                            LOG.warning("Failed to send " + msg + " (" + propHdr.getMsgId() + ") to " + dest);                        }                    }                }            } finally {                if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {                    rendezvousMeter.propagateToPeers(numPeers);                }                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Propagated " + msg + " (" + propHdr.getMsgId() + ") to " + numPeers + " peers.");                }            }        } else {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Declined to send " + msg + " ( no propHdr )");            }        }    }    /**     * {@inheritDoc}     */    @Override    public void propagateToNeighbors(Message msg, String serviceName, String serviceParam, int initialTTL) throws IOException {        msg = msg.clone();        int useTTL = Math.min(initialTTL, MAX_TTL);        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Propagating " + msg + "(TTL=" + useTTL + ") to neighbors to :" + "\n\tsvc name:" + serviceName+ "\tsvc params:" + serviceParam);        }        RendezVousPropagateMessage propHdr = updatePropHeader(msg, getPropHeader(msg), serviceName, serviceParam, useTTL);        if (null != propHdr) {            try {                sendToNetwork(msg, propHdr);                if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {                    rendezvousMeter.propagateToNeighbors();                }            } catch (IOException failed) {                if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {                    rendezvousMeter.propagateToNeighborsFailed();                }                throw failed;            }        }    }    /**     * {@inheritDoc}     */    @Override    protected void repropagate(Message msg, RendezVousPropagateMessage propHdr, String serviceName, String serviceParam) {        msg = msg.clone();        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Repropagating " + msg + " (" + propHdr.getMsgId() + ")");        }        if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {            rendezvousMeter.receivedMessageRepropagatedInGroup();        }        try {            propHdr = updatePropHeader(msg, propHdr, serviceName, serviceParam, MAX_TTL);            if (null != propHdr) {                // Note (hamada): This is an unnecessary operation, and serves                // no purpose other than the additional loads it imposes on the                // rendezvous.  Local subnet network operations should be (and are)                // sufficient to achieve the goal.                // sendToEachConnection(msg, propHdr);                sendToNetwork(msg, propHdr);            } else {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("No propagate header, declining to repropagate " + msg + ")");                }            }        } catch (Exception ez1) {            // Not much we can do            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                if (propHdr != null) {                    LOG.log(Level.WARNING, "Failed to repropagate " + msg + " (" + propHdr.getMsgId() + ")", ez1);                } else {                    LOG.log(Level.WARNING, "Could to repropagate " + msg, ez1);                }            }        }    }    /**     * Returns the peer connection or null if not present.     *     * @param id the node ID     * @return PeerConnection the peer connection or null if not present.     */    public abstract PeerConnection getPeerConnection(ID id);    /**     * Returns an array of the current peer connections.     *     * @return An array of the current peer connections.     */    protected abstract PeerConnection[] getPeerConnections();    /**     * Sends to all connected peers.     * <p/>     * Note: The original msg is not modified and may be reused upon return.     *     * @param msg     The message to be sent.     * @param propHdr The propagation header associated with the message.     * @return the number of nodes the message was sent to     */    protected int sendToEachConnection(Message msg, RendezVousPropagateMessage propHdr) {        List<PeerConnection> peers = Arrays.asList(getPeerConnections());        int sentToPeers = 0;        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Sending " + msg + "(" + propHdr.getMsgId() + ") to " + peers.size() + " peers.");        }        for (PeerConnection pConn : peers) {            // Check if this rendezvous has already processed this propagated message.            if (!pConn.isConnected()) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Skipping " + pConn + " for " + msg + "(" + propHdr.getMsgId() + ") -- disconnected.");                }                // next!                continue;            }            if (propHdr.isVisited(pConn.getPeerID().toURI())) {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Skipping " + pConn + " for " + msg + "(" + propHdr.getMsgId() + ") -- already visited.");                }                // next!                continue;            }            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Sending " + msg + "(" + propHdr.getMsgId() + ") to " + pConn);            }            if (pConn.sendMessage(msg.clone(), PropSName, PropPName)) {                sentToPeers++;            }        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Sent " + msg + "(" + propHdr.getMsgId() + ") to " + sentToPeers + " of " + peers.size() + " peers.");        }        return sentToPeers;    }    /**     * Sends a disconnect message to the specified peer.     *     * @param peerid The peer to be disconnected.     * @param padv   The peer to be disconnected.     */    protected void sendDisconnect(ID peerid, PeerAdvertisement padv) {        Message msg = new Message();        // The request simply includes the local peer advertisement.        try {            msg.replaceMessageElement("jxta", new TextDocumentMessageElement(DisconnectRequest, getPeerAdvertisementDoc(), null));            EndpointAddress addr = mkAddress(peerid, null, null);            RouteAdvertisement hint = null;            if (null != padv) {                hint = EndpointUtils.extractRouteAdv(padv);            }            Messenger messenger = rdvService.endpoint.getMessengerImmediate(addr, hint);            if (null == messenger) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.warning("Could not get messenger for " + peerid);                }                return;            }            messenger.sendMessage(msg, pName, pParam);        } catch (Exception e) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "sendDisconnect failed", e);            }        }    }    /**     * Sends a disconnect message to the specified peer.     *     * @param pConn The peer to be disconnected.     */    protected void sendDisconnect(PeerConnection pConn) {        Message msg = new Message();        // The request simply includes the local peer advertisement.        try {            msg.replaceMessageElement("jxta", new TextDocumentMessageElement(DisconnectRequest, getPeerAdvertisementDoc(), null));            pConn.sendMessage(msg, pName, pParam);        } catch (Exception e) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "sendDisconnect failed", e);            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -