⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 jxtasocket.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
        }
        this.windowSize = windowSize;
        queue.setMaxQueueSize(windowSize);
    }

    /**
     * Returns the closed state of the JxtaSocket.
     *
     * @return    true if the socket has been closed
     */
    public boolean isClosed() {
        synchronized (closeLock) {
            return closed;
        }
    }

    /*
     * Actual implementation of the JxtaSocketInputStream and
     * JxtaSocketOutputStream methods.
     */

    /**
     * Performs on behalf of JxtaSocketOutputStream.
     * <p/>
     * In stream mode the bytes are handed to the reliable output stream.
     * Otherwise the requested range is copied into a fresh array, wrapped in
     * a single message element and sent through the messenger.
     *
     * @param buf    the data to send
     * @param offset index of the first byte of {@code buf} to send
     * @param length number of bytes to send
     * @throws IOException if the socket is closed or not bound, or if
     *                     sending fails
     * @see java.io.OutputStream#write
     */
    protected void write(byte[] buf, int offset, int length) throws IOException {
        checkState();
        if (isStream) {
            // reliable mode, ros != null
            ros.write(buf, offset, length);
            return;
        }
        // The message element is built from a private copy of the range
        // rather than from the caller's array.
        byte[] bufCopy = new byte[length];
        System.arraycopy(buf, offset, bufCopy, 0, length);
        Message msg = new Message();
        msg.addMessageElement(JxtaServerSocket.nameSpace,
                              new ByteArrayMessageElement(JxtaServerSocket.dataTag,
                                                          MimeMediaType.AOS,
                                                          bufCopy,
                                                          0,
                                                          length,
                                                          null));
        msgr.sendMessageB(msg, null, null);
    }

    /**
     * Performs on behalf of JxtaSocketInputStream.
     * <p/>
     * In message mode, exhausted message streams are closed and the next
     * one is tried. The socket state is re-checked before each attempt.
     *
     * @return the next byte of data, or -1 if the socket is closed or no
     *         further message stream is available
     * @throws IOException if the socket is not bound or reading fails
     * @see java.io.InputStream#read
     */
    protected int read() throws IOException {
        // Iterates instead of recursing so that a long run of empty
        // messages cannot exhaust the stack.
        while (true) {
            if (isClosed()) {
                return -1;
            }
            checkState();
            if (isStream) {
                return ris.read();
            }
            InputStream current = getCurrentStream();
            if (current == null) {
                return -1;
            }
            int result = current.read();
            if (result != -1) {
                return result;
            }
            // current message fully consumed, move on to the next one
            closeCurrentStream();
        }
    }

    /**
     * Performs on behalf of JxtaSocketInputStream.
     * <p/>
     * In message mode, exhausted message streams are closed and the next
     * one is tried. The socket state is re-checked before each attempt.
     *
     * @param b   destination buffer
     * @param off index in {@code b} at which to store the first byte
     * @param len maximum number of bytes to read
     * @return the number of bytes read, 0 if {@code len} is 0, or -1 if the
     *         socket is closed or no further message stream is available
     * @throws IOException if the socket is not bound or reading fails
     * @throws NullPointerException if {@code b} is null (message mode)
     * @throws IndexOutOfBoundsException if {@code off}/{@code len} do not
     *         describe a valid range of {@code b} (message mode)
     * @see java.io.InputStream#read
     */
    protected int read(byte b[], int off, int len) throws IOException {
        // Iterates instead of recursing so that a long run of empty
        // messages cannot exhaust the stack.
        while (true) {
            if (isClosed()) {
                return -1;
            }
            checkState();
            if (isStream) {
                // If read is called in stream mode, then a JxtaSocketInputStream
                // exists. It is never created until ris exists. ris is never
                // zero'ed. Therefore ris is not null.
                return ris.read(b, off, len);
            }
            if (b == null) {
                throw new NullPointerException();
            } else if ((off < 0) || (off > b.length) || (len < 0) ||
                       ((off + len) > b.length) || ((off + len) < 0)) {
                throw new IndexOutOfBoundsException();
            } else if (len == 0) {
                return 0;
            }
            InputStream current = getCurrentStream();
            if (current == null) {
                return -1;
            }
            int result = current.read(b, off, len);
            if (result != -1) {
                return result;
            }
            // current message fully consumed, move on to the next one
            closeCurrentStream();
        }
    }

    /**
     * Returns the number of bytes that can be read
     * (or skipped over) from this input stream.
     * It's possible for this call to block, when called for the very first time
     * before the internal stream has ever been created (i.e. no data received)
     *
     * @return the number of bytes that can be read from this input stream
     *          without blocking
     * @throws IOException - if an I/O error occurs.
     */
    protected int available() throws IOException {
        checkState();
        if (isStream) {
            return ris.available();
        }
        int result = 0;
        // if currentMsgStream is null, then next read call will block
        // so we return 0 to indicate this.
        InputStream current = getCurrentStream();
        if (current != null) {
            result = current.available();
        }
        return result;
    }

    /**
     * Returns the stream of the message currently being consumed, popping
     * the next message element off the queue when there is none.
     *
     * @return the current message stream, or null if the queue yielded no
     *         element
     * @throws IOException if the wait on the queue is interrupted or the
     *                     element's stream cannot be obtained
     */
    private InputStream getCurrentStream() throws IOException {
        // despite the warning about thread-safety and the
        // getCurrentInQueue method, if queue is closed and the number in
        // queue goes to zero, we're done (it shouldn't be going to go back
        // up again)
        synchronized(instrLock) {
            if (currentMsgStream == null) {
                MessageElement me;
                try {
                    me = (MessageElement) queue.pop(timeout);
                } catch (InterruptedException e) {
                    // Restore the interrupt status for callers up the stack
                    // and keep the original exception as the cause.
                    Thread.currentThread().interrupt();
                    IOException failure = new IOException(e.toString());
                    failure.initCause(e);
                    throw failure;
                }
                // queue.pop(0) returns null when and only when queue
                // closed and empty. The rest of the time it waits as
                // long as needed for something to return and returns
                // it. Oh, and, queue is never set to null.
                if (me != null) {
                    currentMsgStream = me.getStream();
                }
            }
            return currentMsgStream;
        }
    }

    /**
     * Closes and forgets the current message stream, if there is one.
     *
     * @throws IOException if closing the stream fails
     */
    private void closeCurrentStream() throws IOException {
        synchronized(instrLock) {
            if (currentMsgStream != null) {
                currentMsgStream.close();
                currentMsgStream = null;
            }
        }
    }

    /**
     * throw a SocketException if closed or not bound
     */
    private void checkState() throws SocketException {
        if (isClosed()) {
            throw new SocketException("Socket is closed");
        } else if (!isBound()) {
            throw new SocketException("Socket not bound");
        }
    }

    /**
     * {@inheritDoc}
     */
    public synchronized int getSendBufferSize()    throws SocketException {
        if (isClosed()) {
            throw new SocketException("Socket is closed");
        }
        return outputBufferSize;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * The size can only be changed before the output stream is created.
     */
    public synchronized void setSendBufferSize(int size) throws SocketException {
        if (isClosed()) {
            throw new SocketException("Socket is closed");
        }
        if (size < 1) {
            throw new IllegalArgumentException("negative/zero buffer size");
        }
        if (osCreated) {
            throw new SocketException("Can not reset buffersize, OutputStream is already created");
        }
        outputBufferSize = size;
    }

    /**
     * {@inheritDoc}
     */
    public synchronized int getReceiveBufferSize() throws SocketException {
        checkState();
        // this is just rough size
        return getOutputStreamBufferSize();
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Always reports false for an open socket.
     */
    public boolean getKeepAlive() throws SocketException {
        if (isClosed()) {
            throw new SocketException("Socket is closed");
        }
        return false;
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Not supported: always throws.
     */
    public int getTrafficClass() throws SocketException {
        throw new SocketException("TrafficClass not yet defined");
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Not supported: always throws.
     */
    public void setTrafficClass(int tc) throws SocketException {
        // a place holder when and if we decide to add hints regarding
        // flow info hints such as (IPTOS_LOWCOST (0x02), IPTOS_RELIABILITY (0x04), etc
        throw new SocketException("TrafficClass not yet defined");
    }

    /**
     * {@inheritDoc}
     */
    public boolean isInputShutdown() {
        if (isClosed()) {
            return true;
        }
        if (isStream) {
            // Guard against the reliable input stream not existing, as
            // shutdownInput() does; with no stream, input is not shut down.
            return ris != null && ris.isInputShutdown();
        } else {
            return isClosed();
        }
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Not supported: always throws.
     */
    public void sendUrgentData(int data) throws IOException {
        throw new SocketException ("Urgent data not supported");
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Not supported: always throws.
     */
    public void setOOBInline(boolean state) throws SocketException {
        throw new SocketException("Enable/disable OOBINLINE not supported");
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Not supported: always throws.
     */
    public void setKeepAlive(boolean state) throws SocketException {
        if (isClosed()) {
            throw new SocketException("Socket is closed");
        }
        throw new SocketException("Operation not supported");
    }

    /**
     * {@inheritDoc}
     */
    public void shutdownInput() throws IOException {
        if (isStream) {
            if (ris != null) {
                ris.close();
            }
        }
        // close pipe, and queue
        in.close();
        queue.close();
    }

    /**
     * {@inheritDoc}
     * <p/>
     * In stream mode, stops the reliable output stream from accepting new
     * messages and waits until its highest acknowledged sequence number
     * equals its current sequence number. A timeout of 0 waits
     * indefinitely. When a non-zero timeout elapses first, a close message
     * is sent, the messenger is closed and an IOException is thrown.
     *
     * @throws IOException if the timeout elapses or the wait is interrupted
     */
    public void shutdownOutput() throws IOException {
        if (isStream) {
            long quitAt = System.currentTimeMillis() + timeout;
            while (true) {
                if (ros == null) {
                    // done
                    break;
                }
                // stop ros from taking any new message
                ros.setClosing();
                if (ros.getMaxAck() == ros.getSeqNumber()) {
                    break;
                }
                // By default wait forever.
                long left = 0;
                // compute remaining timeout
                if (timeout != 0) {
                    left = quitAt - System.currentTimeMillis();
                    // "<= 0", not "< 0": a remaining time of exactly zero
                    // would otherwise be passed on as "wait forever".
                    if (left <= 0) {
                        // too late
                        sendClose();
                        msgr.close();
                        throw new IOException("shutdownOutput timeout");
                    }
                }
                try {
                    ros.waitQueueEvent(left);
                } catch (InterruptedException ie) {
                    // give up, then. Restore the interrupt status and keep
                    // the original exception as the cause.
                    Thread.currentThread().interrupt();
                    IOException failure = new IOException("shutdownOutput interrupted");
                    failure.initCause(ie);
                    throw failure;
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    public boolean isConnected() {
        return isBound();
    }

    /**
     * {@inheritDoc}
     */
    public SocketAddress getLocalSocketAddress() {
        return new JxtaSocketAddress(group, myPipeAdv, group.getPeerID());
    }

    /**
     * {@inheritDoc}
     */
    public SocketAddress getRemoteSocketAddress() {
        return new JxtaSocketAddress(group, pipeAdv, peerid);
    }

    /**
     * {@inheritDoc}
     * <p/>
     * Logs a warning if the socket was never closed, then closes it.
     */
    protected synchronized void finalize() throws Throwable {
        try {
            if (!closed) {
                if (LOG.isEnabledFor(Level.WARN)) {
                    LOG.warn("JxtaSocket is being finalized without being previously closed. This is likely a users bug.");
                }
            }
            close();
        } finally {
            // always give the superclass its turn, even if close() fails
            super.finalize();
        }
    }

    /**
     *{@inheritDoc}
     */
    public String toString() {
        if (isConnected()) {
            return "JxtaSocket[pipe id=" + pipeAdv.getPipeID() + "]";
        }
        return "JxtaSocket[unconnected]";
    }

    /**
     * Listener for the outcome of sending the close message. On success it
     * notifies one thread waiting on closeLock.
     */
    private class CloseListener implements OutgoingMessageEventListener {

        /**
         * {@inheritDoc}
         */
        public void messageSendFailed(OutgoingMessageEvent event) {
            if (event.getFailure() == null) {
                if (LOG.isEnabledFor(Level.WARN)) {
                    LOG.warn("Event Failure not available");
                }
            }
            if (LOG.isEnabledFor(Level.WARN)) {
                LOG.warn("Failed to send a close message");
            }
        }

        /**
         * {@inheritDoc}
         */
        public void messageSendSucceeded(OutgoingMessageEvent event) {
            // guard with the same level the message is logged at
            if (LOG.isEnabledFor(Level.DEBUG)) {
                LOG.debug("Close message successfully sent");
            }
            synchronized(closeLock) {
                closeLock.notify();
            }
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -