⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 wirepipe.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
    /**     *  {@inheritDoc}     */    public synchronized void close() {        if(closed) {            return;        }        pipeResolver.forget(this);        repropagater.close();        msgIds.clear();        closed = true;    }    /**     *  {@inheritDoc}     */    public String getType() {        return pipeAdv.getType();    }    /**     *  {@inheritDoc}     */    public ID getPipeID() {        return pipeAdv.getPipeID();    }    /**     *  {@inheritDoc}     */    public String getName() {        return pipeAdv.getName();    }    /**     *  {@inheritDoc}     */    public PipeAdvertisement getAdvertisement() {        return pipeAdv;    }    /**     *  {@inheritDoc}     */    public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {        // Check if there is a JXTA-WIRE header        MessageElement elem = message.getMessageElement("jxta", WirePipeImpl.WireTagName);        if(null == elem) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("No JxtaWireHeader element. Discarding " + message);            }            return;        }        WireHeader header;        try {            XMLDocument doc = (XMLDocument)                              StructuredDocumentFactory.newStructuredDocument(elem.getMimeType(), elem.getStream());            header = new WireHeader(doc);        } catch (Exception e) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("bad wire header", e);            }            return;        }        processIncomingMessage(message, header, srcAddr, dstAddr);    }    /**     *  local version with the wire header already parsed. There are two paths     *  to this point; via the local endpoint listener or via the general     *  propagation listener in WirePipeImpl.     *     */    void processIncomingMessage(Message message, WireHeader header, EndpointAddress srcAddr, EndpointAddress dstAddr) {        if (header.containsPeer(localPeerId)) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Loopback detected - discarding " + message);            }            return;        }        if (recordSeenMessage(header.getMsgId())) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Discarding duplicate " + message);            }            return;        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Processing " + message + " on " + pipeAdv.getPipeID());        }        if (myGroup.isRendezvous()) {            // local listeners are called during repropagate            repropagate(message, header);        } else {            callLocalListeners(message, srcAddr, dstAddr);        }    }    /**     *  Calls the local listeners for a given pipe     */    private void callLocalListeners(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {        srcAddr = (null == srcAddr) ?  null : EndpointAddress.unmodifiableEndpointAddress(srcAddr);        dstAddr = (null == dstAddr) ?  null : EndpointAddress.unmodifiableEndpointAddress(dstAddr);        Iterator eachInput = Arrays.asList(wireinputpipes.keySet().toArray(new InputPipe[0])).iterator();        while(eachInput.hasNext()) {            InputPipeImpl anInputPipe = (InputPipeImpl) eachInput.next();            Message tmpMsg = (Message) message.clone();            try {                anInputPipe.processIncomingMessage(tmpMsg, srcAddr, dstAddr);            } catch(Throwable ignored) {                if (LOG.isEnabledFor(Level.ERROR)) {                    LOG.error("Uncaught Throwable during callback (" + anInputPipe + ") for " + anInputPipe.getPipeID(), ignored);                }            }        }    }    /**     *  Repropagate a message.     */    void repropagate(Message message, WireHeader header) {        if ((header.getTTL() <= 1)) {            // This message ran out of fuel.            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("No TTL remaining - discarding " + message + " on " + header.getPipeID());            }            return;        }        Message msg = (Message) message.clone();        header.setTTL(header.getTTL() - 1);        header.addPeer(localPeerId);        XMLDocument headerDoc = (XMLDocument) header.getDocument(MimeMediaType.XMLUTF8);        MessageElement elem = new TextDocumentMessageElement(WirePipeImpl.WireTagName, headerDoc, null);        msg.replaceMessageElement("jxta", elem);        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Repropagating " + msg + " on " + header.getPipeID());        }        try {            if(!repropagater.enqueue(msg)) {                // XXX bondolo@jxta.org we don't make any attempt to retry.                // There is a potential problem in that we have accepted the                // message locally but didn't resend it. If we get another copy                // of this message then we will NOT attempt to repropagate it                // even if we should.                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("Failure repropagating " + msg + " on " + header.getPipeID() + ". Could not queue message.");                }            }        } catch(IOException failed) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Failure repropagating " + msg + " on " + header.getPipeID(), failed);            }        }    }    /**     *  Sends a message on the propagated pipe.  if set is not empty, then the      *  message is sent to set of peers.     *     *  @param msg  The message to send.     *  @param peers    The peers to which the message will be sent. If the     *  set is empty then the message is sent to all members of the pipe that are     *  connected to the rendezvous, as well as walk the message through the network     */    void sendMessage(Message msg, Set peers) throws IOException {        // do local listeners if we are to be one of the destinations        if(peers.isEmpty() || peers.contains(myGroup.getPeerID())) {            callLocalListeners(msg, null, null);        }        if(peers.isEmpty()) {            if (myGroup.isRendezvous()) {                // propagate to my clients                SrdiIndex srdiIndex = pipeResolver.getSrdiIndex();                List peerids = srdiIndex.query(PipeService.PropagateType, PipeAdvertisement.IdTag, getPipeID().toString(), Integer.MAX_VALUE);                peerids.retainAll(rendezvous.getConnectedPeerIDs());                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Propagating " + msg + " to " + peerids.size() + " subscriber peers and walking through peerview.");                }                // we clone the message since we are deliberately setting the TTL very low.                rendezvous.propagate(Collections.enumeration(peerids), (Message) msg.clone(), WirePipeImpl.WireName, pipeService.getServiceParameter(), 1);            } else {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Propagating " + msg + " to whole network.");                }                // propagate to local sub-net                rendezvous.propagateToNeighbors(msg, WirePipeImpl.WireName, pipeService.getServiceParameter(), RendezVousService.DEFAULT_TTL);            }            // walk the message through rdv network (edge, or rendezvous)            rendezvous.walk(msg, WirePipeImpl.WireName, pipeService.getServiceParameter(), RendezVousService.DEFAULT_TTL);        } else {            // Send to specific peers            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("Propagating " + msg + " to " + peers.size() + " peers.");            }            rendezvous.propagate(Collections.enumeration(peers), msg, WirePipeImpl.WireName, pipeService.getServiceParameter(), 1);        }    }    /**     *  Create a unique (mostly) identifier for this message     */    String createMsgId() {        return UUIDFactory.newSeqUUID().toString();    }    /**     *  Adds a message ID to our table or stored IDs.     *     *  @param ID to add.     *  @return false if ID was added, true if it was a duplicate.     */    private boolean recordSeenMessage(String id) {        UUID msgid = null;        try {            msgid = new UUID(id);        } catch(IllegalArgumentException notauuid) {            // XXX 20031024 bondolo@jxta.org these two conversions are provided            // for backwards compatibility and should eventually be removed.            try {                msgid = UUIDFactory.newHashUUID(Long.parseLong(id), 0);            } catch (NumberFormatException notanumber) {                msgid = UUIDFactory.newHashUUID(id.hashCode(), 0);            }        }        synchronized(msgIds) {            if (msgIds.contains(msgid)) {                // Already there. Nothing to do                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("duplicate " + msgid);                }                return true;            }            if (msgIds.size() >= MAX_RECORDED_MSGIDS) {                // The cache is full. Remove the oldest                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Remove oldest id");                }                msgIds.remove(0);            }            msgIds.add(msgid);        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("added " + msgid);        }        return false;    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -