⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtabidipipe.java

📁 JXTA™ is a set of open, generalized peer-to-peer (P2P) protocols that allow any networked devi
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        if (isReliable && ros != null) {            ros.close();        }        // close the pipe        inputPipe.close();        msgr.close();        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Pipe close complete");        }        notifyListeners(PIPE_CLOSED_EVENT);    }    private void notifyListeners(int event) {        try {            if (eventListener != null) {                eventListener.pipeEvent(event);            } else if (stateListener != null) {                stateListener.stateEvent(this, event);            }        } catch (Throwable th) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.log(Level.FINE, "error during pipe event callback", th);            }        }    }    /**     * Sets the inputPipe attribute of the JxtaBiDiPipe object     *     * @param inputPipe The new inputPipe value     */    protected void setInputPipe(InputPipe inputPipe) {        this.inputPipe = inputPipe;    }    /**     * {@inheritDoc}     */    public void pipeMsgEvent(PipeMsgEvent event) {        Message message = event.getMessage();        if (message == null) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("Empty event");            }            return;        }        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {            LOG.fine("Pipe message arrived");        }        MessageElement element;        if (!bound) {            // look for a remote pipe answer            element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.remPipeTag);            if (element != null) {                // connect response                try {                    XMLDocument CredDoc = null;                    XMLDocument remotePipeDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);                    remotePipeAdv = (PipeAdvertisement) AdvertisementFactory.newAdvertisement(remotePipeDoc);                    if 
(Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Recevied a pipe Advertisement :" + remotePipeAdv.getName());                    }                    element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.remPeerTag);                    if (element != null) {                        XMLDocument remotePeerDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);                        remotePeerAdv = (PeerAdvertisement) AdvertisementFactory.newAdvertisement(remotePeerDoc);                        if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                            LOG.fine("Recevied an Peer Advertisement :" + remotePeerAdv.getName());                        }                    } else {                        if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                            LOG.warning(" BAD connect response");                        }                        return;                    }                    element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.credTag);                    if (element != null) {                        CredDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(element);                    }                    if (pipeAdv.getType().equals(PipeService.UnicastSecureType) && (CredDoc == null || !checkCred(CredDoc))) {                        // we're done here                        if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                            LOG.severe("Missing remote credential doc");                        }                        return;                    }                    element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.reliableTag);                    if (element != null) {                        isReliable = Boolean.valueOf(element.toString());                    }                    boolean directSupported = false;                    
element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.directSupportedTag);                    if (element != null) {                        directSupported = Boolean.valueOf(element.toString());                    }                    if (directSupported) {                        msgr = getDirectMessenger(group, remotePipeAdv, remotePeerAdv);                        if (msgr != null) {                            this.direct = true;                        } else {                            msgr = lightweightOutputPipe(group, remotePipeAdv, remotePeerAdv);                        }                    } else {                        msgr = lightweightOutputPipe(group, remotePipeAdv, remotePeerAdv);                    }                    if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                        LOG.fine("Reliability set to :" + isReliable);                    }                    if (isReliable && !direct) {                        createRLib();                    }                    synchronized (finalLock) {                        waiting = false;                        finalLock.notifyAll();                    }                } catch (IOException e) {                    if (Logging.SHOW_SEVERE && LOG.isLoggable(Level.SEVERE)) {                        LOG.log(Level.SEVERE, "failed to process response message", e);                    }                }                return;            }        }        if (isReliable && !direct) {            // let reliabilty deal with the message            receiveMessage(message);            return;        }        if (!hasClose(message)) {            push(event);        }    }    private boolean hasClose(Message message) {        // look for close request        MessageElement element = message.getMessageElement(JxtaServerPipe.nameSpace, JxtaServerPipe.closeTag);        if (element != null) {            try {                if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {           
         LOG.fine("Recevied a pipe close request, closing pipes");                }                if (ros != null) {                    ros.hardClose();                }                closePipe(false);            } catch (IOException ie) {                if (Logging.SHOW_WARNING && LOG.isLoggable(Level.WARNING)) {                    LOG.log(Level.WARNING, "failed during close", ie);                }            }            return true;        }        return false;    }    private void receiveMessage(Message message) {        Iterator<MessageElement> i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);        if (i.hasNext()) {            if (ros != null) {                ros.recv(message);            }            return;        }        i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);        if (i.hasNext()) {            // It can happen that we receive messages for the input stream            // while we have not finished creating it.            try {                synchronized (finalLock) {                    while (waiting) {                        finalLock.wait(timeout);                    }                }            } catch (InterruptedException ie) {// ignored            }            if (ris != null) {                ris.recv(message);            }        }    }    /**     * Gets the Maximum Retry Timeout of the reliability layer     *     * @return The maximum retry Timeout value     */    public synchronized int getMaxRetryTimeout() {        return maxRetryTimeout;    }    /**     * Gets the Maximum Retry Timeout of the reliability layer     *     * @param maxRetryTimeout The new maximum retry timeout value     * @throws IllegalArgumentException if maxRetryTimeout exceeds jxta platform maximum retry timeout     */    public synchronized void setMaxRetryTimeout(int maxRetryTimeout) {        if (maxRetryTimeout <= 0 || maxRetryTimeout > MAXRETRYTIMEOUT) {            throw new IllegalArgumentException(                 
   "Invalid Maximum retry timeout :" + maxRetryTimeout + " Exceed Global maximum retry timeout :"                            + MAXRETRYTIMEOUT);        }        this.maxRetryTimeout = maxRetryTimeout;    }    /**     * Gets the Retry Timeout of the reliability layer     *     * @return The retry Timeout value     */    public synchronized int getRetryTimeout() {        return retryTimeout;    }    /**     * Sets the Retry Timeout of the underlying reliability layer     * .     * In reliable mode it is possible for this call to block     * trying to obtain a lock on reliable input stream     *     * @param retryTimeout The new retry timeout value     * @throws IOException if an I/O error occurs     */    public synchronized void setRetryTimeout(int retryTimeout) throws IOException {        if (timeout <= 0) {            throw new IllegalArgumentException("Invalid Socket timeout :" + retryTimeout);        }        this.retryTimeout = retryTimeout;        if (outgoing != null) {            outgoing.setTimeout(retryTimeout);        }    }    /**     * When in reliable mode, gets the Reliable library window size     *     * @return The windowSize value     */    public synchronized int getWindowSize() {        return windowSize;    }    /**     * When in reliable mode, sets the Reliable library window size     *     * @param windowSize The new window size value     * @throws IOException if an I/O error occurs     */    public synchronized void setWindowSize(int windowSize) throws IOException {        if (isBound()) {            throw new IOException("Socket bound. 
Can not change the window size");        }        this.windowSize = windowSize;    }    /**     * This method is invoked by the Reliablity library for each incoming data message     *     * @param message Incoming message     */    public void processIncomingMessage(Message message) {        if (!hasClose(message)) {            PipeMsgEvent event = new PipeMsgEvent(this, message, (PipeID) inputPipe.getPipeID());            push(event);        }    }    private void push(PipeMsgEvent event) {        if (msgListener == null) {            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("push message onto queue");            }            queue.offer(event);        } else {            dequeue();            if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) {                LOG.fine("calling message listener");            }            msgListener.pipeMsgEvent(event);        }    }    /**     * Send a message     * <p/>     * <code>Messenger</code>     *     * @param msg Message to send to the remote side     * @return true if message was successfully enqueued     * @throws IOException if the underlying messenger breaks, either due to     *                     a physical address change, reliability issue.     * @see net.jxta.endpoint.Message     */    public boolean sendMessage(Message msg) throws IOException {        if (isReliable && !direct) {            int seqn = ros.send(msg);            return (seqn > 0);        } else {            try {                if (msgr instanceof TcpMessenger) {                    ((TcpMessenger) msgr).sendMessageDirect(msg, null, null, true);                    return true;                } else {                    return msgr.sendMessage(msg, null, null);                }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -