⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rendezvousserviceprovider.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            }
            repropagate(message, propHdr, dstAddr.getServiceName(), dstAddr.getServiceParameter());
        }
    }

    /**
     * Re-propagates a message that carries a propagate header.
     *
     * <p/>Within the visible code this is invoked with the service name and
     * service parameter taken from the destination address of the message.
     * How the message is forwarded is defined by the implementing subclass.
     *
     * @param  msg      the message to repropagate.
     * @param  propHdr  the propagate header associated with the message.
     * @param  sName    the destination service name.
     * @param  sParam   the destination service parameter.
     */
    protected abstract void repropagate(Message msg, RendezVousPropagateMessage propHdr, String sName, String sParam);

    /**
     * Propagates a message within the group. The exact propagation strategy
     * is defined by the implementing subclass.
     *
     * @param  msg           the message to propagate.
     * @param  serviceName   the name of the destination service.
     * @param  serviceParam  the parameter of the destination service.
     * @param  ttl           the time-to-live to use for the message
     *                       (NOTE(review): presumably a hop count -- confirm
     *                       against the implementations).
     * @throws IOException   if the implementation fails to propagate the message.
     */
    public abstract void propagateInGroup(Message msg, String serviceName, String serviceParam, int ttl) throws IOException;

    /**
     * Propagates on all endpoint protocols.
     *
     * <p/>Note: The original msg is not modified and may be reused upon return;
     * a clone of it is what gets handed to the endpoint service.
     *
     * @param  msg      is the message to propagate.
     * @param  propHdr  the propagate header of the message. Only its message
     *                  id is used here, for the debug log entry.
     * @throws IOException if the endpoint service fails to propagate the message.
     */
    protected void sendToNetwork(Message msg, RendezVousPropagateMessage propHdr) throws IOException {
        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug("Endpoint propagating " + msg + " (" + propHdr.getMsgId() + ")");
        }

        rdvService.endpoint.propagate((Message) msg.clone(), PropSName, PropPName);
    }

    /**
     *  Convenience method for constructing an endpoint address from an id
     *
     *  @param destPeer peer id, in its URI string form
     *  @param serv the service name (if any)
     *  @param parm the service param (if any)
     *  @return endpointAddress for this peer id.
     *  @throws IllegalArgumentException if {@code destPeer} cannot be parsed
     *  as an ID URI. The original {@link URISyntaxException} is attached as
     *  the cause.
     */
    protected final static EndpointAddress mkAddress(String destPeer, String serv, String parm) {
        ID asID;

        try {
            asID = IDFactory.fromURI(new URI(destPeer));
        } catch (URISyntaxException caught) {
            // Keep the original exception as the cause so the parse position
            // and input are not lost; same idiom as getPropHeader().
            IllegalArgumentException failure = new IllegalArgumentException(caught.getMessage());

            failure.initCause(caught);
            throw failure;
        }

        return mkAddress(asID, serv, parm);
    }

    /**
     *  Convenience method for constructing an endpoint address from an id
     *
     *  @param destPeer peer id
     *  @param serv the service name (if any)
     *  @param parm the service param (if any)
     *  @return endpointAddress for this peer id. It is built with
     *  {@code MESSAGE_NAMESPACE_NAME} as the protocol name and the unique
     *  value of the id as the protocol address.
     */
    protected final static EndpointAddress mkAddress(ID destPeer, String serv, String parm) {
        return new EndpointAddress(MESSAGE_NAMESPACE_NAME, destPeer.getUniqueValue().toString(), serv, parm);
    }

    /**
     *  Get propagate header from the message.
     *
     *  @param msg  The source message.
     *  @return The message's propagate header if any, otherwise null.
     *  @throws IllegalArgumentException if the header element is present but
     *  cannot be read as a structured document. The underlying
     *  {@link IOException} is attached as the cause.
     */
    protected RendezVousPropagateMessage getPropHeader(Message msg) {
        MessageElement elem = msg.getMessageElement(MESSAGE_NAMESPACE_NAME, HEADER_NAME);

        if (elem == null) {
            return null;
        }

        try {
            StructuredDocument asDoc = StructuredDocumentFactory.newStructuredDocument(elem.getMimeType(), elem.getStream());

            return new RendezVousPropagateMessage(asDoc);
        } catch (IOException failed) {
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Could not get prop header of " + msg, failed);
            }

            IllegalArgumentException failure = new IllegalArgumentException("Could not get prop header of " + msg);

            failure.initCause(failed);
            throw failure;
        }
    }

    /**
     *  {@inheritDoc}
     *
     *  <p/>Validates the propagate header of an incoming message. The message
     *  is rejected (null is returned) when the header is missing or
     *  unreadable, when its TTL is zero or less, when its message id has
     *  already been recorded by the rendezvous service, or when this peer
     *  already appears in the header's visited list. The relevant meter, if
     *  metering is enabled, is updated for each rejection.
     *
     *  <p/>Note: as a side effect the message id of a header which passes the
     *  missing/unreadable and TTL checks is recorded with the rendezvous
     *  service, even if the loop check then rejects the message.
     *
     *  @param msg  The message to check.
     *  @return The propagate header of the message if the message is valid,
     *  otherwise null.
     */
    protected RendezVousPropagateMessage checkPropHeader(Message msg) {
        RendezVousPropagateMessage propHdr;

        try {
            propHdr = getPropHeader(msg);

            if (null == propHdr) {
                // No header. Discard the message
                if (LOG.isEnabledFor(Level.WARN)) {
                    LOG.warn("Discarding " + msg + " -- missing propagate header.");
                }

                if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
                    rendezvousMeter.invalidMessageReceived();
                }

                return null;
            }
        } catch (Exception failure) {
            // Bad header. Discard the message
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Discarding " + msg + " -- bad propagate header.", failure);
            }

            if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
                rendezvousMeter.invalidMessageReceived();
            }

            return null;
        }

        // Look at the Propagate header if any and check for loops.
        // Do not remove it; we do not have to change it yet, and we have
        // to look at it at different places and looking costs less on
        // incoming elements than on outgoing.

        // TTL detection. A message arriving with TTL <= 0 should not even
        // have been sent. Kill it.
        if (propHdr.getTTL() <= 0) {
            // This message is dead on arrival. Drop it.
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Discarding " + msg + "(" + propHdr.getMsgId() + ") -- dead on arrival (TTl=" + propHdr.getTTL() + ").");
            }

            if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
                rendezvousMeter.receivedDeadMessage();
            }

            return null;
        }

        // Duplicate detection. addMsgId() both records the id and reports
        // (by returning false) that it was already known.
        if (!rdvService.addMsgId(propHdr.getMsgId())) {
            // We already received this message - discard
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Discarding " + msg + "(" + propHdr.getMsgId() + ") -- feedback.");
            }

            if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
                rendezvousMeter.receivedDuplicateMessage();
            }

            return null;
        }

        // Loop detection
        if (propHdr.isVisited(group.getPeerID().toURI())) {
            // Loop is detected - discard.
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Discarding " + msg + "(" + propHdr.getMsgId() + ") -- loopback.");
            }

            if (RendezvousMeterBuildSettings.RENDEZVOUS_METERING && (rendezvousMeter != null)) {
                rendezvousMeter.receivedLoopbackMessage();
            }

            return null;
        }

        // Message is valid
        return propHdr;
    }

    /**
     * Creates or updates the propagate header of a message and stores the
     * resulting header in the message, replacing any previous header element.
     *
     * <p/>If {@code propHdr} is null a new header is created for the given
     * service name, service parameter and TTL. Otherwise the given header is
     * updated in place (TTL adjusted, this peer added to the visited path).
     * If the TTL of an existing header has expired the message is left
     * untouched and null is returned.
     *
     * @param  msg           The message which receives the header element.
     * @param  propHdr       The current propagate header of the message, or
     *                       null if a new header must be created.
     * @param  serviceName   The destination service name; only used when a
     *                       new header is created.
     * @param  serviceParam  The destination service parameter; only used when
     *                       a new header is created.
     * @param  ttl           The TTL for a new header, or the maximum TTL
     *                       allowed for an existing header.
     * @return The header now stored in the message, or null if the TTL
     * expired and the message should not be propagated.
     */
    protected RendezVousPropagateMessage updatePropHeader(Message msg, RendezVousPropagateMessage propHdr, String serviceName, String serviceParam, int ttl) {
        boolean newHeader = false;

        if (null == propHdr) {
            propHdr = newPropHeader(serviceName, serviceParam, ttl);
            newHeader = true;
        } else {
            if (null == updatePropHeader(propHdr, ttl)) {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("TTL expired for " + msg + " (" + propHdr.getMsgId() + ") ttl=" + propHdr.getTTL());
                }

                return null;
            }
        }

        XMLDocument propHdrDoc = (XMLDocument) propHdr.getDocument(MimeMediaType.XMLUTF8);
        MessageElement elem = new TextDocumentMessageElement(HEADER_NAME, propHdrDoc, null);

        if (LOG.isEnabledFor(Level.DEBUG)) {
            LOG.debug((newHeader ? "Added" : "Updated") + " prop header for " + msg + " (" + propHdr.getMsgId() + ") ttl=" + propHdr.getTTL());
        }

        msg.replaceMessageElement(MESSAGE_NAMESPACE_NAME, elem);

        return propHdr;
    }

    /**
     * Creates a new propagation header with the given destination and TTL.
     * A fresh message id is generated and recorded with the rendezvous
     * service, and this peer is added to the path recorded in the header.
     *
     * <p/>The header is not attached to any message by this method.
     *
     * @param  serviceName   The destination service name.
     * @param  serviceParam  The destination service parameter.
     * @param  ttl           The TTL to set in the header.
     * @return The newly created propagate header.
     */
    private RendezVousPropagateMessage newPropHeader(String serviceName, String serviceParam, int ttl) {
        RendezVousPropagateMessage propHdr = new RendezVousPropagateMessage();

        propHdr.setTTL(ttl);
        propHdr.setDestSName(serviceName);
        propHdr.setDestSParam(serviceParam);

        UUID msgID = rdvService.createMsgId();

        propHdr.setMsgId(msgID);
        rdvService.addMsgId(msgID);

        // Add this peer to the path.
        propHdr.addVisited(group.getPeerID().toURI());

        return propHdr;
    }

    /**
     * Updates the propagation header of the message. Also adds this peer to the
     * path recorded in the header. The header is modified in place even when
     * null is returned.
     *
     * @param  propHdr The propHdr for the message.
     * @param  maxTTL The maximum TTL which will be allowed.
     * @return The updated propagate header if the message should be
     * repropagated otherwise null.
     */
    private RendezVousPropagateMessage updatePropHeader(RendezVousPropagateMessage propHdr, int maxTTL) {
        int msgTTL = propHdr.getTTL();
        URI me = group.getPeerID().toURI();

        int useTTL = msgTTL;

        if (!propHdr.isVisited(me)) {
            // only reduce TTL if message has not previously visited us.
            useTTL--;
        }

        // ensure TTL does not exceed maxTTL
        useTTL = Math.min(useTTL, maxTTL);

        propHdr.setTTL(useTTL);

        // Add this peer to the path.
        propHdr.addVisited(me);

        // If message came in with TTL one or less, it was last trip. It can not go any further.
        return (useTTL <= 0) ? null : propHdr;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -