📄 jxtasocket.java
字号:
} this.windowSize = windowSize; queue.setMaxQueueSize(windowSize); } /** * Returns the closed state of the JxtaSocket. * * @return true if the socket has been closed */ public boolean isClosed() { synchronized (closeLock) { return closed; } } /* * Actual implementation of the JxtaSocketInputStream methods. */ /** * Performs on behalf of JxtaSocketOutputStream. * * @see java.io.OutputStream#write */ protected void write(byte[] buf, int offset, int length) throws IOException { checkState(); if (isStream) { // reliable mode, ros != null ros.write(buf, offset, length); return; } byte[] bufCopy = new byte[length]; System.arraycopy(buf, offset, bufCopy, 0,length); Message msg = new Message(); msg.addMessageElement(JxtaServerSocket.nameSpace, new ByteArrayMessageElement(JxtaServerSocket.dataTag, MimeMediaType.AOS, bufCopy, 0, length, null)); msgr.sendMessageB(msg, null, null); } /** * Performs on behalf of JxtaSocketInputStream. * * @see java.io.InputStream#read */ protected int read() throws IOException { if (isClosed()) { return -1; } checkState(); if (isStream) { return ris.read(); } int result = -1; InputStream in = getCurrentStream(); if (in != null) { result = in.read(); if (result == -1) { closeCurrentStream(); result = read(); } } return result; } /** * Performs on behalf of JxtaSocketInputStream. * * @see java.io.InputStream#read */ protected int read(byte b[], int off, int len) throws IOException { if (isClosed()) { return -1; } checkState(); if (isStream) { // If read is called in stream mode, then a JxtaSocketInputStream // exists. It is never created until ris exists. ris is never // zero'ed. Therefore ris is not null. return ris.read(b, off, len); } int result = -1; if (b == null) { throw new NullPointerException(); } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { throw new IndexOutOfBoundsException(); } else if (len == 0) { result = 0; } else { InputStream in = getCurrentStream(); if (in != null) { result = in.read(b, off, len); if (result == -1) { closeCurrentStream(); result = read(b,off,len); } } } return result; } /** * Returns the number of bytes that can be read * (or skipped over) from this input stream. * It's possible for this call to block, when called for the very first time * before the internal stream has ever been created (i.e. no data receieved) * @return the number of bytes that can be read from this input stream * without blocking * @throws IOException - if an I/O error occurs. */ protected int available() throws IOException { checkState(); if (isStream) { return ris.available(); } int result = 0; // if currentMsgStream is null, then next read call will block // so we return 0 to indicate this. InputStream in = getCurrentStream(); if (in != null) { result = in.available(); } return result; } private InputStream getCurrentStream() throws IOException { // despite the warning about thread-saftey and the // getCurrentInQueue method, if queue is closed and the number in // queue goes to zero, we're done (it shouldn't be going to go back // up again) synchronized(instrLock) { if (currentMsgStream == null) { MessageElement me; try { me = (MessageElement) queue.pop(timeout); } catch (InterruptedException e) { throw new IOException(e.toString()); } // queue.pop(0) returns null when and only when queue // closed and empty. The rest of the time it waits as // long as needed for something to return and returns // it. Oh, and, queue is never set to null. if (me != null) { currentMsgStream = me.getStream(); } } return currentMsgStream; } } private void closeCurrentStream() throws IOException { synchronized(instrLock) { if (currentMsgStream != null) { currentMsgStream.close(); currentMsgStream = null; } } } /** * throw a SocketException if closed or not bound */ private void checkState() throws SocketException { if (isClosed()) { throw new SocketException("Socket is closed"); } else if (!isBound()) { throw new SocketException("Socket not bound"); } } /** * {@inheritDoc} */ public synchronized int getSendBufferSize() throws SocketException { if (isClosed()) { throw new SocketException("Socket is closed"); } return outputBufferSize; } /** * {@inheritDoc} */ public synchronized void setSendBufferSize(int size) throws SocketException { if (isClosed()) { throw new SocketException("Socket is closed"); } if (size < 1) { throw new IllegalArgumentException("negative/zero buffer size"); } if (osCreated) { throw new SocketException("Can not reset buffersize, OutputStream is already created"); } outputBufferSize = size; } /** * {@inheritDoc} */ public synchronized int getReceiveBufferSize() throws SocketException { checkState(); // this is just rough size return getOutputStreamBufferSize(); } /** * {@inheritDoc} */ public boolean getKeepAlive() throws SocketException { if (isClosed()) { throw new SocketException("Socket is closed"); } return false; } /** * {@inheritDoc} */ public int getTrafficClass() throws SocketException { throw new SocketException("TrafficClass not yet defined"); } /** * {@inheritDoc} */ public void setTrafficClass(int tc) throws SocketException { // a place holder when and if we decide to add hints regarding // flow info hints such as (IPTOS_LOWCOST (0x02), IPTOS_RELIABILITY (0x04), etc throw new SocketException("TrafficClass not yet defined"); } /** * {@inheritDoc} */ public boolean isInputShutdown() { if (isClosed()) { return true; } if (isStream) { return ris.isInputShutdown(); } else { return isClosed(); } } /** * {@inheritDoc} */ public void sendUrgentData(int data) throws IOException { throw new SocketException ("Urgent data not supported"); } /** * {@inheritDoc} */ public void setOOBInline(boolean state) throws SocketException { throw new SocketException("Enable/disable OOBINLINE supported"); } /** * {@inheritDoc} */ public void setKeepAlive(boolean state) throws SocketException { if (isClosed()) { throw new SocketException("Socket is closed"); } throw new SocketException("Operation not supported"); } /** * {@inheritDoc} */ public void shutdownInput() throws IOException { if (isStream) { if (ris != null) { ris.close(); } } // close pipe, and queue in.close(); queue.close(); } /** * {@inheritDoc} */ public void shutdownOutput() throws IOException { if (isStream) { long quitAt = System.currentTimeMillis() + timeout; while (true) { if (ros == null) { // done break; } // stop ros from taking any new message ros.setClosing(); if (ros.getMaxAck() == ros.getSeqNumber()) { break; } // By default wait forever. long left = 0; // compute remaining timeout if (timeout != 0) { left = quitAt - System.currentTimeMillis(); if (left < 0) { // too late sendClose(); msgr.close(); throw new IOException("shutdownOutput timeout"); } } try { ros.waitQueueEvent(left); } catch (InterruptedException ie) { // give up, then. throw new IOException("shutdownOutput interrupted"); } } } } /** * {@inheritDoc} */ public boolean isConnected() { return isBound(); } /** * {@inheritDoc} */ public SocketAddress getLocalSocketAddress() { return new JxtaSocketAddress(group, myPipeAdv, group.getPeerID()); } /** * {@inheritDoc} */ public SocketAddress getRemoteSocketAddress() { return new JxtaSocketAddress(group, pipeAdv, peerid); } /** * {@inheritDoc} */ protected synchronized void finalize() throws Throwable { super.finalize(); if (!closed) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("JxtaSocket is being finalized without being previously closed. This is likely a users bug."); } } close(); } /** *{@inheritDoc} */ public String toString() { if (isConnected()) { return "JxtaSocket[pipe id=" + pipeAdv.getPipeID() + "]"; } return "JxtaSocket[unconnected]"; } private class CloseListener implements OutgoingMessageEventListener { /** * {@inheritDoc} */ public void messageSendFailed(OutgoingMessageEvent event) { if (event.getFailure() == null) { if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Event Failure not available"); } } if (LOG.isEnabledFor(Level.WARN)) { LOG.warn("Failed to send a close message"); } } /** * {@inheritDoc} */ public void messageSendSucceeded(OutgoingMessageEvent event) { if (LOG.isEnabledFor(Level.WARN)) { LOG.debug("Close message successfully sent"); } synchronized(closeLock) { closeLock.notify(); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -