📄 socketorchannelconnectionimpl.java
字号:
/* * @(#)SocketOrChannelConnectionImpl.java 1.93 07/09/24 * * Copyright 2006 Sun Microsystems, Inc. All rights reserved. * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. */package com.sun.corba.se.impl.transport;import java.io.IOException;import java.net.InetSocketAddress;import java.net.Socket;import java.nio.ByteBuffer;import java.nio.channels.SelectableChannel;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;import java.security.AccessController;import java.security.PrivilegedAction;import java.util.Collections;import java.util.Hashtable;import java.util.HashMap;import java.util.Map;import org.omg.CORBA.COMM_FAILURE;import org.omg.CORBA.CompletionStatus;import org.omg.CORBA.DATA_CONVERSION;import org.omg.CORBA.INTERNAL;import org.omg.CORBA.MARSHAL;import org.omg.CORBA.OBJECT_NOT_EXIST;import org.omg.CORBA.SystemException;import com.sun.org.omg.SendingContext.CodeBase;import com.sun.corba.se.pept.broker.Broker;import com.sun.corba.se.pept.encoding.InputObject;import com.sun.corba.se.pept.encoding.OutputObject;import com.sun.corba.se.pept.protocol.MessageMediator;import com.sun.corba.se.pept.transport.Acceptor;import com.sun.corba.se.pept.transport.Connection;import com.sun.corba.se.pept.transport.ConnectionCache;import com.sun.corba.se.pept.transport.ContactInfo;import com.sun.corba.se.pept.transport.EventHandler;import com.sun.corba.se.pept.transport.InboundConnectionCache;import com.sun.corba.se.pept.transport.OutboundConnectionCache;import com.sun.corba.se.pept.transport.ResponseWaitingRoom;import com.sun.corba.se.pept.transport.Selector;import com.sun.corba.se.spi.ior.IOR;import com.sun.corba.se.spi.ior.iiop.GIOPVersion;import com.sun.corba.se.spi.logging.CORBALogDomains;import com.sun.corba.se.spi.orb.ORB ;import com.sun.corba.se.spi.orbutil.threadpool.NoSuchThreadPoolException;import com.sun.corba.se.spi.orbutil.threadpool.NoSuchWorkQueueException;import com.sun.corba.se.spi.orbutil.threadpool.Work;import com.sun.corba.se.spi.protocol.CorbaMessageMediator;import com.sun.corba.se.spi.transport.CorbaContactInfo;import com.sun.corba.se.spi.transport.CorbaConnection;import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;import com.sun.corba.se.spi.transport.ReadTimeouts;import com.sun.corba.se.impl.encoding.CachedCodeBase;import com.sun.corba.se.impl.encoding.CDRInputStream_1_0;import com.sun.corba.se.impl.encoding.CDROutputObject;import com.sun.corba.se.impl.encoding.CDROutputStream_1_0;import com.sun.corba.se.impl.encoding.CodeSetComponentInfo;import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry;import com.sun.corba.se.impl.logging.ORBUtilSystemException;import com.sun.corba.se.impl.orbutil.ORBConstants;import com.sun.corba.se.impl.orbutil.ORBUtility;import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;import com.sun.corba.se.impl.transport.CorbaResponseWaitingRoomImpl;/** * @author Harold Carr */public class SocketOrChannelConnectionImpl extends EventHandlerBase implements CorbaConnection, Work{ public static boolean dprintWriteLocks = false; // // New transport. // protected long enqueueTime; protected SocketChannel socketChannel; public SocketChannel getSocketChannel() { return socketChannel; } // REVISIT: // protected for test: genericRPCMSGFramework.IIOPConnection constructor. protected CorbaContactInfo contactInfo; protected Acceptor acceptor; protected ConnectionCache connectionCache; // // From iiop.Connection.java // protected Socket socket; // The socket used for this connection. protected long timeStamp = 0; protected boolean isServer = false; // Start at some value other than zero since this is a magic // value in some protocols. protected int requestId = 5; protected CorbaResponseWaitingRoom responseWaitingRoom; protected int state; protected java.lang.Object stateEvent = new java.lang.Object(); protected java.lang.Object writeEvent = new java.lang.Object(); protected boolean writeLocked; protected int serverRequestCount = 0; // Server request map: used on the server side of Connection // Maps request ID to IIOPInputStream. Map serverRequestMap = null; // This is a flag associated per connection telling us if the // initial set of sending contexts were sent to the receiver // already... protected boolean postInitialContexts = false; // Remote reference to CodeBase server (supplies // FullValueDescription, among other things) protected IOR codeBaseServerIOR; // CodeBase cache for this connection. This will cache remote operations, // handle connecting, and ensure we don't do any remote operations until // necessary. protected CachedCodeBase cachedCodeBase = new CachedCodeBase(this); protected ORBUtilSystemException wrapper ; // transport read timeout values protected ReadTimeouts readTimeouts; protected boolean shouldReadGiopHeaderOnly; // A message mediator used when shouldReadGiopHeaderOnly is // true to maintain request message state across execution in a // SelectorThread and WorkerThread. protected CorbaMessageMediator partialMessageMediator = null; // Used in genericRPCMSGFramework test. protected SocketOrChannelConnectionImpl(ORB orb) { this.orb = orb; wrapper = ORBUtilSystemException.get( orb, CORBALogDomains.RPC_TRANSPORT ) ; setWork(this); responseWaitingRoom = new CorbaResponseWaitingRoomImpl(orb, this); setReadTimeouts(orb.getORBData().getTransportTCPReadTimeouts()); } // Both client and servers. protected SocketOrChannelConnectionImpl(ORB orb, boolean useSelectThreadToWait, boolean useWorkerThread) { this(orb) ; setUseSelectThreadToWait(useSelectThreadToWait); setUseWorkerThreadForEvent(useWorkerThread); } // Client constructor. public SocketOrChannelConnectionImpl(ORB orb, CorbaContactInfo contactInfo, boolean useSelectThreadToWait, boolean useWorkerThread, String socketType, String hostname, int port) { this(orb, useSelectThreadToWait, useWorkerThread); this.contactInfo = contactInfo; try { socket = orb.getORBData().getSocketFactory() .createSocket(socketType, new InetSocketAddress(hostname, port)); socketChannel = socket.getChannel(); if (socketChannel != null) { boolean isBlocking = !useSelectThreadToWait; socketChannel.configureBlocking(isBlocking); } else { // IMPORTANT: non-channel-backed sockets must use // dedicated reader threads. setUseSelectThreadToWait(false); } if (orb.transportDebugFlag) { dprint(".initialize: connection created: " + socket); } } catch (Throwable t) { throw wrapper.connectFailure(t, socketType, hostname, Integer.toString(port)); } state = OPENING; } // Client-side convenience. public SocketOrChannelConnectionImpl(ORB orb, CorbaContactInfo contactInfo, String socketType, String hostname, int port) { this(orb, contactInfo, orb.getORBData().connectionSocketUseSelectThreadToWait(), orb.getORBData().connectionSocketUseWorkerThreadForEvent(), socketType, hostname, port); } // Server-side constructor. public SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor, Socket socket, boolean useSelectThreadToWait, boolean useWorkerThread) { this(orb, useSelectThreadToWait, useWorkerThread); this.socket = socket; socketChannel = socket.getChannel(); if (socketChannel != null) { // REVISIT try { boolean isBlocking = !useSelectThreadToWait; socketChannel.configureBlocking(isBlocking); } catch (IOException e) { RuntimeException rte = new RuntimeException(); rte.initCause(e); throw rte; } } this.acceptor = acceptor; serverRequestMap = Collections.synchronizedMap(new HashMap()); isServer = true; state = ESTABLISHED; } // Server-side convenience public SocketOrChannelConnectionImpl(ORB orb, Acceptor acceptor, Socket socket) { this(orb, acceptor, socket, (socket.getChannel() == null ? false : orb.getORBData().connectionSocketUseSelectThreadToWait()), (socket.getChannel() == null ? false : orb.getORBData().connectionSocketUseWorkerThreadForEvent())); } //////////////////////////////////////////////////// // // framework.transport.Connection // public boolean shouldRegisterReadEvent() { return true; } public boolean shouldRegisterServerReadEvent() { return true; } public boolean read() { try { if (orb.transportDebugFlag) { dprint(".read->: " + this); } CorbaMessageMediator messageMediator = readBits(); if (messageMediator != null) { // Null can happen when client closes stream // causing purgecalls. return dispatch(messageMediator); } return true; } finally { if (orb.transportDebugFlag) { dprint(".read<-: " + this); } } } protected CorbaMessageMediator readBits() { try { if (orb.transportDebugFlag) { dprint(".readBits->: " + this); } MessageMediator messageMediator; // REVISIT - use common factory base class. if (contactInfo != null) { messageMediator = contactInfo.createMessageMediator(orb, this); } else if (acceptor != null) { messageMediator = acceptor.createMessageMediator(orb, this); } else { throw new RuntimeException("SocketOrChannelConnectionImpl.readBits"); } return (CorbaMessageMediator) messageMediator; } catch (ThreadDeath td) { if (orb.transportDebugFlag) { dprint(".readBits: " + this + ": ThreadDeath: " + td, td); } try { purgeCalls(wrapper.connectionAbort(td), false, false); } catch (Throwable t) { if (orb.transportDebugFlag) { dprint(".readBits: " + this + ": purgeCalls: Throwable: " + t, t); } } throw td; } catch (Throwable ex) { if (orb.transportDebugFlag) { dprint(".readBits: " + this + ": Throwable: " + ex, ex); } try { if (ex instanceof INTERNAL) { sendMessageError(GIOPVersion.DEFAULT_VERSION); } } catch (IOException e) { if (orb.transportDebugFlag) { dprint(".readBits: " + this + ": sendMessageError: IOException: " + e, e); } } // REVISIT - make sure reader thread is killed. orb.getTransportManager().getSelector(0).unregisterForEvent(this); // Notify anyone waiting. purgeCalls(wrapper.connectionAbort(ex), true, false); // REVISIT //keepRunning = false; // REVISIT - if this is called after purgeCalls then // the state of the socket is ABORT so the writeLock // in close throws an exception. It is ignored but // causes IBM (screen scraping) tests to fail. //close(); } finally { if (orb.transportDebugFlag) { dprint(".readBits<-: " + this); } } return null; } protected CorbaMessageMediator finishReadingBits(MessageMediator messageMediator) { try { if (orb.transportDebugFlag) { dprint(".finishReadingBits->: " + this); } // REVISIT - use common factory base class. if (contactInfo != null) { messageMediator = contactInfo.finishCreatingMessageMediator(orb, this, messageMediator); } else if (acceptor != null) { messageMediator = acceptor.finishCreatingMessageMediator(orb, this, messageMediator); } else { throw new RuntimeException("SocketOrChannelConnectionImpl.finishReadingBits"); } return (CorbaMessageMediator) messageMediator; } catch (ThreadDeath td) { if (orb.transportDebugFlag) { dprint(".finishReadingBits: " + this + ": ThreadDeath: " + td, td); } try { purgeCalls(wrapper.connectionAbort(td), false, false); } catch (Throwable t) { if (orb.transportDebugFlag) { dprint(".finishReadingBits: " + this + ": purgeCalls: Throwable: " + t, t); } } throw td; } catch (Throwable ex) { if (orb.transportDebugFlag) { dprint(".finishReadingBits: " + this + ": Throwable: " + ex, ex); } try { if (ex instanceof INTERNAL) { sendMessageError(GIOPVersion.DEFAULT_VERSION); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -