⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtabidipipe.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                closePipe();            } catch (IOException ie) {                if (LOG.isEnabledFor(Level.WARN)) {                    LOG.warn("failed during close", ie);                }            }            return true;        }        return false;    }    private void receiveMessage(Message message) {        Iterator i =            message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);        if (i.hasNext()) {            if (ros != null) {                ros.recv(message);            }            return;        }        i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);        if (i.hasNext()) {            // It can happen that we receive messages for the input stream            // while we have not finished creating it.            try {                synchronized (finalLock) {                    while (waiting) {                        finalLock.wait(timeout);                    }                }            } catch (InterruptedException ie) {}            if (ris != null) {                ris.recv(message);            }        }    }    /**     *  Gets the Maximum Retry Timeout of the reliability layer     *     * @return The maximum retry Timeout value     */    public synchronized int getMaxRetryTimeout() {        return maxRetryTimeout;    }    /**     *  Gets the Maximum Retry Timeout of the reliability layer     *     * @param  maxRetryTimeout The new maximum retry timeout value     * @exception  IllegalArgumentException if maxRetryTimeout exceeds jxta platform maximum retry timeout     */    public synchronized void setMaxRetryTimeout(int maxRetryTimeout) {        if (maxRetryTimeout <= 0 || maxRetryTimeout > MAXRETRYTIMEOUT) {            throw new IllegalArgumentException("Invalid Maximum retry timeout :"+maxRetryTimeout+" Exceed Global maximum retry timeout :"+MAXRETRYTIMEOUT);        }        this.maxRetryTimeout = maxRetryTimeout;    }    /**     *  Gets the Retry Timeout of the reliability layer     *     * @return The retry Timeout value     */    public synchronized int getRetryTimeout() {        return retryTimeout;    }    /**     *  Sets the Retry Timeout of the underlying reliability layer     *  .     *  In reliable mode it is possible for this call to block      *  trying to obtain a lock on reliable input stream     *     * @param  retryTimeout The new retry timeout value     * @exception  IOException  if an I/O error occurs     */    public synchronized void setRetryTimeout(int retryTimeout) throws IOException {        if (timeout <= 0) {            throw new IllegalArgumentException("Invalid Socket timeout :"+retryTimeout);        }        this.retryTimeout = retryTimeout;        if (outgoing != null) {            outgoing.setTimeout(retryTimeout);        }        if (ris != null) {            ris.setTimeout(retryTimeout);        }    }        /**     *  When in reliable mode, gets the Reliable library window size     *     * @return                  The windowSize value     * @exception  IOException  if an I/O error occurs     */    public synchronized int getWindowSize() {        return windowSize;    }    /**     *  When in reliable mode, sets the Reliable library window size     *     * @param  windowSize              The new window size value     * @exception  IOException  if an I/O error occurs     */    public synchronized void setWindowSize(int windowSize) throws IOException {        if (isBound()) {            throw new IOException("Socket bound. Can not change the window size");        }        this.windowSize = windowSize;        queue.setMaxQueueSize(windowSize);    }    /**     * This method is invoked by the Reliablity library for each incoming data message     *     * @param message Incoming message     */    public void processIncomingMessage(Message message) {        if (!hasClose(message)) {            PipeMsgEvent event = new PipeMsgEvent(this, message, (PipeID) in.getPipeID());            push(event);        }    }    private void push(PipeMsgEvent event) {        if (msgListener == null) {            try {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("push message onto queue");                }                queue.push(event, -1);            } catch (InterruptedException ie) {}        }        else {            dequeue();            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("calling message listener");            }            msgListener.pipeMsgEvent(event);        }    }    /**     * Send a message     * In non reliable mode the message is sent directly over a     * <code>Messenger</code>     *      *     *@param  msg  Message to send to the remote side     *@return true if message was succussfully enqueued     * @see net.jxta.endpoint.Message     */    public boolean sendMessage(Message msg) throws IOException {        if (isReliable) {             int seqn = ros.send(msg);             return (seqn > 0);        } else {            return msgr.sendMessage(msg, null, null);        }    }    private void dequeue() {        while (queue != null && queue.getCurrentInQueue() > 0) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.debug("dequeing messages onto message listener");            }            msgListener.pipeMsgEvent((PipeMsgEvent) queue.pop());        }    }    /**     * {@inheritDoc}     */    public void outputPipeEvent(OutputPipeEvent event) {        OutputPipe op = event.getOutputPipe();        if (op.getAdvertisement() == null) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("The output pipe has no internal pipe advertisement. Continueing anyway.");            }        }        if (op.getAdvertisement() == null || pipeAdv.equals(op.getAdvertisement())) {            synchronized (acceptLock) {                // modify op within lock to prevent a race with the if.                if (connectOutpipe == null) {                    connectOutpipe = op;                    // set to null to avoid closure                    op = null;                }                acceptLock.notifyAll();            }            // Ooops one too many, we were too fast re-trying.            if (op != null) {                op.close();            }        } else {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Unexpected OutputPipe :"+op);            }        }    }    /**     *  A lightweight output pipe constructor, note the return type     * Since all the info needed is available, there's no need for to      * use the pipe service to resolve the pipe we have all we need     * to construct a messenger.     *     *@param  group    group context     *@param  pipeAdv  Remote Pipe Advertisement     *@param  peer     Remote Peer advertisement     *@return          Messenger     */    protected static Messenger lightweightOutputPipe(PeerGroup group,                                                     PipeAdvertisement pipeAdv,                                                     PeerAdvertisement peer) {        EndpointService endpoint = group.getEndpointService();        ID opId = pipeAdv.getPipeID();        String destPeer = (peer.getPeerID().getUniqueValue()).toString();        // Get an endpoint messenger to that address        EndpointAddress addr;        if (pipeAdv.getType().equals(PipeService.UnicastType)) {            addr = new EndpointAddress("jxta",                                       destPeer,                                       "PipeService",                                       opId.toString());        } else if (pipeAdv.getType().equals(PipeService.UnicastSecureType)) {                addr = new EndpointAddress("jxtatls",                                           destPeer,                                           "PipeService",                                           opId.toString());        } else {            // not a supported type            return null;        }        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Creating a lightweightOutputPipe()");        }        return endpoint.getMessenger(addr);    }    /**     *  Not implemented yet     */    protected boolean checkCred(StructuredDocument cred) {        //FIXME need to check credentials        return true;    }    /**     *  Send a close message to the remote side     */    private void sendClose() {        Message msg = new Message();        msg.addMessageElement(JxtaServerPipe.nameSpace,                              new StringMessageElement(JxtaServerPipe.closeTag,                                                       "close",                                                       null));        try {            sendMessage(msg);        } catch (IOException ie) {            if (LOG.isEnabledFor(Level.DEBUG)) {                LOG.error("failed during close", ie);            }        }    }    /**     * Returns the message listener for this pipe     * @return PipeMsgListener     * @deprecated use getMessageListener instead     */    public PipeMsgListener getListener() {        return getMessageListener();    }    /**     * Returns the message listener for this pipe     * @return PipeMsgListener     *     */    public PipeMsgListener getMessageListener() {        return msgListener;    }    /**     * Sets message listener for a pipe spawned by the JxtaServerPipe.     * There is a window where a message could arrive prior to listener being     * registered therefore a message queue is created to queue messages, once     * a listener is registered these messages will be dequeued by calling the     * listener until the queue is empty     *     * @param msgListener New value of property listener.     * @deprecated use setMessageListener instead     */    public void setListener(PipeMsgListener msgListener) {        setMessageListener(msgListener);    }      /**     * Sets message listener for a pipe spawned by the JxtaServerPipe.     * There is a window where a message could arrive prior to listener being     * registered therefore a message queue is created to queue messages, once     * a listener is registered these messages will be dequeued by calling the     * listener until the queue is empty     *     * @param msgListener New value of property listener.     *     */    public void setMessageListener(PipeMsgListener msgListener) {        this.msgListener = msgListener;        // if there are messages enqueued then dequeue them onto the msgListener        dequeue();    }        /**     * Sets a Pipe event listener, set listener to null to unset the listener     *     * @param eventListener New value of property listener.     * @deprecated use setPipeEventListener instead     */    public void setListener(PipeEventListener eventListener) {        setPipeEventListener(eventListener);    }    /**     * Sets a Pipe event listener, set listener to null to unset the listener     *     * @param eventListener New value of property listener.     *     */    public void setPipeEventListener(PipeEventListener eventListener) {        this.eventListener = eventListener;    }    /**     * Returns the Pipe event listener for this pipe     * @return PipeMsgListener     *     */    public PipeEventListener getPipeEventListener() {        return eventListener;    }    /**     * Gets a message from the queue. If no Object is immediately available,     * then wait the specified amount of time for a message to be inserted.     *     * @param timeout   Amount of time to wait in milliseconds for an object to     * be available. Per Java convention, a timeout of zero (0) means wait an     * infinite amount of time. Negative values mean do not wait at all.     * @return The next message in the queue., if a listener is registered calls      * to this method will return null     * @throws InterruptedException    if the operation is interrupted before     * the timeout interval is completed.     */    public Message getMessage(int timeout) throws InterruptedException {        if (queue == null || msgListener != null) {            return null;        } else {            PipeMsgEvent ev = (PipeMsgEvent) queue.pop(timeout);            if (ev != null) {                return ev.getMessage();            } else {                return null;            }        }    }    /**     * Returns the Assigned PipeAdvertisement     * @return the Assigned PipeAdvertisement     */    public PipeAdvertisement getPipeAdvertisement() {        return pipeAdv;    }    /**     *  {@inheritDoc}     *     *  <p/>Closes the JxtaBiDiPipe.     */    protected synchronized void finalize() throws Throwable {        if(!closed) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("JxtaBiDiPipe is being finalized without being previously closed. This is likely a users bug.");            }        }        close();                super.finalize();    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -