serverpool.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,285 行 · 第 1/2 页
JAVA
1,285 行
} /** * Called when the socket read/write fails. */ public void failConnect() { synchronized (this) { _failCountTotal++; _firstSuccessTime = 0; // only degrade one per 100ms _warmupState--; long now = Alarm.getCurrentTime(); _failTime = now; _lastFailTime = _failTime; _lastFailConnectTime = now; _dynamicFailRecoverTime *= 2; if (_failRecoverTime < _dynamicFailRecoverTime) _dynamicFailRecoverTime = _failRecoverTime; if (_warmupState < WARMUP_MIN) _warmupState = WARMUP_MIN; if (_state < ST_CLOSED) _state = ST_FAIL; } } /** * Called when the server responds with "busy", e.g. HTTP 503 */ public void busy() { synchronized (this) { _lastBusyTime = Alarm.getCurrentTime(); _firstSuccessTime = 0; _warmupState--; if (_warmupState < 0) _warmupState = 0; _busyCountTotal++; if (_state < ST_CLOSED) _state = ST_BUSY; } } /** * Enable the client. */ public void start() { synchronized (this) { if (_state == ST_ACTIVE) { } else if (_state < ST_CLOSED) _state = ST_STARTING; } } /** * Disable the client. */ public void stop() { synchronized (this) { if (_state < ST_CLOSED) _state = ST_STANDBY; } } /** * Session only */ public void enableSessionOnly() { synchronized (this) { if (_state < ST_CLOSED && _state != ST_STANDBY) _state = ST_SESSION_ONLY; } } /** * Open a stream to the target server. * * @return the socket's read/write pair. */ public ClusterStream openSoft() { int state = _state; if (! (ST_STARTING <= state && state <= ST_ACTIVE)) { return null; } ClusterStream stream = openRecycle(); if (stream != null) return stream; if (canOpenSoft()) { return connect(); } else { return null; } } /** * Open a stream to the target server object persistence. * * @return the socket's read/write pair. */ public ClusterStream openIfLive() { if (_state == ST_CLOSED) { return null; } ClusterStream stream = openRecycle(); if (stream != null) return stream; long now = Alarm.getCurrentTime(); if (now < _failTime + _failRecoverTime) return null; else if (_state == ST_FAIL && _startingCount > 0) { // if in fail state, only one thread should try to connect return null; } return connect(); } /** * Open a stream to the target server for a session. * * @return the socket's read/write pair. */ public ClusterStream openForSession() { int state = _state; if (! (ST_SESSION_ONLY <= state && state < ST_CLOSED)) { return null; } ClusterStream stream = openRecycle(); if (stream != null) return stream; long now = Alarm.getCurrentTime(); if (now < _failTime + _failRecoverTime) { return null; } if (now < _lastBusyTime + _failRecoverTime) { return null; } return connect(); } /** * Open a stream to the target server for the load balancer. * * @return the socket's read/write pair. */ public ClusterStream open() { int state = _state; if (! (ST_STARTING <= state && state < ST_CLOSED)) return null; ClusterStream stream = openRecycle(); if (stream != null) return stream; return connect(); } /** * Returns a valid recycled stream from the idle pool to the backend. * * If the stream has been in the pool for too long (> live_time), * close it instead. * * @return the socket's read/write pair. */ private ClusterStream openRecycle() { long now = Alarm.getCurrentTime(); ClusterStream stream = null; synchronized (this) { if (_idleHead != _idleTail) { stream = _idle[_idleHead]; long freeTime = stream.getFreeTime(); _idle[_idleHead] = null; _idleHead = (_idleHead + _idle.length - 1) % _idle.length; if (now < freeTime + _loadBalanceIdleTime) { _activeCount++; _keepaliveCountTotal++; return stream; } } } if (stream != null) { if (log.isLoggable(Level.FINER)) log.finer(this + " close idle " + stream + " expire=" + QDate.formatISO8601(stream.getFreeTime() + _loadBalanceIdleTime)); stream.closeImpl(); } return null; } /** * Connect to the backend server. * * @return the socket's read/write pair. */ private ClusterStream connect() { synchronized (this) { if (_maxConnections <= _activeCount + _startingCount) return null; _startingCount++; } try { ReadWritePair pair = openTCPPair(); ReadStream rs = pair.getReadStream(); rs.setAttribute("timeout", new Integer((int) _loadBalanceSocketTimeout)); synchronized (this) { _activeCount++; _connectCountTotal++; } ClusterStream stream = new ClusterStream(this, _streamCount++, rs, pair.getWriteStream()); if (log.isLoggable(Level.FINER)) log.finer("connect " + stream); if (_firstSuccessTime <= 0) { if (ST_STARTING <= _state && _state < ST_ACTIVE) { if (_loadBalanceWarmupTime > 0) _state = ST_WARMUP; else _state = ST_ACTIVE; _firstSuccessTime = Alarm.getCurrentTime(); } if (_warmupState < 0) _warmupState = 0; } return stream; } catch (IOException e) { if (log.isLoggable(Level.FINEST)) log.log(Level.FINEST, this + " " + e.toString(), e); else log.finer(this + " " + e.toString()); failConnect(); return null; } finally { synchronized (this) { _startingCount--; } } } /** * We now know that the server is live, e.g. if a sibling has * contacted us. */ public void wake() { synchronized (this) { if (_state == ST_FAIL) { _state = ST_STARTING; } _failTime = 0; } } /** * Free the read/write pair for reuse. Called only from * ClusterStream.free() */ void free(ClusterStream stream) { synchronized (this) { _activeCount--; int size = (_idleHead - _idleTail + _idle.length) % _idle.length; if (_state != ST_CLOSED && size < _idleSize) { _idleHead = (_idleHead + 1) % _idle.length; _idle[_idleHead] = stream; stream = null; } long now = Alarm.getCurrentTime(); long prevSuccessTime = _prevSuccessTime; if (prevSuccessTime > 0) { _latencyFactor = (0.95 * _latencyFactor + 0.05 * (now - prevSuccessTime)); } if (_activeCount > 0) _prevSuccessTime = now; else _prevSuccessTime = 0; _lastSuccessTime = now; } updateWarmup(); long now = Alarm.getCurrentTime(); long maxIdleTime = _loadBalanceIdleTime; ClusterStream oldStream = null; do { oldStream = null; synchronized (this) { if (_idleHead != _idleTail) { int nextTail = (_idleTail + 1) % _idle.length; oldStream = _idle[nextTail]; if (oldStream != null && oldStream.getFreeTime() + maxIdleTime < now) { _idle[nextTail] = null; _idleTail = nextTail; } else oldStream = null; } } if (oldStream != null) oldStream.closeImpl(); } while (oldStream != null); if (stream != null) stream.closeImpl(); } private void updateWarmup() { synchronized (this) { if (! isEnabled()) return; long now = Alarm.getCurrentTime(); int warmupState = _warmupState; if (warmupState >= 0 && _firstSuccessTime > 0) { warmupState = (int) ((now - _firstSuccessTime) / _warmupChunkTime); // reset the connection fail recover time _dynamicFailRecoverTime = 1000L; if (WARMUP_MAX <= warmupState) { warmupState = WARMUP_MAX; toActive(); } } _warmupState = warmupState; } } /** * Closes the read/write pair for reuse. Called only * from ClusterStream.close(). */ void close(ClusterStream stream) { if (log.isLoggable(Level.FINER)) log.finer("close " + stream); synchronized (this) { _activeCount--; } } /** * Clears the recycled connections, e.g. on detection of backend * server going down. */ public void clearRecycle() { ArrayList<ClusterStream> recycleList = null; synchronized (this) { _idleHead = _idleTail = 0; for (int i = 0; i < _idle.length; i++) { ClusterStream stream; stream = _idle[i]; _idle[i] = null; if (stream != null) { if (recycleList == null) recycleList = new ArrayList<ClusterStream>(); recycleList.add(stream); } } } if (recycleList != null) { for (ClusterStream stream : recycleList) { stream.closeImpl(); } } } /** * Close the client */ public void close() { synchronized (this) { if (_state == ST_CLOSED) return; _state = ST_CLOSED; } synchronized (this) { _idleHead = _idleTail = 0; } for (int i = 0; i < _idle.length; i++) { ClusterStream stream; synchronized (this) { stream = _idle[i]; _idle[i] = null; } if (stream != null) stream.closeImpl(); } } /** * Open a read/write pair to the target srun connection. * * @return the socket's read/write pair. */ ReadWritePair openTCPPair() throws IOException { return _tcpPath.openReadWrite(); } /** * Returns true if can connect to the client. */ public boolean canConnect() { try { wake(); ClusterStream stream = open(); if (stream != null) { stream.free(); return true; } return false; } catch (Exception e) { log.log(Level.FINER, e.toString(), e); return false; } } // // BAM API // /** * Non-blocking message */ public boolean message(String to, Serializable message) { ClusterStream stream = null; boolean isQuit = false; try { stream = open(); if (stream != null) { stream.message(to, "", message); isQuit = true; } } catch (Exception e) { isQuit = false; throw ConfigException.create(e); } finally { if (stream == null) { } else if (isQuit) stream.free(); else stream.close(); } return isQuit; } /** * Blocking 'GET' query */ public Object queryGet(String to, Serializable query) { ClusterStream stream = null; boolean isQuit = false; try { stream = open(); long id = 0; stream.queryGet(id, to, "", query); Object result = stream.readQueryResult(id); int code = stream.getReadStream().read(); if (code == 'Q') isQuit = true; else if (code != 'X') throw new IllegalStateException("unexpected code " + (char) code); return result; } catch (Exception e) { isQuit = false; throw ConfigException.create(e); } finally { if (stream == null) { } else if (isQuit) stream.free(); else stream.close(); } } public Object querySet(String to, Serializable query) throws IOException { ClusterStream stream = null; boolean isQuit = false; try { stream = open(); if (stream == null) { if (log.isLoggable(Level.FINE)) log.fine(this + " can't open for querySet"); return null; } long id = 0; stream.querySet(id, to, "", query); Object result = stream.readQueryResult(id); int code = stream.getReadStream().read(); if (code == 'Q') isQuit = true; else if (code != 'X') throw new IllegalStateException("unexpected code " + (char) code); return result; } catch (IOException e) { isQuit = false; throw e; } catch (Exception e) { isQuit = false; throw new IOException(e); } finally { if (stream == null) { } else if (isQuit) stream.free(); else stream.close(); } } @Override public String toString() { return (getClass().getSimpleName() + "[" + getDebugId() + "," + _address + ":" + _port + "]"); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?