port.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,800 行 · 第 1/3 页

JAVA
1,800
字号
        log.info(_protocol.getProtocolName() + " disabled *:" + _port);
    }
  }

  /**
   * Returns the connection info for JMX.
   *
   * The active list is copied while holding the port monitor; the
   * per-connection info objects are then built outside the lock.
   *
   * @return one info entry per connection in the active list; the request
   *   time is -1 when the connection has no active request
   */
  TcpConnectionInfo []connectionInfo()
  {
    TcpConnection []connections;

    synchronized (this) {
      connections = new TcpConnection[_activeList.size()];
      _activeList.toArray(connections);
    }

    long now = Alarm.getExactTime();
    TcpConnectionInfo []infoList = new TcpConnectionInfo[connections.length];

    for (int i = 0; i < connections.length; i++) {
      TcpConnection conn = connections[i];

      long requestTime = -1;
      long startTime = conn.getRequestStartTime();

      if (conn.isRequestActive() && startTime > 0)
        requestTime = now - startTime;

      TcpConnectionInfo info
        = new TcpConnectionInfo(conn.getId(),
                                conn.getThreadId(),
                                getAddress() + ":" + getPort(),
                                conn.getState().toString(),
                                requestTime);

      infoList[i] = info;
    }

    return infoList;
  }

  /**
   * Returns the select manager.
   */
  public AbstractSelectManager getSelectManager()
  {
    return _selectManager;
  }

  /**
   * Accepts a new connection.
   *
   * The calling thread is counted as idle for the duration of the call;
   * the idle count is always restored in the finally block, which also
   * wakes the port thread when idle + starting threads drop below the
   * accept-thread minimum.
   *
   * @param conn the connection whose socket receives the accepted client
   * @param isStart boolean to mark the first request on the thread for
   *   bookkeeping.
   *
   * @return true if a socket was accepted and passed the throttle, false
   *   if the thread should not keep accepting (too many idle threads, the
   *   port is no longer active, or the accept failed)
   */
  public boolean accept(TcpConnection conn, boolean isStart)
  {
    try {
      synchronized (this) {
        _idleThreadCount++;

        if (isStart) {
          _startThreadCount--;

          if (_startThreadCount < 0) {
            _startThreadCount = 0;

            log.warning(conn + " _startThreadCount assertion failure");
            // conn.getStartThread().printStackTrace();
          }
        }

        if (_acceptThreadMax < _idleThreadCount) {
          return false;
        }
      }

      while (_lifecycle.isActive()) {
        QSocket socket = conn.startSocket();

        // clear any pending interrupt before blocking in accept
        Thread.interrupted();

        if (_serverSocket.accept(socket)) {
          conn.initSocket();

          if (_throttle.accept(socket))
            return true;
          else
            socket.close();
        }
        else {
          if (_acceptThreadMax < _idleThreadCount) {
            return false;
          }
        }
      }
    } catch (Throwable e) {
      // failures while the port is shutting down are not reported
      if (_lifecycle.isActive() && log.isLoggable(Level.FINER))
        log.log(Level.FINER, e.toString(), e);
    } finally {
      synchronized (this) {
        _idleThreadCount--;

        if (_idleThreadCount + _startThreadCount < _acceptThreadMin) {
          notify();
        }
      }
    }

    return false;
  }

  /**
   * Notification when a socket closes.  Forwards the close to the
   * throttle when one is configured.
   */
  void closeSocket(QSocket socket)
  {
    if (_throttle != null)
      _throttle.close(socket);
  }

  /**
   * Registers the new connection as started by decrementing the count
   * of starting threads.
   */
  void startConnection(TcpConnection conn)
  {
    synchronized (this) {
      _startThreadCount--;
    }
  }

  /**
   * Marks a new thread as running.
   */
  void threadBegin(TcpConnection conn)
  {
    synchronized (_threadCountLock) {
      _threadCount++;
    }
  }

  /**
   * Marks a thread as stopped.
   */
  void threadEnd(TcpConnection conn)
  {
    synchronized (_threadCountLock) {
      _threadCount--;
    }
  }

  /**
   * Returns true if the keepalive is allowed.
   *
   * The keepalive is refused when the port is not active, when the
   * connection has been open longer than the keepalive time max, when
   * the keepalive count has reached its max, or when the connection count
   * plus the spare-connection reserve has reached the connection max.
   *
   * @param acceptStartTime the time the connection was accepted, on the
   *   same clock as Alarm.getCurrentTime()
   */
  public boolean allowKeepalive(long acceptStartTime)
  {
    synchronized (_keepaliveCountLock) {
      if (! _lifecycle.isActive())
        return false;
      else if (acceptStartTime + _keepaliveTimeMax < Alarm.getCurrentTime())
        return false;
      else if (_keepaliveMax <= _keepaliveCount)
        return false;
      else if (_connectionMax <= _connectionCount + _minSpareConnection)
        return false;
      else
        return true;
    }
  }

  /**
   * Marks a keepalive as starting running.  Called only from TcpConnection.
   *
   * Only the lifecycle and connection-max checks are enforced here; the
   * time and keepalive-max checks are intentionally disabled (#2262) and
   * are performed by allowKeepalive instead.
   *
   * @return true if the keepalive was counted, false if it was refused
   */
  boolean keepaliveBegin(TcpConnection conn, long acceptStartTime)
  {
    synchronized (_keepaliveCountLock) {
      if (! _lifecycle.isActive())
        return false;
      else if (_connectionMax <= _connectionCount + _minSpareConnection) {
        log.warning(conn + " failed keepalive, connection-max=" + _connectionCount);

        return false;
      }
      else if (false &&
               acceptStartTime + _keepaliveTimeMax < Alarm.getCurrentTime()) {
        // #2262 - skip this check to avoid confusing the load balancer
        // the keepalive check is in allowKeepalive
        log.warning(conn + " failed keepalive, delay=" + (Alarm.getCurrentTime() - acceptStartTime));

        return false;
      }
      else if (false && _keepaliveMax <= _keepaliveCount) {
        // #2262 - skip this check to avoid confusing the load balancer
        // the keepalive check is in allowKeepalive
        log.warning(conn + " failed keepalive, keepalive-max " + _keepaliveCount);

        return false;
      }

      _keepaliveCount++;

      return true;
    }
  }

  /**
   * Marks the keepalive as ending. Called only from TcpConnection.
   *
   * A negative count is reset to zero and reported with a stack dump.
   */
  void keepaliveEnd(TcpConnection conn)
  {
    synchronized (_keepaliveCountLock) {
      _keepaliveCount--;

      if (_keepaliveCount < 0) {
        int count = _keepaliveCount;
        _keepaliveCount = 0;

        log.warning(conn + " internal error: negative keepalive count " + count);
        Thread.dumpStack();
      }
    }
  }

  /**
   * Starts a keepalive thread.
   */
  void keepaliveThreadBegin()
  {
    synchronized (_keepaliveCountLock) {
      _keepaliveThreadCount++;
    }
  }

  /**
   * Ends a keepalive thread.
   */
  void keepaliveThreadEnd()
  {
    synchronized (_keepaliveCountLock) {
      _keepaliveThreadCount--;
    }
  }

  /**
   * Suspends the controller (for comet-style ajax)
   *
   * If the connection has already been woken, it is marked for resume
   * and its resume task is scheduled instead of suspending it.
   *
   * @return true if the connection was suspended or scheduled for resume,
   *   false if it is neither woken nor a comet connection
   */
  boolean suspend(TcpConnection conn)
  {
    boolean isResume = false;

    synchronized (_suspendList) {
      if (conn.isWake()) {
        isResume = true;
        conn.setResume();
      }
      else if (conn.isComet()) {
        conn.toSuspend();
        _suspendList.add(conn);
        return true;
      }
      else
        return false;
    }

    // the resume task is scheduled outside the suspend-list lock
    if (isResume) {
      ThreadPool.getThreadPool().schedule(conn.getResumeTask());
      return true;
    }
    else
      return false;
  }

  /**
   * Remove from suspend list.
   *
   * @return true if the connection was in the suspend list
   */
  boolean detach(TcpConnection conn)
  {
    synchronized (_suspendList) {
      return _suspendList.remove(conn);
    }
  }

  /**
   * Resumes the controller (for comet-style ajax)
   *
   * @return true if the connection was suspended and its resume task was
   *   scheduled, false if it was not in the suspend list
   */
  boolean resume(TcpConnection conn)
  {
    synchronized (_suspendList) {
      if (! _suspendList.remove(conn)) {
        return false;
      }

      conn.setResume();
    }

    // conn was dereferenced above, so no null check is needed here
    ThreadPool.getThreadPool().schedule(conn.getResumeTask());

    return true;
  }

  /**
   * Returns true if the port is closed.
   */
  public boolean isClosed()
  {
    return _lifecycle.isAfterActive();
  }

  /**
   * The port thread is responsible for creating new connections.
   *
   * Loops until the lifecycle is destroyed.  Each pass either starts one
   * connection thread (when idle + starting threads are below the
   * accept-thread minimum and the connection max is not reached) or waits
   * on the port monitor for up to 60 seconds.
   */
  public void run()
  {
    while (! _lifecycle.isDestroyed()) {
      boolean isStart;

      try {
        // need delay to avoid spawning too many threads over a short time,
        // when the load doesn't justify it
        Thread.yield();
        // XXX: Thread.sleep(10);

        synchronized (this) {
          isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin;

          if (_connectionMax <= _connectionCount)
            isStart = false;

          if (! isStart) {
            // clear any pending interrupt so wait() does not throw at once
            Thread.interrupted();
            wait(60000);
          }

          if (isStart) {
            _connectionCount++;
            _startThreadCount++;
          }
        }

        if (isStart && _lifecycle.isActive()) {
          TcpConnection conn = _freeConn.allocate();

          if (conn == null) {
            conn = new TcpConnection(this, _serverSocket.createSocket());
            conn.setRequest(_protocol.createRequest(conn));
          }
          else {
            // XXX: remove when 3.2 stable
            if (! conn._isFree) {
              log.warning(conn + " unfree allocate");
              Thread.dumpStack();
            }
            conn._isFree = false;
          }

          synchronized (this) {
            _activeList.add(conn);
          }

          ThreadPool.getThreadPool().schedule(conn.getReadTask());
        }
      } catch (Throwable e) {
        // report through the class logger rather than raw stderr
        log.log(Level.WARNING, e.toString(), e);
      }
    }
  }

  /**
   * Handles the environment config phase
   */
  public void environmentConfigure(EnvironmentClassLoader loader)
  {
  }

  /**
   * Handles the environment bind phase
   */
  public void environmentBind(EnvironmentClassLoader loader)
  {
  }

  /**
   * Handles the case where the environment is starting (after init).
   */
  public void environmentStart(EnvironmentClassLoader loader)
  {
  }

  /**
   * Handles the case where the environment is stopping
   */
  public void environmentStop(EnvironmentClassLoader loader)
  {
    close();
  }

  /**
   * Frees the connection, returning it to the free list.
   *
   * Called only from TcpConnection
   */
  void free(TcpConnection conn)
  {
    // XXX: remove when 3.2 stable
    // The double-free check runs before closeConnection() so that a
    // repeated free cannot decrement the connection count a second time.
    if (conn._isFree) {
      log.warning(conn + " double free");
      Thread.dumpStack();
      return;
    }

    closeConnection(conn);

    conn._isFree = true;

    _freeConn.free(conn);
  }

  /**
   * Removes the connection from the port's bookkeeping without returning
   * it to the free list.
   *
   * Called only from TcpConnection
   */
  void kill(TcpConnection conn)
  {
    closeConnection(conn);
  }

  /**
   * Closes the stats for the connection: removes it from the active list
   * and decrements the connection count, waking the port thread when the
   * count was at the connection max.
   */
  private void closeConnection(TcpConnection conn)
  {
    synchronized (this) {
      _activeList.remove(conn);

      // notify() is called while holding this monitor, so it cannot fail
      // with IllegalMonitorStateException
      if (_connectionCount-- == _connectionMax) {
        notify();
      }
    }
  }

  /**
   * Shuts the Port down.
   *
   * NOTE(review): the original comment says the server gives connections
   * 30 seconds to complete; no such timeout is visible in this method.
   *
   * Does nothing beyond removing the environment listener if the
   * lifecycle was already destroyed.
   */
  public void close()
  {
    Environment.removeEnvironmentListener(this);

    if (! _lifecycle.toDestroy())
      return;

    if (log.isLoggable(Level.FINE))
      log.fine(this + " closing");

    Alarm suspendAlarm = _suspendAlarm;
    _suspendAlarm = null;

    if (suspendAlarm != null)
      suspendAlarm.dequeue();

    QServerSocket serverSocket = _serverSocket;
    _serverSocket = null;

    _selectManager = null;
    AbstractSelectManager selectManager = null;

    if (_server != null) {
      selectManager = _server.getSelectManager();
      _server.initSelectManager(null);
    }

    // remember the bound address before the server socket is closed
    InetAddress localAddress = null;
    int localPort = 0;
    if (serverSocket != null) {
      localAddress = serverSocket.getLocalAddress();
      localPort = serverSocket.getLocalPort();
    }

    // close the server socket
    if (serverSocket != null) {
      try {
        serverSocket.close();
      } catch (Throwable e) {
        // best-effort shutdown: record and continue
        log.log(Level.FINEST, e.toString(), e);
      }

      try {
        synchronized (serverSocket) {
          serverSocket.notifyAll();
        }
      } catch (Throwable e) {
        log.log(Level.FINEST, e.toString(), e);
      }
    }

    if (selectManager != null) {
      try {
        selectManager.close();
      } catch (Throwable e) {
        log.log(Level.FINEST, e.toString(), e);
      }
    }

    // With the server socket closed, send some requests to make
    // sure the Port accept threads are woken and die.

    // ping the accept port to wake the listening threads
    if (localPort > 0) {
      int idleCount = _idleThreadCount + _startThreadCount;

      for (int i = 0; i < idleCount + 10; i++) {
        Socket socket = null;

        try {
          socket = new Socket();
          InetSocketAddress addr;

          // fall back to loopback when there is no specific bound address
          if (localAddress == null ||
              localAddress.getHostAddress().startsWith("0."))
            addr = new InetSocketAddress("127.0.0.1", localPort);
          else
            addr = new InetSocketAddress(localAddress, localPort);

          socket.connect(addr, 100);
        } catch (ConnectException e) {
          // a failed connect is ignored; the ping is best-effort
        } catch (Throwable e) {
          log.log(Level.FINEST, e.toString(), e);
        } finally {
          // always release the ping socket, even when connect() failed
          if (socket != null) {
            try {
              socket.close();
            } catch (Throwable e) {
              log.log(Level.FINEST, e.toString(), e);
            }
          }
        }
      }
    }

    // destroy every pooled connection left in the free list
    TcpConnection conn;
    while ((conn = _freeConn.allocate()) != null) {
      conn.destroy();
    }

    log.finest(this + " closed");
  }

  /**
   * Returns the port's URL string.
   */
  public String toURL()
  {
    return _url;
  }

  @Override
  public String toString()
  {
    return getClass().getSimpleName() + "[" + _url + "]";
  }

  /**
   * Alarm listener that destroys connections which have stayed in the
   * suspend list longer than the suspend time max.  It re-queues its alarm
   * for 60 seconds later unless the port is closed.
   */
  public class SuspendReaper implements AlarmListener {
    public void handleAlarm(Alarm alarm)
    {
      try {
        ArrayList<TcpConnection> oldList = null;

        long now = Alarm.getCurrentTime();

        synchronized (_suspendList) {
          // iterate backwards so removals do not shift unvisited entries
          for (int i = _suspendList.size() - 1; i >= 0; i--) {
            TcpConnection conn = _suspendList.get(i);

            if (conn.getSuspendTime() + _suspendTimeMax < now) {
              _suspendList.remove(i);

              if (oldList == null)
                oldList = new ArrayList<TcpConnection>();

              oldList.add(conn);
            }
          }
        }

        // the expired connections are destroyed outside the lock
        if (oldList != null) {
          for (int i = 0; i < oldList.size(); i++) {
            TcpConnection conn = oldList.get(i);

            if (log.isLoggable(Level.FINE))
              log.fine(this + " suspend idle timeout " + conn);

            conn.destroy();
          }
        }
      } finally {
        if (! isClosed())
          alarm.queue(60000);
      }
    }
  }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?