port.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,800 行 · 第 1/3 页
JAVA
1,800 行
log.info(_protocol.getProtocolName() + " disabled *:" + _port); } } /** * returns the connection info for jmx */ TcpConnectionInfo []connectionInfo() { TcpConnection []connections; synchronized (this) { connections = new TcpConnection[_activeList.size()]; _activeList.toArray(connections); } long now = Alarm.getExactTime(); TcpConnectionInfo []infoList = new TcpConnectionInfo[connections.length]; for (int i = 0 ; i < connections.length; i++) { TcpConnection conn = connections[i]; long requestTime = -1; long startTime = conn.getRequestStartTime(); if (conn.isRequestActive() && startTime > 0) requestTime = now - startTime; TcpConnectionInfo info = new TcpConnectionInfo(conn.getId(), conn.getThreadId(), getAddress() + ":" + getPort(), conn.getState().toString(), requestTime); infoList[i] = info; } return infoList; } /** * returns the select manager. */ public AbstractSelectManager getSelectManager() { return _selectManager; } /** * Accepts a new connection. * * @param isStart boolean to mark the first request on the thread for * bookkeeping. */ public boolean accept(TcpConnection conn, boolean isStart) { try { synchronized (this) { _idleThreadCount++; if (isStart) { _startThreadCount--; if (_startThreadCount < 0) { _startThreadCount = 0; log.warning(conn + " _startThreadCount assertion failure"); // conn.getStartThread().printStackTrace(); } } if (_acceptThreadMax < _idleThreadCount) { return false; } } while (_lifecycle.isActive()) { QSocket socket = conn.startSocket(); Thread.interrupted(); if (_serverSocket.accept(socket)) { conn.initSocket(); if (_throttle.accept(socket)) return true; else socket.close(); } else { if (_acceptThreadMax < _idleThreadCount) { return false; } } } } catch (Throwable e) { if (_lifecycle.isActive() && log.isLoggable(Level.FINER)) log.log(Level.FINER, e.toString(), e); } finally { synchronized (this) { _idleThreadCount--; if (_idleThreadCount + _startThreadCount < _acceptThreadMin) { notify(); } } } return false; } /** * Notification when a socket closes. */ void closeSocket(QSocket socket) { if (_throttle != null) _throttle.close(socket); } /** * Registers the new connection as started */ void startConnection(TcpConnection conn) { synchronized (this) { _startThreadCount--; } } /** * Marks a new thread as running. */ void threadBegin(TcpConnection conn) { synchronized (_threadCountLock) { _threadCount++; } } /** * Marks a new thread as stopped. */ void threadEnd(TcpConnection conn) { synchronized (_threadCountLock) { _threadCount--; } } /** * Returns true if the keepalive is allowed */ public boolean allowKeepalive(long acceptStartTime) { synchronized (_keepaliveCountLock) { if (! _lifecycle.isActive()) return false; else if (acceptStartTime + _keepaliveTimeMax < Alarm.getCurrentTime()) return false; else if (_keepaliveMax <= _keepaliveCount) return false; else if (_connectionMax <= _connectionCount + _minSpareConnection) return false; else return true; } } /** * Marks a keepalive as starting running. Called only from TcpConnection. */ boolean keepaliveBegin(TcpConnection conn, long acceptStartTime) { synchronized (_keepaliveCountLock) { if (! _lifecycle.isActive()) return false; else if (_connectionMax <= _connectionCount + _minSpareConnection) { log.warning(conn + " failed keepalive, connection-max=" + _connectionCount); return false; } else if (false && acceptStartTime + _keepaliveTimeMax < Alarm.getCurrentTime()) { // #2262 - skip this check to avoid confusing the load balancer // the keepalive check is in allowKeepalive log.warning(conn + " failed keepalive, delay=" + (Alarm.getCurrentTime() - acceptStartTime)); return false; } else if (false && _keepaliveMax <= _keepaliveCount) { // #2262 - skip this check to avoid confusing the load balancer // the keepalive check is in allowKeepalive log.warning(conn + " failed keepalive, keepalive-max " + _keepaliveCount); return false; } _keepaliveCount++; return true; } } /** * Marks the keepalive as ending. Called only from TcpConnection. */ void keepaliveEnd(TcpConnection conn) { synchronized (_keepaliveCountLock) { _keepaliveCount--; if (_keepaliveCount < 0) { int count = _keepaliveCount; _keepaliveCount = 0; log.warning(conn + " internal error: negative keepalive count " + count); Thread.dumpStack(); } } } /** * Starts a keepalive thread. */ void keepaliveThreadBegin() { synchronized (_keepaliveCountLock) { _keepaliveThreadCount++; } } /** * Ends a keepalive thread. */ void keepaliveThreadEnd() { synchronized (_keepaliveCountLock) { _keepaliveThreadCount--; } } /** * Suspends the controller (for comet-style ajax) */ boolean suspend(TcpConnection conn) { boolean isResume = false; synchronized (_suspendList) { if (conn.isWake()) { isResume = true; conn.setResume(); } else if (conn.isComet()) { conn.toSuspend(); _suspendList.add(conn); return true; } else return false; } if (isResume) { ThreadPool.getThreadPool().schedule(conn.getResumeTask()); return true; } else return false; } /** * Remove from suspend list. */ boolean detach(TcpConnection conn) { synchronized (_suspendList) { return _suspendList.remove(conn); } } /** * Resumes the controller (for comet-style ajax) */ boolean resume(TcpConnection conn) { synchronized (_suspendList) { if (! _suspendList.remove(conn)) { return false; } conn.setResume(); } if (conn != null) ThreadPool.getThreadPool().schedule(conn.getResumeTask()); return true; } /** * Returns true if the port is closed. */ public boolean isClosed() { return _lifecycle.isAfterActive(); } /** * The port thread is responsible for creating new connections. */ public void run() { while (! _lifecycle.isDestroyed()) { boolean isStart; try { // need delay to avoid spawing too many threads over a short time, // when the load doesn't justify it Thread.yield(); // XXX: Thread.sleep(10); synchronized (this) { isStart = _startThreadCount + _idleThreadCount < _acceptThreadMin; if (_connectionMax <= _connectionCount) isStart = false; if (! isStart) { Thread.interrupted(); wait(60000); } if (isStart) { _connectionCount++; _startThreadCount++; } } if (isStart && _lifecycle.isActive()) { TcpConnection conn = _freeConn.allocate(); if (conn == null) { conn = new TcpConnection(this, _serverSocket.createSocket()); conn.setRequest(_protocol.createRequest(conn)); } else { // XXX: remove when 3.2 stable if (! conn._isFree) { log.warning(conn + " unfree allocate"); Thread.dumpStack(); } conn._isFree = false; } synchronized (this) { _activeList.add(conn); } ThreadPool.getThreadPool().schedule(conn.getReadTask()); } } catch (Throwable e) { e.printStackTrace(); } } } /** * Handles the environment config phase */ public void environmentConfigure(EnvironmentClassLoader loader) { } /** * Handles the environment bind phase */ public void environmentBind(EnvironmentClassLoader loader) { } /** * Handles the case where the environment is starting (after init). */ public void environmentStart(EnvironmentClassLoader loader) { } /** * Handles the case where the environment is stopping */ public void environmentStop(EnvironmentClassLoader loader) { close(); } /** * Frees the connection. * * Called only from TcpConnection */ void free(TcpConnection conn) { closeConnection(conn); // XXX: remove when 3.2 stable if (conn._isFree) { log.warning(conn + " double free"); Thread.dumpStack(); return; } conn._isFree = true; _freeConn.free(conn); } /** * Frees the connection. * * Called only from TcpConnection */ void kill(TcpConnection conn) { closeConnection(conn); } /** * Closes the stats for the connection. */ private void closeConnection(TcpConnection conn) { synchronized (this) { _activeList.remove(conn); if (_connectionCount-- == _connectionMax) { try { notify(); } catch (Throwable e) { } } } } /** * Shuts the Port down. The server gives connections 30 * seconds to complete. */ public void close() { Environment.removeEnvironmentListener(this); if (! _lifecycle.toDestroy()) return; if (log.isLoggable(Level.FINE)) log.fine(this + " closing"); Alarm suspendAlarm = _suspendAlarm; _suspendAlarm = null; if (suspendAlarm != null) suspendAlarm.dequeue(); QServerSocket serverSocket = _serverSocket; _serverSocket = null; _selectManager = null; AbstractSelectManager selectManager = null; if (_server != null) { selectManager = _server.getSelectManager(); _server.initSelectManager(null); } InetAddress localAddress = null; int localPort = 0; if (serverSocket != null) { localAddress = serverSocket.getLocalAddress(); localPort = serverSocket.getLocalPort(); } // close the server socket if (serverSocket != null) { try { serverSocket.close(); } catch (Throwable e) { } try { synchronized (serverSocket) { serverSocket.notifyAll(); } } catch (Throwable e) { } } if (selectManager != null) { try { selectManager.close(); } catch (Throwable e) { } } /* ArrayList<TcpConnection> connections = new ArrayList<TcpConnection>(); synchronized (this) { connections.addAll(_activeConnections); } */ // Close the socket server socket and send some request to make // sure the Port accept thread is woken and dies. // The ping is before the server socket closes to avoid // confusing the threads // ping the accept port to wake the listening threads if (localPort > 0) { int idleCount = _idleThreadCount + _startThreadCount; for (int i = 0; i < idleCount + 10; i++) { try { Socket socket = new Socket(); InetSocketAddress addr; if (localAddress == null || localAddress.getHostAddress().startsWith("0.")) addr = new InetSocketAddress("127.0.0.1", localPort); else addr = new InetSocketAddress(localAddress, localPort); socket.connect(addr, 100); socket.close(); } catch (ConnectException e) { } catch (Throwable e) { log.log(Level.FINEST, e.toString(), e); } } } TcpConnection conn; while ((conn = _freeConn.allocate()) != null) { conn.destroy(); } log.finest(this + " closed"); } public String toURL() { return _url; } @Override public String toString() { return getClass().getSimpleName() + "[" + _url + "]"; } public class SuspendReaper implements AlarmListener { public void handleAlarm(Alarm alarm) { try { ArrayList<TcpConnection> oldList = null; long now = Alarm.getCurrentTime(); synchronized (_suspendList) { for (int i = _suspendList.size() - 1; i >=0; i--) { TcpConnection conn = _suspendList.get(i); if (conn.getSuspendTime() + _suspendTimeMax < now) { _suspendList.remove(i); if (oldList == null) oldList = new ArrayList<TcpConnection>(); oldList.add(conn); } } } if (oldList != null) { for (int i = 0; i < oldList.size(); i++) { TcpConnection conn = oldList.get(i); if (log.isLoggable(Level.FINE)) log.fine(this + " suspend idle timeout " + conn); conn.destroy(); } } } finally { if (! isClosed()) alarm.queue(60000); } } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?