iiopconnection.java
来自「java jdk 1.4的源码」· Java 代码 · 共 1,000 行 · 第 1/3 页
JAVA
1,000 行
return returnStream; } void createOutCallDescriptor(int requestId) { // Temporary solution -- check if we're a server or not if (!isServer) { Integer requestID = new Integer(requestId); OutCallDesc call = new OutCallDesc(); call.thd = Thread.currentThread(); out_calls.put(requestID, call); } } public void removeOutCallDescriptor(int requestId) { if (!isServer) { Integer requestID = new Integer(requestId); out_calls.remove(requestID); } } // Assumes the caller handles writeLock and writeUnlock void sendWithoutLock(IIOPOutputStream os) { // Don't we need to check for CloseConnection // here? REVISIT try { // Write the fragment/message os.writeTo(outputStream); outputStream.flush(); } catch (IOException e1) { /* * ADDED(Ram J) 10/13/2000 In the event of an IOException, try * sending a CancelRequest for regular requests / locate requests */ // Since IIOPOutputStream's msgheader is set only once, and not // altered during sending multiple fragments, the original // msgheader will always have the requestId. // REVISIT This could be optimized to send a CancelRequest only // if any fragments had been sent already. Message msg = os.getMessage(); if (msg.getType() == Message.GIOPRequest || msg.getType() == Message.GIOPLocateRequest) { GIOPVersion requestVersion = msg.getGIOPVersion(); int requestId = MessageBase.getRequestId(msg); try { sendCancelRequest(requestVersion, requestId); } catch (IOException e2) { // most likely an abortive connection closure. // ignore, since nothing more can be done. } } // REVISIT When a send failure happens, purge_calls() need to be // called to ensure that the connection is properly removed from // further usage (ie., cancelling pending requests with COMM_FAILURE // with an appropriate minor_code CompletionStatus.MAY_BE). // Relying on the IIOPOutputStream (as noted below) is not // sufficient as it handles COMM_FAILURE only for the final // fragment (during invoke processing). Note that COMM_FAILURE could // happen while sending the initial fragments. // Also the IIOPOutputStream does not properly close the connection. // It simply removes the connection from the table. An orderly // closure is needed (ie., cancel pending requests on the connection // COMM_FAILURE as well. // IIOPOutputStream will cleanup the connection info when it // sees this exception. throw new COMM_FAILURE(MinorCodes.WRITE_ERROR_SEND, CompletionStatus.COMPLETED_NO); } } /** Send an IIOP message to the server. * If not oneway, wait for the reply. */ public IIOPInputStream send(IIOPOutputStream s, boolean oneWay) { /* writeLock(); createOutCallDescriptor(MessageBase.getRequestId(s.getMessage())); try { sendWithoutLock(s); } finally { writeUnlock(); } */ // This will force all fragments to be sent. s.finishSendingMessage(); return getResponse(oneWay, MessageBase.getRequestId(s.getMessage())); } public void sendReply(IIOPOutputStream os) throws Exception { os.finishSendingMessage(); } /** public IOR locate(byte[] key) is now in Connection.java */ /*************************************************************************** * The following methods are for dealing with Connection cleaning for * better scalability of servers in high network load conditions. ***************************************************************************/ /** * Send a CancelRequest message. This does not lock the connection, so the * caller needs to ensure this method is called appropriately. * * @exception IOException if an I/O error occurs (could be due to abortive * connection closure). */ public void sendCancelRequest(GIOPVersion giopVersion, int requestId) throws IOException { Message msg = MessageBase.createCancelRequest(giopVersion, requestId); IIOPOutputStream os = new IIOPOutputStream(giopVersion, orb, this); os.setMessage(msg); msg.write(os); os.writeTo(outputStream); outputStream.flush(); } public void sendCancelRequestWithLock(GIOPVersion giopVersion, int requestId) throws IOException { writeLock(); try { sendCancelRequest(giopVersion, requestId); } finally { writeUnlock(); } } /** * Send a CloseConnection message. This does not lock the connection, so the * caller needs to ensure this method is called appropriately. * * @exception IOException if an I/O error occurs (could be due to abortive * connection closure). */ public void sendCloseConnection(GIOPVersion giopVersion) throws IOException { Message msg = MessageBase.createCloseConnection(giopVersion); IIOPOutputStream os = new IIOPOutputStream(giopVersion, orb, this); os.setMessage(msg); msg.write(os); os.writeTo(outputStream); outputStream.flush(); } /** * Send a MessageError message. This does not lock the connection, so the * caller needs to ensure this method is called appropriately. * * @exception IOException if an I/O error occurs (could be due to abortive * connection closure). */ public void sendMessageError(GIOPVersion giopVersion) throws IOException { Message msg = MessageBase.createMessageError(giopVersion); IIOPOutputStream os = new IIOPOutputStream(giopVersion, orb, this); os.setMessage(msg); msg.write(os); os.writeTo(outputStream); outputStream.flush(); } public boolean isBusy() { // Note: Hashtable.size() is not synchronized if (requestCount > 0 || out_calls.size() > 0) return true; else return false; } /** * Cleans up this Connection. * This is called from ConnectionTable, from the ListenerThread. * Note:it is possible for this to be called more than once */ public synchronized void cleanUp() throws Exception { writeLock(); // REVISIT It will be good to have a read lock on the reader thread // before we proceed further, to avoid the reader thread (server side) // from processing requests. This avoids the risk that a new request // will be accepted by ReaderThread while the ListenerThread is // attempting to close this connection. if (requestCount > 0 || out_calls.size() > 0) { // we are busy! writeUnlock(); throw new Exception(); } try { sendCloseConnection(GIOPVersion.V1_0); synchronized ( stateEvent ){ state = CLOSE_SENT; stateEvent.notifyAll(); } // stop the reader without causing it to do purge_calls Exception ex = new Exception(); reader.stop(ex); // this also does writeUnlock(); purge_calls(Connection.CONN_REBIND, false, true); } catch (Exception ex) {} } /** It is possible for a Close Connection to have been ** sent here, but we will not check for this. A "lazy" ** Exception will be thrown in the Worker thread after the ** incoming request has been processed even though the connection ** is closed before the request is processed. This is o.k because ** it is a boundary condition. To prevent it we would have to add ** more locks which would reduce performance in the normal case. **/ public synchronized void requestBegins() { requestCount++; } public synchronized void requestEnds(IIOPInputStream request) { if (request.getGIOPVersion().equals(GIOPVersion.V1_2)) serverRequestMap.remove(new Integer(MessageBase.getRequestId(request.getMessage()))); if (request.getGIOPVersion().equals(GIOPVersion.V1_1)) theOnly1_1ServerRequestImpl = null; requestCount--; } void shutdown() { // The order is important here. First make sure that the thread knows what to do // after the socket closes before we close it. ((ReaderThread)reader).shutdown(); super.shutdown(); } public void print() { System.out.println("Connection for " + endpoint.getHostName() + " @ " + endpoint.getPort()); System.out.println(" Time stamp = " + timeStamp); boolean alive = reader.isAlive(); if (alive) System.out.println(" Reader is Alive"); else System.out.println(" Reader is not Alive"); } // Begin Code Base methods --------------------------------------- // // Set this connection's code base IOR. The IOR comes from the // SendingContext. This is an optional service context, but all // JavaSoft ORBs send it. // // The set and get methods don't need to be synchronized since the // first possible get would occur during reading a valuetype, and // that would be after the set. public final void setCodeBaseIOR(IOR ior) { codeBaseServerIOR = ior; } final IOR getCodeBaseIOR() { return codeBaseServerIOR; } // Get a CodeBase stub to use in unmarshaling. The CachedCodeBase // won't connect to the remote codebase unless it's necessary. final CodeBase getCodeBase() { return cachedCodeBase; } // End Code Base methods ----------------------------------------- final void createIdToFragmentedOutputStreamEntry ( int requestID, IIOPOutputStream outputStream) { idToFragmentedOutputStream.put(new Integer(requestID), outputStream); } public final IIOPOutputStream getIdToFragmentedOutputStreamEntry( int requestID) { return (IIOPOutputStream) idToFragmentedOutputStream.get(new Integer(requestID)); } public final void removeIdToFragmentedOutputStreamEntry(int requestID) { idToFragmentedOutputStream.remove(new Integer(requestID)); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?