⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtasocket.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
                long left = 0;                // If timeout is not zero. Then compute the waiting time                // left.                if (timeout != 0) {                    left = quitAt - System.currentTimeMillis();                    if (left < 0) {                        // Too late                        closeCommon();                        throw new IOException("Close timeout");                    }                }                try {                    if (!ros.isQueueEmpty()) {                        if (LOG.isEnabledFor(Level.DEBUG)) {                            LOG.debug("Waiting for Output stream queue event");                        }                        ros.waitQueueEvent(left);                    }                    break;                } catch (InterruptedException ie) {                    // give up, then.                    throw new IOException("Close interrupted");                }            }        }        closeCommon();    }    /**     * In stream mode, closes everything but the input     * stream. closeFromRemote() leaves it open until EOF is     * reached. That is, until all messages in the input queue have     * been read. At which point, ris will close() itself.     */    protected void closeCommon() throws IOException {        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Shutting down socket");        }        queue.interrupt();        if (isStream) {            // close the reliable streams.            if (ros != null) {                ros.close();            }            if (ris != null) {                ris.close();            }        }        // close pipe, messenger, and queue        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Closing input pipe");        }        in.close();        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Closing messenger");        }        msgr.close();        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Closing message queue");        }        queue.close();        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Close complete");        }    }    /**     *  Sets the inputPipe attribute of the JxtaSocket object     *     *@param  in  The new inputPipe value     */    protected void setInputPipe(InputPipe in) {        this.in = in;    }    /**     * {@inheritDoc}     */    public void pipeMsgEvent(PipeMsgEvent event) {        Message message = event.getMessage();        if (message == null) {            return;        }        MessageElement element = null;        if (!bound) {            // look for a remote pipe answer            element = (MessageElement)                      message.getMessageElement(JxtaServerSocket.nameSpace,                                                JxtaServerSocket.remPipeTag);            if (element != null) {                // connect response                try {                    PeerAdvertisement peerAdv = null;                    InputStream in = element.getStream();                    PipeAdvertisement pa = (PipeAdvertisement)                                           AdvertisementFactory.newAdvertisement(element.getMimeType(), in);                    element = message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.remPeerTag);                    if (element != null) {                        in = element.getStream();                        peerAdv = (PeerAdvertisement)                                  AdvertisementFactory.newAdvertisement(element.getMimeType(), in);                    } else {                        return;                    }                    element = message.getMessageElement (JxtaServerSocket.nameSpace, JxtaServerSocket.credTag);                    if (element != null) {                        in = element.getStream();                        credentialDoc = (StructuredDocument)                                        StructuredDocumentFactory.newStructuredDocument(element.getMimeType(), in);                    }                    element = message.getMessageElement (JxtaServerSocket.nameSpace, JxtaServerSocket.streamTag);                    if (element != null) {                        isStream = (element.toString().equals("true"));                    }                    msgr = lightweightOutputPipe(group, pa, peerAdv);                    if (msgr == null) {                        // let the connection attempt timeout                        if (LOG.isEnabledFor(Level.ERROR)) {                            LOG.error("Unable to obtain a back messenger");                        }                        return;                    }                    if (isStream) {                        // Create the input stream right away, otherwise                        // the first few messages from remote will be lost, unless                        // we use an intermediate queue.                        // FIXME: it would be even better if we could create the                        // input stream BEFORE having the output pipe resolved, but                        // that would force us to have the MsrgAdaptor block                        // until we can give it the real pipe or msgr... later.                        createRis();                    }                    synchronized (finalLock) {                        waiting = false;                        finalLock.notifyAll();                    }                } catch (IOException e) {                    if (LOG.isEnabledFor(Level.ERROR)) {                        LOG.error("failed to process response message", e);                    }                }            }        }        //net.jxta.impl.util.MessageUtil.printMessageStats(message, true);        // look for close request        element = (MessageElement)                  message.getMessageElement(JxtaServerSocket.nameSpace,                                            JxtaServerSocket.closeTag);        if (element != null) {            if (element.toString().equals("close")) {                try {                    if (LOG.isEnabledFor(Level.DEBUG)) {                        LOG.debug("Received a close request");                    }                    closeFromRemote();                } catch (IOException ie) {                    if (LOG.isEnabledFor(Level.ERROR)) {                        LOG.error("failed during closeFromRemote", ie);                    }                }            } else if (element.toString().equals("closeACK")) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Received a close acknowledgement");                }                synchronized(closeLock) {                    closeLock.notify();                }            }        }        if (!isStream) {            // isthere data ?            element = (MessageElement)                      message.getMessageElement(JxtaServerSocket.nameSpace, JxtaServerSocket.dataTag);            if (element == null) {                return;            }            try {                queue.push(element, -1);            } catch (InterruptedException e) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("Interrupted", e);                }            }            return;        }        Iterator i =            message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_ACK);        if (i != null && i.hasNext()) {            if (ros != null) {                ros.recv(message);            }            return;        }        i = message.getMessageElements(Defs.NAMESPACE, Defs.MIME_TYPE_BLOCK);        if (i != null && i.hasNext()) {            // It can happen that we receive messages for the input stream            // while we have not finished creating it.            try {                synchronized (finalLock) {                    while (waiting) {                        finalLock.wait(timeout);                    }                }            } catch (InterruptedException ie) {}            if (ris != null) {                ris.recv(message);            }        }    }    /**     * {@inheritDoc}     */    public void outputPipeEvent(OutputPipeEvent event) {        OutputPipe op = event.getOutputPipe();        if (op.getAdvertisement() == null) {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("The output pipe has no internal pipe advertisement. Continueing anyway.");            }        }        if (op.getAdvertisement() == null || pipeAdv.equals(op.getAdvertisement())) {            synchronized (acceptLock) {                // modify op within lock to prevent a race with the if.                if (connectOutpipe == null) {                    connectOutpipe = op;                    op = null; // if not null, will be closed.                }                acceptLock.notifyAll();            }            // Ooops one too many, we were too fast re-trying.            if (op != null) {                op.close();            }        } else {            if (LOG.isEnabledFor(Level.WARN)) {                LOG.warn("Unexpected OutputPipe :"+op);            }        }    }    /**     *  A lightweight output pipe constructor, note the return type     *  Since all the info needed is available, there's no need for to use the pipe service     *  to resolve the pipe we have all we need to construct a messenger.     *     *@param  group    group context     *@param  pipeAdv  Remote Pipe Advertisement     *@param  peer     Remote Peer Advertisment     *@return          Messenger     */    protected static Messenger lightweightOutputPipe(PeerGroup group,                                                     PipeAdvertisement pipeAdv,                                                     PeerAdvertisement peer) {        EndpointService endpoint = group.getEndpointService();        ID opId = pipeAdv.getPipeID();        String destPeer = (peer.getPeerID().getUniqueValue()).toString();        // Get an endpoint messenger to that address        EndpointAddress addr;        if (pipeAdv.getType().equals(PipeService.UnicastType)) {            addr = new EndpointAddress("jxta", destPeer, "PipeService", opId.toString());        } else if (pipeAdv.getType().equals(PipeService.UnicastSecureType)) {            addr = new EndpointAddress("jxtatls", destPeer, "PipeService", opId.toString());        } else {            // not a supported type            throw new IllegalArgumentException(pipeAdv.getType()+" is not a supported pipe type");        }        return endpoint.getMessenger(addr, null);    }    private void sendClose() {        Message msg = new Message();        msg.addMessageElement(JxtaServerSocket.nameSpace,                              new StringMessageElement(JxtaServerSocket.closeTag,                                                       "close",                                                       null));        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Sending a close request");        }        synchronized(closeLock) {            try {                //allow for any last bits to hit the wire                closeLock.wait(200);            } catch (InterruptedException ie) {}        }        msgr.sendMessage(msg, null, null, new CloseListener());        synchronized(closeLock) {            try {                closeLock.wait(timeout);            } catch (InterruptedException ie) {                if (LOG.isEnabledFor(Level.DEBUG)) {                    LOG.debug("wait for SendClose interrupted");                }            }        }    }    private void sendCloseACK() {        Message msg = new Message();        msg.addMessageElement(JxtaServerSocket.nameSpace,                              new StringMessageElement(JxtaServerSocket.closeTag,                                                       "closeACK",                                                       null));        if (LOG.isEnabledFor(Level.DEBUG)) {            LOG.debug("Sending a close acknowledgement");        }        msgr.sendMessage(msg, null, null, new CloseListener());    }    /**     * {@inheritDoc}     */    public synchronized int getSoTimeout() throws SocketException {        if (isClosed()) {            throw new SocketException("Socket is closed");        }        return soTimeout;    }    /**     * {@inheritDoc}     */    public synchronized void setSoTimeout(int soTimeout) throws SocketException {        if (soTimeout < 0) {            throw new IllegalArgumentException("Invalid Socket timeout :"+soTimeout);        }        this.soTimeout = soTimeout;        if (ris != null) {            ris.setTimeout(soTimeout);        }    }    /**     *  Gets the Maximum Retry Timeout of the reliability layer     *     * @return The maximum retry Timeout value     */    public synchronized int getMaxRetryTimeout() {        return maxRetryTimeout;    }    /**     *  Gets the Maximum Retry Timeout of the reliability layer     *     * @param  maxRetryTimeout The new maximum retry timeout value     * @exception  IllegalArgumentException if maxRetryTimeout exceeds jxta platform maximum retry timeout     */    public synchronized void setMaxRetryTimeout(int maxRetryTimeout) {        if (maxRetryTimeout <= 0 || maxRetryTimeout > MAXRETRYTIMEOUT) {            throw new IllegalArgumentException("Invalid Maximum retry timeout :"+maxRetryTimeout+" Exceed Global maximum retry timeout :"+MAXRETRYTIMEOUT);        }        this.maxRetryTimeout = maxRetryTimeout;    }    /**     *  Gets the Retry Timeout of the reliability layer     *     * @return The retry Timeout value     */    public synchronized int getRetryTimeout() {        return retryTimeout;    }    /**     *  Sets the Retry Timeout of the underlying reliability layer     *  .     *  In reliable mode it is possible for this call to block      *  trying to obtain a lock on reliable input stream     *     * @param  retryTimeout The new retry timeout value     * @exception  SocketException  if an I/O error occurs     */    public synchronized void setRetryTimeout(int retryTimeout) throws SocketException {        if (retryTimeout <= 0 || retryTimeout > maxRetryTimeout) {            throw new IllegalArgumentException("Invalid Retry Socket timeout :"+retryTimeout);        }        this.retryTimeout = retryTimeout;        if (outgoing != null) {            outgoing.setTimeout(retryTimeout);        }    }    /**     *  When in reliable mode, gets the Reliable library window size     *     * @return                  The windowSize value     * @exception  IOException  if an I/O error occurs     */    public synchronized int getWindowSize() {        return windowSize;    }    /**     *  When in reliable mode, sets the Reliable library window size     *     * @param  windowSize              The new window size value     * @exception  IOException  if an I/O error occurs     */    public synchronized void setWindowSize(int windowSize) throws SocketException {        if (isBound()) {            throw new SocketException("Socket bound. Can not change the window size");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -