⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 wirepipe.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    /**     * {@inheritDoc}     */    public String getName() {        return pipeAdv.getName();    }    /**     * {@inheritDoc}     */    public PipeAdvertisement getAdvertisement() {        return pipeAdv;    }    /**     * {@inheritDoc}     * <p/>     * Handler for messages received through the normal pipe endpoint     * listener.     * <p/>     * "PipeService" / &lt;PipeID&gt;     */    public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {        // Check if there is a JXTA-WIRE header        MessageElement elem = message.getMessageElement(WirePipeImpl.WIRE_HEADER_ELEMENT_NAMESPACE,                WirePipeImpl.WIRE_HEADER_ELEMENT_NAME);        if (null == elem) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("No JxtaWireHeader element. Discarding " + message);            }            return;        }        WireHeader header;        try {            XMLDocument doc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(elem);            header = new WireHeader(doc);        } catch (Exception e) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "bad wire header", e);            }            return;        }        processIncomingMessage(message, header, srcAddr, dstAddr);    }    /**     * local version with the wire header already parsed. There are two paths     * to this point; via the local endpoint listener or via the general     * propagation listener in WirePipeImpl.     
*     * @param message the message     * @param header  the wire header     * @param srcAddr source     * @param dstAddr destination     */    void processIncomingMessage(Message message, WireHeader header, EndpointAddress srcAddr, EndpointAddress dstAddr) {        if (recordSeenMessage(header.getMsgId())) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Discarding duplicate " + message);            }            return;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Processing " + message + " from " + srcAddr + " on " + pipeAdv.getPipeID());        }        callLocalListeners(message, srcAddr, dstAddr);        if (peerGroup.isRendezvous()) {            repropagate(message, header);        }    }    /**     * Calls the local listeners for a given pipe.     *     * @param message the message     * @param srcAddr source The peer which sent us the message (last hop).     * @param dstAddr dest The wire pipe the message was sent to.     
*/    private void callLocalListeners(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {        List<InputPipeImpl> listeners = new ArrayList(wireinputpipes.keySet());        if (listeners.isEmpty()) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("No local listeners for " + pipeAdv.getPipeID());            }        } else {            int listenersCalled = 0;            for (InputPipeImpl anInputPipe : listeners) {                try {                    anInputPipe.processIncomingMessage(message.clone(), srcAddr, dstAddr);                    listenersCalled++;                } catch (Throwable ignored) {                    if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                        LOG.log(Level.SEVERE, "Uncaught Throwable during callback (" + anInputPipe + ") for " + anInputPipe.getPipeID(), ignored);                    }                }            }            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Called " + listenersCalled + " of " + listeners.size() + " local listeners for " + pipeAdv.getPipeID());            }        }    }    /**     * Repropagate a message.     *     * @param message the message     * @param header  the header     */    void repropagate(Message message, WireHeader header) {        if (closed) {            return;        }        if ((header.getTTL() <= 1)) {            // This message has run out of fuel.            
if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("No TTL remaining - discarding " + message + " on " + header.getPipeID());            }            return;        }        Message msg = message.clone();        header.setTTL(header.getTTL() - 1);        XMLDocument headerDoc = (XMLDocument) header.getDocument(MimeMediaType.XMLUTF8);        MessageElement elem = new TextDocumentMessageElement(WirePipeImpl.WIRE_HEADER_ELEMENT_NAME, headerDoc, null);        msg.replaceMessageElement(WirePipeImpl.WIRE_HEADER_ELEMENT_NAMESPACE, elem);        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Repropagating " + msg + " on " + header.getPipeID());        }        synchronized (this) {            if (closed) {                return;            }            if (null == repropagater) {                repropagater = wireService.createOutputPipe(pipeAdv, Collections.EMPTY_SET);            }        }        try {            if (!repropagater.sendUnModified(msg, header)) {                // XXX bondolo@jxta.org we don't make any attempt to retry.                // There is a potential problem in that we have accepted the                // message locally but didn't resend it. If we get another copy                // of this message then we will NOT attempt to repropagate it                // even if we should.                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.warning("Failure repropagating " + msg + " on " + header.getPipeID() + ". Could not queue message.");                }            }        } catch (IOException failed) {            if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                LOG.log(Level.WARNING, "Failure repropagating " + msg + " on " + header.getPipeID(), failed);            }        }    }    /**     * Sends a message on the propagated pipe.  if set is not empty, then the     * message is sent to set of peers.     
*     * @param message    The message to send.     * @param peers  The peers to which the message will be sent. If the     *               set is empty then the message is sent to all members of the pipe that are     *               connected to the rendezvous, as well as walk the message through the network     * @param header message header     * @throws java.io.IOException if an io error occurs     */    void sendMessage(Message message, Set<? extends ID> peers, WireHeader header) throws IOException {        message = message.clone();        // do local listeners if we are to be one of the destinations        if (peers.isEmpty() || peers.contains(localPeerId)) {            if (!recordSeenMessage(header.getMsgId())) {                callLocalListeners(message, new EndpointAddress(localPeerId, null, null),                        new EndpointAddress(pipeAdv.getPipeID(), null, null));            }        }        if (peers.isEmpty()) {            if (peerGroup.isRendezvous()) {                // propagate to my clients                SrdiIndex srdiIndex = pipeResolver.getSrdiIndex();                List<PeerID> peerids = srdiIndex.query(PipeService.PropagateType, PipeAdvertisement.IdTag, getPipeID().toString(),                        Integer.MAX_VALUE);                peerids.retainAll(rendezvous.getConnectedPeerIDs());                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Propagating " + message + " to " + peerids.size() + " subscriber peers.");                }                rendezvous.propagate(Collections.enumeration(peerids), message, WirePipeImpl.WIRE_SERVICE_NAME,                        wireService.getServiceParameter(), 1);            } else {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("Propagating " + message + " to whole network.");                }                // propagate to local sub-net                rendezvous.propagateToNeighbors(message, 
WirePipeImpl.WIRE_SERVICE_NAME, wireService.getServiceParameter(),                        RendezVousService.DEFAULT_TTL);            }            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Walking " + message + " through peerview.");            }            // walk the message through rdv network (edge, or rendezvous)            rendezvous.walk(message, WirePipeImpl.WIRE_SERVICE_NAME, wireService.getServiceParameter(),                    RendezVousService.DEFAULT_TTL);        } else {            // Send to specific peers            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Propagating " + message + " to " + peers.size() + " peers.");            }            rendezvous.propagate(Collections.enumeration(peers), message, WirePipeImpl.WIRE_SERVICE_NAME,                    wireService.getServiceParameter(), 1);        }    }    /**     * Create a unique (mostly) identifier for this message     *     * @return a message sequence uuid     */    static String createMsgId() {        return UUIDFactory.newSeqUUID().toString();    }    /**     * Adds a message ID to our table or stored IDs.     *     * @param id to add.     * @return false if ID was added, true if it was a duplicate.     */    private boolean recordSeenMessage(String id) {        UUID msgid;        try {            msgid = new UUID(id);        } catch (IllegalArgumentException notauuid) {            // XXX 20031024 bondolo@jxta.org these two conversions are provided            // for backwards compatibility and should eventually be removed.            try {                msgid = UUIDFactory.newHashUUID(Long.parseLong(id), 0);            } catch (NumberFormatException notanumber) {                msgid = UUIDFactory.newHashUUID(id.hashCode(), 0);            }        }        synchronized (msgIds) {            if (msgIds.contains(msgid)) {                // Already there. 
Nothing to do                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                    LOG.fine("duplicate " + msgid);                }                return true;            }            if (msgIds.size() < MAX_RECORDED_MSGIDS) {                msgIds.add(msgid);            } else {                msgIds.set((messagesReceived % MAX_RECORDED_MSGIDS), msgid);            }            messagesReceived++;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("added " + msgid);        }        return false;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -