serverpool.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,285 行 · 第 1/2 页

JAVA
1,285
字号
  }  /**   * Called when the socket read/write fails.   */  public void failConnect()  {    synchronized (this) {      _failCountTotal++;            _firstSuccessTime = 0;      // only degrade one per 100ms      _warmupState--;      long now = Alarm.getCurrentTime();      _failTime = now;      _lastFailTime = _failTime;      _lastFailConnectTime = now;      _dynamicFailRecoverTime *= 2;      if (_failRecoverTime < _dynamicFailRecoverTime)	_dynamicFailRecoverTime = _failRecoverTime;      if (_warmupState < WARMUP_MIN)	_warmupState = WARMUP_MIN;            if (_state < ST_CLOSED)	_state = ST_FAIL;    }  }  /**   * Called when the server responds with "busy", e.g. HTTP 503   */  public void busy()  {    synchronized (this) {      _lastBusyTime = Alarm.getCurrentTime();      _firstSuccessTime = 0;            _warmupState--;      if (_warmupState < 0)	_warmupState = 0;          _busyCountTotal++;            if (_state < ST_CLOSED)	_state = ST_BUSY;    }  }  /**   * Enable the client.   */  public void start()  {    synchronized (this) {      if (_state == ST_ACTIVE) {      }      else if (_state < ST_CLOSED)	_state = ST_STARTING;    }  }  /**   * Disable the client.   */  public void stop()  {    synchronized (this) {      if (_state < ST_CLOSED)	_state = ST_STANDBY;    }  }  /**   * Session only   */  public void enableSessionOnly()  {    synchronized (this) {      if (_state < ST_CLOSED && _state != ST_STANDBY)	_state = ST_SESSION_ONLY;    }  }  /**   * Open a stream to the target server.   *   * @return the socket's read/write pair.   */  public ClusterStream openSoft()  {    int state = _state;    if (! (ST_STARTING <= state && state <= ST_ACTIVE)) {      return null;    }    ClusterStream stream = openRecycle();    if (stream != null)      return stream;    if (canOpenSoft()) {      return connect();    }    else {      return null;    }  }  /**   * Open a stream to the target server object persistence.   *   * @return the socket's read/write pair.   
*/  public ClusterStream openIfLive()  {    if (_state == ST_CLOSED) {      return null;    }    ClusterStream stream = openRecycle();    if (stream != null)      return stream;    long now = Alarm.getCurrentTime();    if (now < _failTime + _failRecoverTime)      return null;    else if (_state == ST_FAIL && _startingCount > 0) {      // if in fail state, only one thread should try to connect      return null;    }    return connect();  }  /**   * Open a stream to the target server for a session.   *   * @return the socket's read/write pair.   */  public ClusterStream openForSession()  {    int state = _state;    if (! (ST_SESSION_ONLY <= state && state < ST_CLOSED)) {      return null;    }    ClusterStream stream = openRecycle();    if (stream != null)      return stream;    long now = Alarm.getCurrentTime();    if (now < _failTime + _failRecoverTime) {      return null;    }        if (now < _lastBusyTime + _failRecoverTime) {      return null;    }    return connect();  }  /**   * Open a stream to the target server for the load balancer.   *   * @return the socket's read/write pair.   */  public ClusterStream open()  {    int state = _state;    if (! (ST_STARTING <= state && state < ST_CLOSED))      return null;    ClusterStream stream = openRecycle();    if (stream != null)      return stream;    return connect();  }  /**   * Returns a valid recycled stream from the idle pool to the backend.   *   * If the stream has been in the pool for too long (> live_time),   * close it instead.   *   * @return the socket's read/write pair.   
*/  private ClusterStream openRecycle()  {    long now = Alarm.getCurrentTime();    ClusterStream stream = null;    synchronized (this) {      if (_idleHead != _idleTail) {        stream = _idle[_idleHead];        long freeTime = stream.getFreeTime();        _idle[_idleHead] = null;        _idleHead = (_idleHead + _idle.length - 1) % _idle.length;        if (now < freeTime + _loadBalanceIdleTime) {          _activeCount++;	  _keepaliveCountTotal++;          return stream;        }      }    }    if (stream != null) {      if (log.isLoggable(Level.FINER))	log.finer(this + " close idle " + stream		  + " expire=" + QDate.formatISO8601(stream.getFreeTime() + _loadBalanceIdleTime));            stream.closeImpl();    }    return null;  }  /**   * Connect to the backend server.   *   * @return the socket's read/write pair.   */  private ClusterStream connect()  {    synchronized (this) {      if (_maxConnections <= _activeCount + _startingCount)	return null;            _startingCount++;    }        try {      ReadWritePair pair = openTCPPair();      ReadStream rs = pair.getReadStream();      rs.setAttribute("timeout", new Integer((int) _loadBalanceSocketTimeout));      synchronized (this) {        _activeCount++;	_connectCountTotal++;      }      ClusterStream stream = new ClusterStream(this, _streamCount++,					       rs, pair.getWriteStream());            if (log.isLoggable(Level.FINER))	log.finer("connect " + stream);      if (_firstSuccessTime <= 0) {	if (ST_STARTING <= _state && _state < ST_ACTIVE) {	  if (_loadBalanceWarmupTime > 0)	    _state = ST_WARMUP;	  else	    _state = ST_ACTIVE;	  _firstSuccessTime = Alarm.getCurrentTime();	}	if (_warmupState < 0)	  _warmupState = 0;      }            return stream;    } catch (IOException e) {      if (log.isLoggable(Level.FINEST))	log.log(Level.FINEST, this + " " + e.toString(), e);      else	log.finer(this + " " + e.toString());      failConnect();            return null;    } finally {      synchronized (this) {	
_startingCount--;      }    }  }  /**   * We now know that the server is live, e.g. if a sibling has   * contacted us.   */  public void wake()  {    synchronized (this) {      if (_state == ST_FAIL) {	_state = ST_STARTING;      }      _failTime = 0;    }  }  /**   * Free the read/write pair for reuse.  Called only from   * ClusterStream.free()   */  void free(ClusterStream stream)  {    synchronized (this) {      _activeCount--;      int size = (_idleHead - _idleTail + _idle.length) % _idle.length;      if (_state != ST_CLOSED && size < _idleSize) {        _idleHead = (_idleHead + 1) % _idle.length;        _idle[_idleHead] = stream;	stream = null;      }      long now = Alarm.getCurrentTime();      long prevSuccessTime = _prevSuccessTime;      if (prevSuccessTime > 0) {	_latencyFactor = (0.95 * _latencyFactor			  + 0.05 * (now - prevSuccessTime));      }      if (_activeCount > 0)	_prevSuccessTime = now;      else	_prevSuccessTime = 0;	      _lastSuccessTime = now;    }        updateWarmup();    long now = Alarm.getCurrentTime();    long maxIdleTime = _loadBalanceIdleTime;    ClusterStream oldStream = null;        do {      oldStream = null;      synchronized (this) {	if (_idleHead != _idleTail) {	  int nextTail = (_idleTail + 1) % _idle.length;	  	  oldStream = _idle[nextTail];	  if (oldStream != null	      && oldStream.getFreeTime() + maxIdleTime < now) {	    _idle[nextTail] = null;	    _idleTail = nextTail;	  }	  else	    oldStream = null;	}      }      if (oldStream != null)	oldStream.closeImpl();    } while (oldStream != null);    if (stream != null)      stream.closeImpl();  }  private void updateWarmup()  {    synchronized (this) {      if (! 
isEnabled())	return;          long now = Alarm.getCurrentTime();      int warmupState = _warmupState;      if (warmupState >= 0 && _firstSuccessTime > 0) {	warmupState = (int) ((now - _firstSuccessTime) / _warmupChunkTime);	// reset the connection fail recover time	_dynamicFailRecoverTime = 1000L;	if (WARMUP_MAX <= warmupState) {	  warmupState = WARMUP_MAX;	  toActive();	}      }      _warmupState = warmupState;    }  }  /**   * Closes the read/write pair for reuse.  Called only   * from ClusterStream.close().   */  void close(ClusterStream stream)  {    if (log.isLoggable(Level.FINER))      log.finer("close " + stream);        synchronized (this) {      _activeCount--;    }  }  /**   * Clears the recycled connections, e.g. on detection of backend   * server going down.   */  public void clearRecycle()  {    ArrayList<ClusterStream> recycleList = null;    synchronized (this) {      _idleHead = _idleTail = 0;      for (int i = 0; i < _idle.length; i++) {        ClusterStream stream;        stream = _idle[i];        _idle[i] = null;        if (stream != null) {          if (recycleList == null)            recycleList = new ArrayList<ClusterStream>();          recycleList.add(stream);        }      }    }    if (recycleList != null) {      for (ClusterStream stream : recycleList) {        stream.closeImpl();      }    }  }  /**   * Close the client   */  public void close()  {    synchronized (this) {      if (_state == ST_CLOSED)	return;      _state = ST_CLOSED;    }        synchronized (this) {      _idleHead = _idleTail = 0;    }    for (int i = 0; i < _idle.length; i++) {      ClusterStream stream;      synchronized (this) {        stream = _idle[i];        _idle[i] = null;      }      if (stream != null)        stream.closeImpl();    }  }  /**   * Open a read/write pair to the target srun connection.   *   * @return the socket's read/write pair.   
*/
  ReadWritePair openTCPPair()
    throws IOException
  {
    return _tcpPath.openReadWrite();
  }

  /**
   * Returns true if can connect to the client.
   *
   * The pool is woken first, then a stream is opened and immediately
   * freed again.  Any exception is logged at FINER and reported as false.
   *
   * @return true when a stream could be opened
   */
  public boolean canConnect()
  {
    try {
      wake();

      ClusterStream stream = open();

      if (stream != null) {
        stream.free();

        return true;
      }

      return false;
    } catch (Exception e) {
      log.log(Level.FINER, e.toString(), e);

      return false;
    }
  }

  //
  // BAM API
  //

  /**
   * Non-blocking message
   *
   * @param to the target address passed to the stream
   * @param message the payload to send
   * @return true when a stream was available and the message was written,
   *         false when no stream could be opened
   */
  public boolean message(String to, Serializable message)
  {
    ClusterStream stream = null;
    boolean isQuit = false;

    try {
      stream = open();

      if (stream != null) {
        stream.message(to, "", message);

        isQuit = true;
      }
    } catch (Exception e) {
      isQuit = false;

      throw ConfigException.create(e);
    } finally {
      // a cleanly finished stream is recycled, anything else is closed
      if (stream == null) {
      }
      else if (isQuit)
        stream.free();
      else
        stream.close();
    }

    return isQuit;
  }

  /**
   * Blocking 'GET' query
   *
   * Sends the query, reads the result and then one trailing code byte:
   * 'Q' lets the stream be recycled, 'X' has it closed, anything else is
   * an error.  Every failure, including being unable to open a stream, is
   * rethrown through ConfigException.create().
   *
   * @param to the target address passed to the stream
   * @param query the query payload
   * @return the query result read from the stream
   */
  public Object queryGet(String to, Serializable query)
  {
    ClusterStream stream = null;
    boolean isQuit = false;

    try {
      stream = open();

      // open() returns null when the pool is not usable or no connection
      // can be made; fail with a clear message instead of dereferencing
      // null and surfacing an accidental NullPointerException.
      if (stream == null)
        throw new IllegalStateException(this + " can't open for queryGet");

      long id = 0;

      stream.queryGet(id, to, "", query);

      Object result = stream.readQueryResult(id);

      int code = stream.getReadStream().read();

      if (code == 'Q')
        isQuit = true;
      else if (code != 'X')
        throw new IllegalStateException("unexpected code " + (char) code);

      return result;
    } catch (Exception e) {
      isQuit = false;

      throw ConfigException.create(e);
    } finally {
      if (stream == null) {
      }
      else if (isQuit)
        stream.free();
      else
        stream.close();
    }
  }

  /**
   * Blocking 'SET' query.
   *
   * Same exchange as queryGet(), except that an unavailable stream is
   * logged at FINE and reported as a null result, and failures are raised
   * as IOException.
   *
   * @param to the target address passed to the stream
   * @param query the query payload
   * @return the query result, or null when no stream could be opened
   * @throws IOException if the exchange fails; other exceptions are
   *         wrapped in an IOException
   */
  public Object querySet(String to, Serializable query)
    throws IOException
  {
    ClusterStream stream = null;
    boolean isQuit = false;

    try {
      stream = open();

      if (stream == null) {
        if (log.isLoggable(Level.FINE))
          log.fine(this + " can't open for querySet");

        return null;
      }

      long id = 0;

      stream.querySet(id, to, "", query);

      Object result = stream.readQueryResult(id);

      int code = stream.getReadStream().read();

      if (code == 'Q')
        isQuit = true;
      else if (code != 'X')
        throw new IllegalStateException("unexpected code " + (char) code);

      return result;
    } catch (IOException e) {
      isQuit = false;

      throw e;
    } catch (Exception e) {
      isQuit = false;

      throw new IOException(e);
    } finally {
      if (stream == null) {
      }
      else if (isQuit)
        stream.free();
      else
        stream.close();
    }
  }

  /**
   * Returns the simple class name followed by the debug id, address and
   * port of this pool.
   */
  @Override
  public String toString()
  {
    return (getClass().getSimpleName()
            + "[" + getDebugId()
            + "," + _address + ":" + _port + "]");
  }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?