📄 socketorchannelconnectionimpl.java
字号:
// REVISIT - inteface defines isServer but already defined in // higher interface. public void serverRequestMapPut(int requestId, CorbaMessageMediator messageMediator) { serverRequestMap.put(new Integer(requestId), messageMediator); } public CorbaMessageMediator serverRequestMapGet(int requestId) { return (CorbaMessageMediator) serverRequestMap.get(new Integer(requestId)); } public void serverRequestMapRemove(int requestId) { serverRequestMap.remove(new Integer(requestId)); } // REVISIT: this is also defined in: // com.sun.corba.se.spi.legacy.connection.Connection public java.net.Socket getSocket() { return socket; } /** It is possible for a Close Connection to have been ** sent here, but we will not check for this. A "lazy" ** Exception will be thrown in the Worker thread after the ** incoming request has been processed even though the connection ** is closed before the request is processed. This is o.k because ** it is a boundary condition. To prevent it we would have to add ** more locks which would reduce performance in the normal case. **/ public synchronized void serverRequestProcessingBegins() { serverRequestCount++; } public synchronized void serverRequestProcessingEnds() { serverRequestCount--; } // // // public synchronized int getNextRequestId() { return requestId++; } // Negotiated code sets for char and wchar data protected CodeSetComponentInfo.CodeSetContext codeSetContext = null; public ORB getBroker() { return orb; } public CodeSetComponentInfo.CodeSetContext getCodeSetContext() { // Needs to be synchronized for the following case when the client // doesn't send the code set context twice, and we have two threads // in ServerRequestDispatcher processCodeSetContext. // // Thread A checks to see if there is a context, there is none, so // it calls setCodeSetContext, getting the synch lock. // Thread B checks to see if there is a context. If we didn't synch, // it might decide to outlaw wchar/wstring. if (codeSetContext == null) { synchronized(this) { return codeSetContext; } } return codeSetContext; } public synchronized void setCodeSetContext(CodeSetComponentInfo.CodeSetContext csc) { // Double check whether or not we need to do this if (codeSetContext == null) { if (OSFCodeSetRegistry.lookupEntry(csc.getCharCodeSet()) == null || OSFCodeSetRegistry.lookupEntry(csc.getWCharCodeSet()) == null) { // If the client says it's negotiated a code set that // isn't a fallback and we never said we support, then // it has a bug. throw wrapper.badCodesetsFromClient() ; } codeSetContext = csc; } } // // from iiop.IIOPConnection.java // // Map request ID to an InputObject. // This is so the client thread can start unmarshaling // the reply and remove it from the out_calls map while the // ReaderThread can still obtain the input stream to give // new fragments. Only the ReaderThread touches the clientReplyMap, // so it doesn't incur synchronization overhead. public MessageMediator clientRequestMapGet(int requestId) { return responseWaitingRoom.getMessageMediator(requestId); } protected MessageMediator clientReply_1_1; public void clientReply_1_1_Put(MessageMediator x) { clientReply_1_1 = x; } public MessageMediator clientReply_1_1_Get() { return clientReply_1_1; } public void clientReply_1_1_Remove() { clientReply_1_1 = null; } protected MessageMediator serverRequest_1_1; public void serverRequest_1_1_Put(MessageMediator x) { serverRequest_1_1 = x; } public MessageMediator serverRequest_1_1_Get() { return serverRequest_1_1; } public void serverRequest_1_1_Remove() { serverRequest_1_1 = null; } protected String getStateString( int state ) { synchronized ( stateEvent ){ switch (state) { case OPENING : return "OPENING" ; case ESTABLISHED : return "ESTABLISHED" ; case CLOSE_SENT : return "CLOSE_SENT" ; case CLOSE_RECVD : return "CLOSE_RECVD" ; case ABORT : return "ABORT" ; default : return "???" ; } } } public synchronized boolean isPostInitialContexts() { return postInitialContexts; } // Can never be unset... public synchronized void setPostInitialContexts(){ postInitialContexts = true; } /** * Wake up the outstanding requests on the connection, and hand them * COMM_FAILURE exception with a given minor code. * * Also, delete connection from connection table and * stop the reader thread. * Note that this should only ever be called by the Reader thread for * this connection. * * @param minor_code The minor code for the COMM_FAILURE major code. * @param die Kill the reader thread (this thread) before exiting. */ public void purgeCalls(SystemException systemException, boolean die, boolean lockHeld) { int minor_code = systemException.minor; try{ if (orb.transportDebugFlag) { dprint(".purgeCalls->: " + minor_code + "/" + die + "/" + lockHeld + " " + this); } // If this invocation is a result of ThreadDeath caused // by a previous execution of this routine, just exit. synchronized ( stateEvent ){ if ((state == ABORT) || (state == CLOSE_RECVD)) { if (orb.transportDebugFlag) { dprint(".purgeCalls: exiting since state is: " + getStateString(state) + " " + this); } return; } } // Grab the writeLock (freeze the calls) try { if (!lockHeld) { writeLock(); } } catch (SystemException ex) { if (orb.transportDebugFlag) dprint(".purgeCalls: SystemException" + ex + "; continuing " + this); } // Mark the state of the connection // and determine the request status org.omg.CORBA.CompletionStatus completion_status; synchronized ( stateEvent ){ if (minor_code == ORBUtilSystemException.CONNECTION_REBIND) { state = CLOSE_RECVD; systemException.completed = CompletionStatus.COMPLETED_NO; } else { state = ABORT; systemException.completed = CompletionStatus.COMPLETED_MAYBE; } stateEvent.notifyAll(); } try { socket.getInputStream().close(); socket.getOutputStream().close(); socket.close(); } catch (Exception ex) { if (orb.transportDebugFlag) { dprint(".purgeCalls: Exception closing socket: " + ex + " " + this); } } // Signal all threads with outstanding requests on this // connection and give them the SystemException; responseWaitingRoom.signalExceptionToAllWaiters(systemException); if (contactInfo != null) { ((OutboundConnectionCache)getConnectionCache()).remove(contactInfo); } else if (acceptor != null) { ((InboundConnectionCache)getConnectionCache()).remove(this); } // // REVISIT: Stop the reader thread // // Signal all the waiters of the writeLock. // There are 4 types of writeLock waiters: // 1. Send waiters: // 2. SendReply waiters: // 3. cleanUp waiters: // 4. purge_call waiters: // writeUnlock(); } finally { if (orb.transportDebugFlag) { dprint(".purgeCalls<-: " + minor_code + "/" + die + "/" + lockHeld + " " + this); } } } /************************************************************************* * The following methods are for dealing with Connection cleaning for * better scalability of servers in high network load conditions. **************************************************************************/ public void sendCloseConnection(GIOPVersion giopVersion) throws IOException { Message msg = MessageBase.createCloseConnection(giopVersion); sendHelper(giopVersion, msg); } public void sendMessageError(GIOPVersion giopVersion) throws IOException { Message msg = MessageBase.createMessageError(giopVersion); sendHelper(giopVersion, msg); } /** * Send a CancelRequest message. This does not lock the connection, so the * caller needs to ensure this method is called appropriately. * @exception IOException - could be due to abortive connection closure. */ public void sendCancelRequest(GIOPVersion giopVersion, int requestId) throws IOException { Message msg = MessageBase.createCancelRequest(giopVersion, requestId); sendHelper(giopVersion, msg); } protected void sendHelper(GIOPVersion giopVersion, Message msg) throws IOException { // REVISIT: See comments in CDROutputObject constructor. CDROutputObject outputObject = new CDROutputObject((ORB)orb, null, giopVersion, this, msg, ORBConstants.STREAM_FORMAT_VERSION_1); msg.write(outputObject); outputObject.writeTo(this); } public void sendCancelRequestWithLock(GIOPVersion giopVersion, int requestId) throws IOException { writeLock(); try { sendCancelRequest(giopVersion, requestId); } finally { writeUnlock(); } } // Begin Code Base methods --------------------------------------- // // Set this connection's code base IOR. The IOR comes from the // SendingContext. This is an optional service context, but all // JavaSoft ORBs send it. // // The set and get methods don't need to be synchronized since the // first possible get would occur during reading a valuetype, and // that would be after the set. // Sets this connection's code base IOR. This is done after // getting the IOR out of the SendingContext service context. // Our ORBs always send this, but it's optional in CORBA. public final void setCodeBaseIOR(IOR ior) { codeBaseServerIOR = ior; } public final IOR getCodeBaseIOR() { return codeBaseServerIOR; } // Get a CodeBase stub to use in unmarshaling. The CachedCodeBase // won't connect to the remote codebase unless it's necessary. public final CodeBase getCodeBase() { return cachedCodeBase; } // End Code Base methods ----------------------------------------- // set transport read thresholds protected void setReadTimeouts(ReadTimeouts readTimeouts) { this.readTimeouts = readTimeouts; } protected void setPartialMessageMediator(CorbaMessageMediator messageMediator) { partialMessageMediator = messageMediator; } protected CorbaMessageMediator getPartialMessageMediator() { return partialMessageMediator; } public String toString() { synchronized ( stateEvent ){ return "SocketOrChannelConnectionImpl[" + " " + (socketChannel == null ? socket.toString() : socketChannel.toString()) + " " + getStateString( state ) + " " + shouldUseSelectThreadToWait() + " " + shouldUseWorkerThreadForEvent() + " " + shouldReadGiopHeaderOnly() + "]" ; } } // Must be public - used in encoding. public void dprint(String msg) { ORBUtility.dprint("SocketOrChannelConnectionImpl", msg); } protected void dprint(String msg, Throwable t) { dprint(msg); t.printStackTrace(System.out); }}// End of file.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -