📄 socketorchannelconnectionimpl.java
字号:
} catch (IOException e) { if (orb.transportDebugFlag) { dprint(".finishReadingBits: " + this + ": sendMessageError: IOException: " + e, e); } } // REVISIT - make sure reader thread is killed. orb.getTransportManager().getSelector(0).unregisterForEvent(this); // Notify anyone waiting. purgeCalls(wrapper.connectionAbort(ex), true, false); // REVISIT //keepRunning = false; // REVISIT - if this is called after purgeCalls then // the state of the socket is ABORT so the writeLock // in close throws an exception. It is ignored but // causes IBM (screen scraping) tests to fail. //close(); } finally { if (orb.transportDebugFlag) { dprint(".finishReadingBits<-: " + this); } } return null; } protected boolean dispatch(CorbaMessageMediator messageMediator) { try { if (orb.transportDebugFlag) { dprint(".dispatch->: " + this); } // // NOTE: // // This call is the transition from the tranport block // to the protocol block. // boolean result = messageMediator.getProtocolHandler() .handleRequest(messageMediator); return result; } catch (ThreadDeath td) { if (orb.transportDebugFlag) { dprint(".dispatch: ThreadDeath", td ); } try { purgeCalls(wrapper.connectionAbort(td), false, false); } catch (Throwable t) { if (orb.transportDebugFlag) { dprint(".dispatch: purgeCalls: Throwable", t); } } throw td; } catch (Throwable ex) { if (orb.transportDebugFlag) { dprint(".dispatch: Throwable", ex ) ; } try { if (ex instanceof INTERNAL) { sendMessageError(GIOPVersion.DEFAULT_VERSION); } } catch (IOException e) { if (orb.transportDebugFlag) { dprint(".dispatch: sendMessageError: IOException", e); } } purgeCalls(wrapper.connectionAbort(ex), false, false); // REVISIT //keepRunning = false; } finally { if (orb.transportDebugFlag) { dprint(".dispatch<-: " + this); } } return true; } public boolean shouldUseDirectByteBuffers() { return getSocketChannel() != null; } public ByteBuffer read(int size, int offset, int length, long max_wait_time) throws IOException { if (shouldUseDirectByteBuffers()) { ByteBuffer byteBuffer = orb.getByteBufferPool().getByteBuffer(size); if (orb.transportDebugFlag) { // print address of ByteBuffer gotten from pool int bbAddress = System.identityHashCode(byteBuffer); StringBuffer sb = new StringBuffer(80); sb.append(".read: got ByteBuffer id ("); sb.append(bbAddress).append(") from ByteBufferPool."); String msgStr = sb.toString(); dprint(msgStr); } byteBuffer.position(offset); byteBuffer.limit(size); readFully(byteBuffer, length, max_wait_time); return byteBuffer; } byte[] buf = new byte[size]; readFully(getSocket().getInputStream(), buf, offset, length, max_wait_time); ByteBuffer byteBuffer = ByteBuffer.wrap(buf); byteBuffer.limit(size); return byteBuffer; } public ByteBuffer read(ByteBuffer byteBuffer, int offset, int length, long max_wait_time) throws IOException { int size = offset + length; if (shouldUseDirectByteBuffers()) { if (! byteBuffer.isDirect()) { throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); } if (size > byteBuffer.capacity()) { if (orb.transportDebugFlag) { // print address of ByteBuffer being released int bbAddress = System.identityHashCode(byteBuffer); StringBuffer bbsb = new StringBuffer(80); bbsb.append(".read: releasing ByteBuffer id (") .append(bbAddress).append(") to ByteBufferPool."); String bbmsg = bbsb.toString(); dprint(bbmsg); } orb.getByteBufferPool().releaseByteBuffer(byteBuffer); byteBuffer = orb.getByteBufferPool().getByteBuffer(size); } byteBuffer.position(offset); byteBuffer.limit(size); readFully(byteBuffer, length, max_wait_time); byteBuffer.position(0); byteBuffer.limit(size); return byteBuffer; } if (byteBuffer.isDirect()) { throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); } byte[] buf = new byte[size]; readFully(getSocket().getInputStream(), buf, offset, length, max_wait_time); return ByteBuffer.wrap(buf); } public void readFully(ByteBuffer byteBuffer, int size, long max_wait_time) throws IOException { int n = 0; int bytecount = 0; long time_to_wait = readTimeouts.get_initial_time_to_wait(); long total_time_in_wait = 0; // The reading of data incorporates a strategy to detect a // rogue client. The strategy is implemented as follows. As // long as data is being read, at least 1 byte or more, we // assume we have a well behaved client. If no data is read, // then we sleep for a time to wait, re-calculate a new time to // wait which is lengthier than the previous time spent waiting. // Then, if the total time spent waiting does not exceed a // maximum time we are willing to wait, we attempt another // read. If the maximum amount of time we are willing to // spend waiting for more data is exceeded, we throw an // IOException. // NOTE: Reading of GIOP headers are treated with a smaller // maximum time to wait threshold. Based on extensive // performance testing, all GIOP headers are being // read in 1 read access. do { bytecount = getSocketChannel().read(byteBuffer); if (bytecount < 0) { throw new IOException("End-of-stream"); } else if (bytecount == 0) { try { Thread.sleep(time_to_wait); total_time_in_wait += time_to_wait; time_to_wait = (long)(time_to_wait*readTimeouts.get_backoff_factor()); } catch (InterruptedException ie) { // ignore exception if (orb.transportDebugFlag) { dprint("readFully(): unexpected exception " + ie.toString()); } } } else { n += bytecount; } } while (n < size && total_time_in_wait < max_wait_time); if (n < size && total_time_in_wait >= max_wait_time) { // failed to read entire message throw wrapper.transportReadTimeoutExceeded(new Integer(size), new Integer(n), new Long(max_wait_time), new Long(total_time_in_wait)); } getConnectionCache().stampTime(this); } // To support non-channel connections. public void readFully(java.io.InputStream is, byte[] buf, int offset, int size, long max_wait_time) throws IOException { int n = 0; int bytecount = 0; long time_to_wait = readTimeouts.get_initial_time_to_wait(); long total_time_in_wait = 0; // The reading of data incorporates a strategy to detect a // rogue client. The strategy is implemented as follows. As // long as data is being read, at least 1 byte or more, we // assume we have a well behaved client. If no data is read, // then we sleep for a time to wait, re-calculate a new time to // wait which is lengthier than the previous time spent waiting. // Then, if the total time spent waiting does not exceed a // maximum time we are willing to wait, we attempt another // read. If the maximum amount of time we are willing to // spend waiting for more data is exceeded, we throw an // IOException. // NOTE: Reading of GIOP headers are treated with a smaller // maximum time to wait threshold. Based on extensive // performance testing, all GIOP headers are being // read in 1 read access. do { bytecount = is.read(buf, offset + n, size - n); if (bytecount < 0) { throw new IOException("End-of-stream"); } else if (bytecount == 0) { try { Thread.sleep(time_to_wait); total_time_in_wait += time_to_wait; time_to_wait = (long)(time_to_wait*readTimeouts.get_backoff_factor()); } catch (InterruptedException ie) { // ignore exception if (orb.transportDebugFlag) { dprint("readFully(): unexpected exception " + ie.toString()); } } } else { n += bytecount; } } while (n < size && total_time_in_wait < max_wait_time); if (n < size && total_time_in_wait >= max_wait_time) { // failed to read entire message throw wrapper.transportReadTimeoutExceeded(new Integer(size), new Integer(n), new Long(max_wait_time), new Long(total_time_in_wait)); } getConnectionCache().stampTime(this); } public void write(ByteBuffer byteBuffer) throws IOException { if (shouldUseDirectByteBuffers()) { /* NOTE: cannot perform this test. If one ask for a ByteBuffer from the pool which is bigger than the size of ByteBuffers managed by the pool, then the pool will return a HeapByteBuffer. if (byteBuffer.hasArray()) { throw wrapper.unexpectedNonDirectByteBufferWithChannelSocket(); } */ // IMPORTANT: For non-blocking SocketChannels, there's no guarantee // all bytes are written on first write attempt. do { getSocketChannel().write(byteBuffer); } while (byteBuffer.hasRemaining()); } else { if (! byteBuffer.hasArray()) { throw wrapper.unexpectedDirectByteBufferWithNonChannelSocket(); } byte[] tmpBuf = byteBuffer.array(); getSocket().getOutputStream().write(tmpBuf, 0, byteBuffer.limit()); getSocket().getOutputStream().flush(); } // TimeStamp connection to indicate it has been used // Note granularity of connection usage is assumed for // now to be that of a IIOP packet. getConnectionCache().stampTime(this); } /** * Note:it is possible for this to be called more than once */ public synchronized void close() { try { if (orb.transportDebugFlag) { dprint(".close->: " + this); } writeLock(); // REVISIT It will be good to have a read lock on the reader thread // before we proceed further, to avoid the reader thread (server side) // from processing requests. This avoids the risk that a new request // will be accepted by ReaderThread while the ListenerThread is // attempting to close this connection. if (isBusy()) { // we are busy! writeUnlock(); if (orb.transportDebugFlag) { dprint(".close: isBusy so no close: " + this); } return; } try { try { sendCloseConnection(GIOPVersion.V1_0); } catch (Throwable t) { wrapper.exceptionWhenSendingCloseConnection(t); } synchronized ( stateEvent ){ state = CLOSE_SENT; stateEvent.notifyAll(); } // stop the reader without causing it to do purgeCalls //Exception ex = new Exception(); //reader.stop(ex); // REVISIT // NOTE: !!!!!! // This does writeUnlock(). purgeCalls(wrapper.connectionRebind(), false, true); } catch (Exception ex) { if (orb.transportDebugFlag) { dprint(".close: exception: " + this, ex); } } try { Selector selector = orb.getTransportManager().getSelector(0); selector.unregisterForEvent(this); if (socketChannel != null) { socketChannel.close(); } socket.close(); } catch (IOException e) { if (orb.transportDebugFlag) { dprint(".close: " + this, e); } } } finally { if (orb.transportDebugFlag) { dprint(".close<-: " + this); } } } public Acceptor getAcceptor() { return acceptor; } public ContactInfo getContactInfo() { return contactInfo; } public EventHandler getEventHandler() { return this; } public OutputObject createOutputObject(MessageMediator messageMediator) { // REVISIT - remove this method from Connection and all it subclasses. throw new RuntimeException("*****SocketOrChannelConnectionImpl.createOutputObject - should not be called."); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -