tcpconnection.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,311 行 · 第 1/2 页

JAVA
1,311
字号
    _state = _state.toKeepaliveSelect();    if (_port.getSelectManager() != null) {      if (_port.getSelectManager().keepalive(this)) {        if (log.isLoggable(Level.FINE))          log.fine(dbgId() + " keepalive (select)");	return RequestState.THREAD_DETACHED;      }      else { 	log.warning(dbgId() + " failed keepalive (select)");	_state = _state.toActive();        port.keepaliveEnd(this);	close();		return RequestState.EXIT;      }    }    else {      if (log.isLoggable(Level.FINE))        log.fine(dbgId() + " keepalive (thread)");            if (getReadStream().waitForRead()) {        port.keepaliveEnd(this);	_state = _state.toActive();              return RequestState.REQUEST;      }      else {	_state = _state.toActive();        port.keepaliveEnd(this);                close();                  return RequestState.EXIT;      }    }  }  /**   * Try to read nonblock   */  private boolean waitForKeepalive()    throws IOException  {    if (_state.isClosed())      return false;        Port port = getPort();        if (port.isClosed())      return false;    ReadStream is = getReadStream();        if (getReadStream().getBufferAvailable() > 0)      return true;        long timeout = port.getKeepaliveTimeout();    boolean isSelectManager = port.getServer().isSelectManagerEnabled();        if (isSelectManager) {      timeout = port.getKeepaliveSelectThreadTimeout();    }    if (timeout > 0 && timeout < port.getSocketTimeout()) {      port.keepaliveThreadBegin();      try {	return is.fillWithTimeout(timeout);      } finally {	port.keepaliveThreadEnd();      }    }    else if (isSelectManager)      return false;    else      return true;  }  /**   * Sets controller   */  @Override  public void setController(ConnectionController controller)  {    super.setController(controller);    if (controller.isDuplex()) {      if (log.isLoggable(Level.FINER))	log.finer(this + " starting duplex");            TcpDuplexController duplex = (TcpDuplexController) controller;            _state = ConnectionState.DUPLEX;      _readTask = new DuplexReadTask(duplex);    }    else {      if (log.isLoggable(Level.FINER))	log.finer(this + " starting comet");            _state = ConnectionState.COMET;    }  }  /**   * Wakes the connection (comet-style).   */  @Override  protected boolean wake()  {    ConnectionController controller = getController();    if (controller == null)      return false;        _isWake = true;    // comet    if (getPort().resume(this)) {      log.fine(dbgId() + " wake");      return true;    }    log.fine(dbgId() + " wake failed");    return false;  }  /**   * Closes on shutdown.   */  public void closeOnShutdown()  {    QSocket socket = _socket;    if (socket != null) {      try {        socket.close();      } catch (Throwable e) {        log.log(Level.FINE, e.toString(), e);      }      Thread.yield();    }  }    public void close()  {    closeImpl();  }  /**   * Closes the connection.   */  private void closeImpl()  {    QSocket socket = _socket;    ConnectionState state;    synchronized (this) {      state = _state;            if (state == ConnectionState.IDLE || state.isClosed())        return;            _state = _state.toClosed();    }    // detach any comet    getPort().detach(this);    getRequest().protocolCloseEvent();         ConnectionController controller = getController();    if (controller != null)      controller.close();        _isKeepalive = false;    Port port = getPort();    if (state.isKeepalive()) {      port.keepaliveEnd(this);    }    if (log.isLoggable(Level.FINER)) {      if (port != null)	log.finer(dbgId() + "closing connection " + this + ", total=" + port.getConnectionCount());      else	log.finer(dbgId() + "closing connection " + this);    }    _isWake = false;    try {      getWriteStream().close();    } catch (Throwable e) {      log.log(Level.FINE, e.toString(), e);    }    try {      getReadStream().close();    } catch (Throwable e) {      log.log(Level.FINE, e.toString(), e);    }    if (socket != null) {      getPort().closeSocket(socket);            try {	socket.close();      } catch (Throwable e) {	log.log(Level.FINE, e.toString(), e);      }    }  }  /**   * Closes the controller.   */  @Override  protected void closeControllerImpl()  {    getPort().resume(this);  }    /**   * Destroys the connection()   */  public final void destroy()  {    closeImpl();        ConnectionState state = _state;    _state = ConnectionState.DESTROYED;    if (state != ConnectionState.DESTROYED) {      getPort().kill(this);    }  }    /**   * Completion processing at the end of the thread   */  void finish()  {    closeImpl();    if (_state != ConnectionState.IDLE	&& _state != ConnectionState.DESTROYED) {      _state = _state.toIdle();      _readTask = _acceptTask;      getPort().free(this);    }  }  protected String dbgId()  {    if (_dbgId == null) {      Object serverId = Environment.getAttribute("caucho.server-id");      if (serverId != null)	_dbgId = (getClass().getSimpleName() + "[id=" + getId()		  + "," + serverId + "] ");      else	_dbgId = (getClass().getSimpleName() + "[id=" + getId() + "]");    }    return _dbgId;  }  @Override  public String toString()  {    return "TcpConnection[id=" + _id + "," + _port.toURL() + "," + _state + "]";  }    enum RequestState {    REQUEST,    THREAD_DETACHED,    EXIT  };    enum ConnectionState {    IDLE,    ACCEPT,    REQUEST,    REQUEST_ACTIVE,    REQUEST_KEEPALIVE,    COMET,    DUPLEX,    DUPLEX_KEEPALIVE,    COMPLETE,    CLOSED,    DESTROYED;        boolean isComet()    {      return this == COMET;    }        /**     * True if the state is one of the keepalive states, either     * a true keepalive-select, or comet or duplex.     */    boolean isKeepalive()    {      // || this == COMET      return (this == REQUEST_KEEPALIVE              || this == DUPLEX_KEEPALIVE);    }        boolean isActive()    {      switch (this) {      case IDLE:      case ACCEPT:      case REQUEST:      case REQUEST_ACTIVE:	return true;	      default:	return false;      }    }        boolean isRequestActive()    {      switch (this) {      case REQUEST:      case REQUEST_ACTIVE:	return true;      default:	return false;      }    }        boolean isClosed()    {      return this == CLOSED || this == DESTROYED;    }        ConnectionState toKeepaliveSelect()    {      switch (this) {      case REQUEST:      case REQUEST_ACTIVE:      case REQUEST_KEEPALIVE:        return REQUEST_KEEPALIVE;              case DUPLEX:      case DUPLEX_KEEPALIVE:        return DUPLEX_KEEPALIVE;              default:	throw new IllegalStateException(this + " is an illegal keepalive state");      }    }        ConnectionState toActive()    {      switch (this) {      case ACCEPT:      case REQUEST:      case REQUEST_ACTIVE:      case REQUEST_KEEPALIVE:	return REQUEST_ACTIVE;              case DUPLEX_KEEPALIVE:      case DUPLEX:        return DUPLEX;      case COMET:	return COMET;	      default:	throw new IllegalStateException(this + " is an illegal active state");      }    }        ConnectionState toAccept()    {      if (this != DESTROYED)	return ACCEPT;      else	throw new IllegalStateException(this + " is an illegal accept state");    }        ConnectionState toIdle()    {      if (this != DESTROYED)	return IDLE;      else	throw new IllegalStateException(this + " is an illegal idle state");    }        ConnectionState toClosed()    {      if (this != DESTROYED)	return CLOSED;      else	throw new IllegalStateException(this + " is an illegal closed state");    }  }  class AcceptTask implements Runnable {    public void run()    {      doAccept(true);    }          void doAccept(boolean isStart)    {      Port port = _port;      // next state is keepalive      _readTask = _keepaliveTask;          ServerRequest request = getRequest();      boolean isWaitForRead = request.isWaitForRead();	      Thread thread = Thread.currentThread();      String oldThreadName = thread.getName();		         thread.setName(_id);       port.threadBegin(TcpConnection.this);      ClassLoader systemLoader = ClassLoader.getSystemClassLoader();      thread.setContextClassLoader(systemLoader);      boolean isValid = false;      RequestState result = RequestState.EXIT;            try {        _thread = thread;	_state = _state.toAccept();        // while (! _state.isClosed()) {        while (! _port.isClosed() && _state != ConnectionState.DESTROYED) {          _state = _state.toAccept();          if (! _port.accept(TcpConnection.this, isStart)) {            close();            break;          }          	  if (_readTask != _keepaliveTask)	    Thread.dumpStack();	            isStart = false;                  _connectionStartTime = Alarm.getCurrentTime();                    if (isWaitForRead && ! getReadStream().waitForRead()) {            close();            continue;          }	  _request.startConnection();	  result = handleConnection();	            if (result == RequestState.THREAD_DETACHED) {            break;          }	  close();        }                isValid = true;      } catch (Throwable e) {        log.log(Level.WARNING, e.toString(), e);      } finally {        thread.setContextClassLoader(systemLoader);        port.threadEnd(TcpConnection.this);         _thread = null;        thread.setName(oldThreadName);              if (! isValid)          destroy();	if (result != RequestState.THREAD_DETACHED)	  finish();      }    }    @Override    public String toString()    {      return getClass().getSimpleName() + "[" + TcpConnection.this + "]";    }  }   class KeepaliveTask implements Runnable {    public void run()    {      Thread thread = Thread.currentThread();      String oldThreadName = thread.getName();		         thread.setName(_id);      if (_state.isKeepalive()) {	_port.keepaliveEnd(TcpConnection.this);      }      _port.threadBegin(TcpConnection.this);      RequestState result = RequestState.EXIT;            boolean isValid = false;      try {	_state = _state.toActive();	        result = handleConnection();                 isValid = true;      } finally {        thread.setName(oldThreadName);        _port.threadEnd(TcpConnection.this);                if (! isValid)          destroy();	if (result != RequestState.THREAD_DETACHED)	  close();      }      if (isValid && result != RequestState.THREAD_DETACHED) {	// acceptTask significantly faster than finishing	_acceptTask.doAccept(false);      }    }    @Override    public String toString()    {      return getClass().getSimpleName() + "[" + TcpConnection.this + "]";    }  }  class DuplexReadTask implements Runnable {    private TcpDuplexController _duplex;        DuplexReadTask(TcpDuplexController duplex)    {      _duplex = duplex;    }        public void run()    {      Thread thread = Thread.currentThread();      String oldThreadName = thread.getName();		         thread.setName(_id);      if (_state.isKeepalive()) {	_port.keepaliveEnd(TcpConnection.this);      }            _port.threadBegin(TcpConnection.this);            boolean isValid = false;      RequestState result = RequestState.EXIT;            try {	_state = _state.toActive();	        do {	  result = RequestState.EXIT;	            if (! _duplex.serviceRead())	    break;        } while ((result = keepaliveRead()) == RequestState.REQUEST);        isValid = true;      } catch (IOException e) {        log.log(Level.FINE, e.toString(), e);      } finally {        thread.setName(oldThreadName);        _port.threadEnd(TcpConnection.this);                if (! isValid)          destroy();	if (result != RequestState.THREAD_DETACHED)	  finish();      }        }  }    class ResumeTask implements Runnable {    public void run()    {      boolean isValid = false;            try {        if (getRequest().handleResume()	    && _port.suspend(TcpConnection.this)) {	  isValid = true;	}      } catch (IOException e) {        log.log(Level.FINE, e.toString(), e);      } finally {        if (! isValid) {	  finish();	}      }    }  }  class Admin extends AbstractManagedObject implements TcpConnectionMXBean {    Admin()    {      super(ClassLoader.getSystemClassLoader());    }        public String getName()    {      return _name;    }    public long getThreadId()    {      return TcpConnection.this.getThreadId();    }    public long getRequestActiveTime()    {      return TcpConnection.this.getRequestActiveTime();    }    public String getState()    {      return TcpConnection.this.getState();    }    void register()    {      registerSelf();    }    void unregister()    {      unregisterSelf();    }  }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?