tcpconnection.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,311 行 · 第 1/2 页
JAVA
1,311 行
_state = _state.toKeepaliveSelect(); if (_port.getSelectManager() != null) { if (_port.getSelectManager().keepalive(this)) { if (log.isLoggable(Level.FINE)) log.fine(dbgId() + " keepalive (select)"); return RequestState.THREAD_DETACHED; } else { log.warning(dbgId() + " failed keepalive (select)"); _state = _state.toActive(); port.keepaliveEnd(this); close(); return RequestState.EXIT; } } else { if (log.isLoggable(Level.FINE)) log.fine(dbgId() + " keepalive (thread)"); if (getReadStream().waitForRead()) { port.keepaliveEnd(this); _state = _state.toActive(); return RequestState.REQUEST; } else { _state = _state.toActive(); port.keepaliveEnd(this); close(); return RequestState.EXIT; } } } /** * Try to read nonblock */ private boolean waitForKeepalive() throws IOException { if (_state.isClosed()) return false; Port port = getPort(); if (port.isClosed()) return false; ReadStream is = getReadStream(); if (getReadStream().getBufferAvailable() > 0) return true; long timeout = port.getKeepaliveTimeout(); boolean isSelectManager = port.getServer().isSelectManagerEnabled(); if (isSelectManager) { timeout = port.getKeepaliveSelectThreadTimeout(); } if (timeout > 0 && timeout < port.getSocketTimeout()) { port.keepaliveThreadBegin(); try { return is.fillWithTimeout(timeout); } finally { port.keepaliveThreadEnd(); } } else if (isSelectManager) return false; else return true; } /** * Sets controller */ @Override public void setController(ConnectionController controller) { super.setController(controller); if (controller.isDuplex()) { if (log.isLoggable(Level.FINER)) log.finer(this + " starting duplex"); TcpDuplexController duplex = (TcpDuplexController) controller; _state = ConnectionState.DUPLEX; _readTask = new DuplexReadTask(duplex); } else { if (log.isLoggable(Level.FINER)) log.finer(this + " starting comet"); _state = ConnectionState.COMET; } } /** * Wakes the connection (comet-style). */ @Override protected boolean wake() { ConnectionController controller = getController(); if (controller == null) return false; _isWake = true; // comet if (getPort().resume(this)) { log.fine(dbgId() + " wake"); return true; } log.fine(dbgId() + " wake failed"); return false; } /** * Closes on shutdown. */ public void closeOnShutdown() { QSocket socket = _socket; if (socket != null) { try { socket.close(); } catch (Throwable e) { log.log(Level.FINE, e.toString(), e); } Thread.yield(); } } public void close() { closeImpl(); } /** * Closes the connection. */ private void closeImpl() { QSocket socket = _socket; ConnectionState state; synchronized (this) { state = _state; if (state == ConnectionState.IDLE || state.isClosed()) return; _state = _state.toClosed(); } // detach any comet getPort().detach(this); getRequest().protocolCloseEvent(); ConnectionController controller = getController(); if (controller != null) controller.close(); _isKeepalive = false; Port port = getPort(); if (state.isKeepalive()) { port.keepaliveEnd(this); } if (log.isLoggable(Level.FINER)) { if (port != null) log.finer(dbgId() + "closing connection " + this + ", total=" + port.getConnectionCount()); else log.finer(dbgId() + "closing connection " + this); } _isWake = false; try { getWriteStream().close(); } catch (Throwable e) { log.log(Level.FINE, e.toString(), e); } try { getReadStream().close(); } catch (Throwable e) { log.log(Level.FINE, e.toString(), e); } if (socket != null) { getPort().closeSocket(socket); try { socket.close(); } catch (Throwable e) { log.log(Level.FINE, e.toString(), e); } } } /** * Closes the controller. */ @Override protected void closeControllerImpl() { getPort().resume(this); } /** * Destroys the connection() */ public final void destroy() { closeImpl(); ConnectionState state = _state; _state = ConnectionState.DESTROYED; if (state != ConnectionState.DESTROYED) { getPort().kill(this); } } /** * Completion processing at the end of the thread */ void finish() { closeImpl(); if (_state != ConnectionState.IDLE && _state != ConnectionState.DESTROYED) { _state = _state.toIdle(); _readTask = _acceptTask; getPort().free(this); } } protected String dbgId() { if (_dbgId == null) { Object serverId = Environment.getAttribute("caucho.server-id"); if (serverId != null) _dbgId = (getClass().getSimpleName() + "[id=" + getId() + "," + serverId + "] "); else _dbgId = (getClass().getSimpleName() + "[id=" + getId() + "]"); } return _dbgId; } @Override public String toString() { return "TcpConnection[id=" + _id + "," + _port.toURL() + "," + _state + "]"; } enum RequestState { REQUEST, THREAD_DETACHED, EXIT }; enum ConnectionState { IDLE, ACCEPT, REQUEST, REQUEST_ACTIVE, REQUEST_KEEPALIVE, COMET, DUPLEX, DUPLEX_KEEPALIVE, COMPLETE, CLOSED, DESTROYED; boolean isComet() { return this == COMET; } /** * True if the state is one of the keepalive states, either * a true keepalive-select, or comet or duplex. */ boolean isKeepalive() { // || this == COMET return (this == REQUEST_KEEPALIVE || this == DUPLEX_KEEPALIVE); } boolean isActive() { switch (this) { case IDLE: case ACCEPT: case REQUEST: case REQUEST_ACTIVE: return true; default: return false; } } boolean isRequestActive() { switch (this) { case REQUEST: case REQUEST_ACTIVE: return true; default: return false; } } boolean isClosed() { return this == CLOSED || this == DESTROYED; } ConnectionState toKeepaliveSelect() { switch (this) { case REQUEST: case REQUEST_ACTIVE: case REQUEST_KEEPALIVE: return REQUEST_KEEPALIVE; case DUPLEX: case DUPLEX_KEEPALIVE: return DUPLEX_KEEPALIVE; default: throw new IllegalStateException(this + " is an illegal keepalive state"); } } ConnectionState toActive() { switch (this) { case ACCEPT: case REQUEST: case REQUEST_ACTIVE: case REQUEST_KEEPALIVE: return REQUEST_ACTIVE; case DUPLEX_KEEPALIVE: case DUPLEX: return DUPLEX; case COMET: return COMET; default: throw new IllegalStateException(this + " is an illegal active state"); } } ConnectionState toAccept() { if (this != DESTROYED) return ACCEPT; else throw new IllegalStateException(this + " is an illegal accept state"); } ConnectionState toIdle() { if (this != DESTROYED) return IDLE; else throw new IllegalStateException(this + " is an illegal idle state"); } ConnectionState toClosed() { if (this != DESTROYED) return CLOSED; else throw new IllegalStateException(this + " is an illegal closed state"); } } class AcceptTask implements Runnable { public void run() { doAccept(true); } void doAccept(boolean isStart) { Port port = _port; // next state is keepalive _readTask = _keepaliveTask; ServerRequest request = getRequest(); boolean isWaitForRead = request.isWaitForRead(); Thread thread = Thread.currentThread(); String oldThreadName = thread.getName(); thread.setName(_id); port.threadBegin(TcpConnection.this); ClassLoader systemLoader = ClassLoader.getSystemClassLoader(); thread.setContextClassLoader(systemLoader); boolean isValid = false; RequestState result = RequestState.EXIT; try { _thread = thread; _state = _state.toAccept(); // while (! _state.isClosed()) { while (! _port.isClosed() && _state != ConnectionState.DESTROYED) { _state = _state.toAccept(); if (! _port.accept(TcpConnection.this, isStart)) { close(); break; } if (_readTask != _keepaliveTask) Thread.dumpStack(); isStart = false; _connectionStartTime = Alarm.getCurrentTime(); if (isWaitForRead && ! getReadStream().waitForRead()) { close(); continue; } _request.startConnection(); result = handleConnection(); if (result == RequestState.THREAD_DETACHED) { break; } close(); } isValid = true; } catch (Throwable e) { log.log(Level.WARNING, e.toString(), e); } finally { thread.setContextClassLoader(systemLoader); port.threadEnd(TcpConnection.this); _thread = null; thread.setName(oldThreadName); if (! isValid) destroy(); if (result != RequestState.THREAD_DETACHED) finish(); } } @Override public String toString() { return getClass().getSimpleName() + "[" + TcpConnection.this + "]"; } } class KeepaliveTask implements Runnable { public void run() { Thread thread = Thread.currentThread(); String oldThreadName = thread.getName(); thread.setName(_id); if (_state.isKeepalive()) { _port.keepaliveEnd(TcpConnection.this); } _port.threadBegin(TcpConnection.this); RequestState result = RequestState.EXIT; boolean isValid = false; try { _state = _state.toActive(); result = handleConnection(); isValid = true; } finally { thread.setName(oldThreadName); _port.threadEnd(TcpConnection.this); if (! isValid) destroy(); if (result != RequestState.THREAD_DETACHED) close(); } if (isValid && result != RequestState.THREAD_DETACHED) { // acceptTask significantly faster than finishing _acceptTask.doAccept(false); } } @Override public String toString() { return getClass().getSimpleName() + "[" + TcpConnection.this + "]"; } } class DuplexReadTask implements Runnable { private TcpDuplexController _duplex; DuplexReadTask(TcpDuplexController duplex) { _duplex = duplex; } public void run() { Thread thread = Thread.currentThread(); String oldThreadName = thread.getName(); thread.setName(_id); if (_state.isKeepalive()) { _port.keepaliveEnd(TcpConnection.this); } _port.threadBegin(TcpConnection.this); boolean isValid = false; RequestState result = RequestState.EXIT; try { _state = _state.toActive(); do { result = RequestState.EXIT; if (! _duplex.serviceRead()) break; } while ((result = keepaliveRead()) == RequestState.REQUEST); isValid = true; } catch (IOException e) { log.log(Level.FINE, e.toString(), e); } finally { thread.setName(oldThreadName); _port.threadEnd(TcpConnection.this); if (! isValid) destroy(); if (result != RequestState.THREAD_DETACHED) finish(); } } } class ResumeTask implements Runnable { public void run() { boolean isValid = false; try { if (getRequest().handleResume() && _port.suspend(TcpConnection.this)) { isValid = true; } } catch (IOException e) { log.log(Level.FINE, e.toString(), e); } finally { if (! isValid) { finish(); } } } } class Admin extends AbstractManagedObject implements TcpConnectionMXBean { Admin() { super(ClassLoader.getSystemClassLoader()); } public String getName() { return _name; } public long getThreadId() { return TcpConnection.this.getThreadId(); } public long getRequestActiveTime() { return TcpConnection.this.getRequestActiveTime(); } public String getState() { return TcpConnection.this.getState(); } void register() { registerSelf(); } void unregister() { unregisterSelf(); } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?