📄 socketorchannelconnectionimpl.java
字号:
// This is used by the GIOPOutputObject in order to // throw the correct error when handling code sets. // Can we determine if we are on the server side by // other means? XREVISIT public boolean isServer() { return isServer; } public boolean isBusy() { if (serverRequestCount > 0 || getResponseWaitingRoom().numberRegistered() > 0) { return true; } else { return false; } } public long getTimeStamp() { return timeStamp; } public void setTimeStamp(long time) { timeStamp = time; } public void setState(String stateString) { synchronized (stateEvent) { if (stateString.equals("ESTABLISHED")) { state = ESTABLISHED; stateEvent.notifyAll(); } else { // REVISIT: ASSERT } } } /** * Sets the writeLock for this connection. * If the writeLock is already set by someone else, block till the * writeLock is released and can set by us. * IMPORTANT: this connection's lock must be acquired before * setting the writeLock and must be unlocked after setting the writeLock. */ public void writeLock() { try { if (dprintWriteLocks && orb.transportDebugFlag) { dprint(".writeLock->: " + this); } // Keep looping till we can set the writeLock. while ( true ) { int localState = state; switch ( localState ) { case OPENING: synchronized (stateEvent) { if (state != OPENING) { // somebody has changed 'state' so be careful break; } try { stateEvent.wait(); } catch (InterruptedException ie) { if (orb.transportDebugFlag) { dprint(".writeLock: OPENING InterruptedException: " + this); } } } // Loop back break; case ESTABLISHED: synchronized (writeEvent) { if (!writeLocked) { writeLocked = true; return; } try { // do not stay here too long if state != ESTABLISHED // Bug 4752117 while (state == ESTABLISHED && writeLocked) { writeEvent.wait(100); } } catch (InterruptedException ie) { if (orb.transportDebugFlag) { dprint(".writeLock: ESTABLISHED InterruptedException: " + this); } } } // Loop back break; // // XXX // Need to distinguish between client and server roles // here probably. 
// case ABORT: synchronized ( stateEvent ){ if (state != ABORT) { break; } throw wrapper.writeErrorSend() ; } case CLOSE_RECVD: // the connection has been closed or closing // ==> throw rebind exception synchronized ( stateEvent ){ if (state != CLOSE_RECVD) { break; } throw wrapper.connectionCloseRebind() ; } default: if (orb.transportDebugFlag) { dprint(".writeLock: default: " + this); } // REVISIT throw new RuntimeException(".writeLock: bad state"); } } } finally { if (dprintWriteLocks && orb.transportDebugFlag) { dprint(".writeLock<-: " + this); } } } public void writeUnlock() { try { if (dprintWriteLocks && orb.transportDebugFlag) { dprint(".writeUnlock->: " + this); } synchronized (writeEvent) { writeLocked = false; writeEvent.notify(); // wake up one guy waiting to write } } finally { if (dprintWriteLocks && orb.transportDebugFlag) { dprint(".writeUnlock<-: " + this); } } } // Assumes the caller handles writeLock and writeUnlock public void sendWithoutLock(OutputObject outputObject) { // Don't we need to check for CloseConnection // here? REVISIT // XREVISIT - Shouldn't the MessageMediator // be the one to handle writing the data here? try { // Write the fragment/message CDROutputObject cdrOutputObject = (CDROutputObject) outputObject; cdrOutputObject.writeTo(this); // REVISIT - no flush? //socket.getOutputStream().flush(); } catch (IOException e1) { /* * ADDED(Ram J) 10/13/2000 In the event of an IOException, try * sending a CancelRequest for regular requests / locate requests */ // Since IIOPOutputStream's msgheader is set only once, and not // altered during sending multiple fragments, the original // msgheader will always have the requestId. // REVISIT This could be optimized to send a CancelRequest only // if any fragments had been sent already. 
/* REVISIT: MOVE TO SUBCONTRACT Message msg = os.getMessage(); if (msg.getType() == Message.GIOPRequest || msg.getType() == Message.GIOPLocateRequest) { GIOPVersion requestVersion = msg.getGIOPVersion(); int requestId = MessageBase.getRequestId(msg); try { sendCancelRequest(requestVersion, requestId); } catch (IOException e2) { // most likely an abortive connection closure. // ignore, since nothing more can be done. if (orb.transportDebugFlag) { } } */ // REVISIT When a send failure happens, purgeCalls() need to be // called to ensure that the connection is properly removed from // further usage (ie., cancelling pending requests with COMM_FAILURE // with an appropriate minor_code CompletionStatus.MAY_BE). // Relying on the IIOPOutputStream (as noted below) is not // sufficient as it handles COMM_FAILURE only for the final // fragment (during invoke processing). Note that COMM_FAILURE could // happen while sending the initial fragments. // Also the IIOPOutputStream does not properly close the connection. // It simply removes the connection from the table. An orderly // closure is needed (ie., cancel pending requests on the connection // COMM_FAILURE as well. // IIOPOutputStream will cleanup the connection info when it // sees this exception. 
SystemException exc = wrapper.writeErrorSend(e1); purgeCalls(exc, false, true); throw exc; } } public void registerWaiter(MessageMediator messageMediator) { responseWaitingRoom.registerWaiter(messageMediator); } public void unregisterWaiter(MessageMediator messageMediator) { responseWaitingRoom.unregisterWaiter(messageMediator); } public InputObject waitForResponse(MessageMediator messageMediator) { return responseWaitingRoom.waitForResponse(messageMediator); } public void setConnectionCache(ConnectionCache connectionCache) { this.connectionCache = connectionCache; } public ConnectionCache getConnectionCache() { return connectionCache; } //////////////////////////////////////////////////// // // EventHandler methods // public void setUseSelectThreadToWait(boolean x) { useSelectThreadToWait = x; // REVISIT - Reading of a GIOP header only is information // that should be passed into the constructor // from the SocketOrChannelConnection factory. setReadGiopHeaderOnly(shouldUseSelectThreadToWait()); } public void handleEvent() { if (orb.transportDebugFlag) { dprint(".handleEvent->: " + this); } getSelectionKey().interestOps(getSelectionKey().interestOps() & (~ getInterestOps())); if (shouldUseWorkerThreadForEvent()) { Throwable throwable = null; try { int poolToUse = 0; if (shouldReadGiopHeaderOnly()) { partialMessageMediator = readBits(); poolToUse = partialMessageMediator.getThreadPoolToUse(); } if (orb.transportDebugFlag) { dprint(".handleEvent: addWork to pool: " + poolToUse); } orb.getThreadPoolManager().getThreadPool(poolToUse) .getWorkQueue(0).addWork(getWork()); } catch (NoSuchThreadPoolException e) { throwable = e; } catch (NoSuchWorkQueueException e) { throwable = e; } // REVISIT: need to close connection. 
if (throwable != null) { if (orb.transportDebugFlag) { dprint(".handleEvent: " + throwable); } INTERNAL i = new INTERNAL("NoSuchThreadPoolException"); i.initCause(throwable); throw i; } } else { if (orb.transportDebugFlag) { dprint(".handleEvent: doWork"); } getWork().doWork(); } if (orb.transportDebugFlag) { dprint(".handleEvent<-: " + this); } } public SelectableChannel getChannel() { return socketChannel; } public int getInterestOps() { return SelectionKey.OP_READ; } // public Acceptor getAcceptor() - already defined above. public Connection getConnection() { return this; } //////////////////////////////////////////////////// // // Work methods. // public String getName() { return this.toString(); } public void doWork() { try { if (orb.transportDebugFlag) { dprint(".doWork->: " + this); } // IMPORTANT: Sanity checks on SelectionKeys such as // SelectorKey.isValid() should not be done // here. // if (!shouldReadGiopHeaderOnly()) { read(); } else { // get the partialMessageMediator // created by SelectorThread CorbaMessageMediator messageMediator = this.getPartialMessageMediator(); // read remaining info needed in a MessageMediator messageMediator = finishReadingBits(messageMediator); if (messageMediator != null) { // Null can happen when client closes stream // causing purgecalls. dispatch(messageMediator); } } } catch (Throwable t) { if (orb.transportDebugFlag) { dprint(".doWork: ignoring Throwable: " + t + " " + this); } } finally { if (orb.transportDebugFlag) { dprint(".doWork<-: " + this); } } } public void setEnqueueTime(long timeInMillis) { enqueueTime = timeInMillis; } public long getEnqueueTime() { return enqueueTime; } //////////////////////////////////////////////////// // // spi.transport.CorbaConnection. // // IMPORTANT: Reader Threads must NOT read Giop header only. 
/**
 * Reports the current value of the "read GIOP header only" flag.
 *
 * @return the value of the {@code shouldReadGiopHeaderOnly} field.
 */
public boolean shouldReadGiopHeaderOnly()
{
    return this.shouldReadGiopHeaderOnly;
}

/**
 * Replaces the "read GIOP header only" flag.
 *
 * @param shouldReadHeaderOnly the new value of the flag.
 */
protected void setReadGiopHeaderOnly(boolean shouldReadHeaderOnly)
{
    this.shouldReadGiopHeaderOnly = shouldReadHeaderOnly;
}

/**
 * Gives access to the response waiting room held by this connection.
 *
 * @return the {@code responseWaitingRoom} field.
 */
public ResponseWaitingRoom getResponseWaitingRoom()
{
    return this.responseWaitingRoom;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -