tcpconnection.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,311 行 · 第 1/2 页

JAVA
1,311
字号
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT.  See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * *   Free Software Foundation, Inc. *   59 Temple Place, Suite 330 *   Boston, MA 02111-1307  USA * * @author Scott Ferguson */package com.caucho.server.port;import com.caucho.server.connection.*;import com.caucho.loader.Environment;import com.caucho.management.server.AbstractManagedObject;import com.caucho.management.server.TcpConnectionMXBean;import com.caucho.server.connection.*;import com.caucho.server.resin.Resin;import com.caucho.util.Alarm;import com.caucho.vfs.ClientDisconnectException;import com.caucho.vfs.QSocket;import com.caucho.vfs.ReadStream;import com.caucho.vfs.WriteStream;import java.io.IOException;import java.net.InetAddress;import java.util.logging.Level;import java.util.logging.Logger;/** * A protocol-independent TcpConnection.  TcpConnection controls the * TCP Socket and provides buffered streams. * * <p>Each TcpConnection has its own thread.
*/public class TcpConnection extends Connection{  private static final Logger log    = Logger.getLogger(TcpConnection.class.getName());    private static int _connectionCount;  private int _connectionId;  // The connection's id  private final String _id;  private String _dbgId;  private String _name;  private final Port _port;  private final QSocket _socket;    private final AcceptTask _acceptTask = new AcceptTask();  private final KeepaliveTask _keepaliveTask = new KeepaliveTask();  private final ResumeTask _resumeTask = new ResumeTask();    private ServerRequest _request;  private boolean _isSecure; // port security overrisde    private final Admin _admin = new Admin();  private final Object _requestLock = new Object();  private ConnectionState _state = ConnectionState.IDLE;    private boolean _isClosed;  private boolean _isKeepalive;  private boolean _isWake;    private Runnable _readTask = _acceptTask;    private long _idleTimeMax;    private long _accessTime;   // Time the current request started  private long _connectionStartTime;  private long _keepaliveStartTime;    private long _keepaliveExpireTime;  private long _requestStartTime;  private long _suspendTime;    private Thread _thread;  public boolean _isFree;  private Exception _startThread;  /**   * Creates a new TcpConnection.   *   * @param server The TCP server controlling the connections   * @param request The protocol Request   */  TcpConnection(Port port, QSocket socket)  {    synchronized (TcpConnection.class) {      _connectionId = _connectionCount++;    }        _port = port;    _socket = socket;        _isSecure = port.isSecure();    int id = getId();    String protocol = port.getProtocol().getProtocolName();    if (port.getAddress() == null) {      Resin resin = Resin.getLocal();      String serverId = resin != null ? 
resin.getServerId() : null;      if (serverId == null)	serverId = "";            _id = protocol + "-" + serverId + "-" + port.getPort() + "-" + id;      _name = protocol + "-" + port.getPort() + "-" + id;    }    else {      _id = (protocol + "-" + port.getAddress() + ":" +             port.getPort() + "-" + id);      _name = (protocol + "-" + port.getAddress() + "-" +	       port.getPort() + "-" + id);    }  }  public void setStartThread()  {    _startThread = new Exception();    _startThread.fillInStackTrace();  }  public Exception getStartThread()  {    return _startThread;  }  /**   * Sets the time of the request start.  ServerRequests can use   * setAccessTime() to put off connection reaping.  HttpRequest calls   * setAccessTime() at the beginning of each request.   *   * @param now the current time in milliseconds as by Alarm.getCurrentTime().   */  public void setAccessTime(long now)  {    _accessTime = now;  }  /**   * Returns the time the last Request began in milliseconds.   */  public long getAccessTime()  {    return _accessTime;  }  /**   * Returns true for active.   */  public boolean isActive()  {    return _state.isActive();  }  /**   * Returns true for active.   */  public boolean isRequestActive()  {    return _state.isRequestActive();  }  /**   * Returns the connection id.  Primarily for debugging.   */  public int getId()  {    return _connectionId;  }  /**   * Returns true for closed.   */  public boolean isClosed()  {    return _isClosed;  }    /**   * Sets the idle time for a keepalive connection.   */  public void setIdleTimeMax(long idleTime)  {    _idleTimeMax = idleTime;  }    /**   * The idle time for a keepalive connection   */  public long getIdleTimeMax()  {    return _idleTimeMax;  }  /**   * Sets the keepalive state.  Called only by the SelectManager and Port.   
*/
  public void setKeepalive() {
    if (_isKeepalive) {
      log.warning(this + " illegal state: setting keepalive with active keepalive: ");
    }

    _isKeepalive = true;
  }

  /**
   * Clears the keepalive state, logging a warning when the connection
   * was not marked as keepalive.  Called only by the SelectManager and Port.
   */
  public void clearKeepalive() {
    if (! _isKeepalive) {
      log.warning(this + " illegal state: clearing keepalive with inactive keepalive");
    }

    _isKeepalive = false;
  }

  /**
   * Returns the keepalive expire time.
   */
  public long getKeepaliveExpireTime() {
    return _keepaliveExpireTime;
  }

  /**
   * Returns the keepalive start time.
   */
  public long getKeepaliveStartTime() {
    return _keepaliveStartTime;
  }

  /**
   * Returns the object name for jmx.
   */
  public String getName() {
    return _name;
  }

  /**
   * Returns the port which generated the connection.
   */
  public Port getPort() {
    return _port;
  }

  /**
   * Returns the current read task.  As a debugging aid, a stack trace is
   * dumped through Thread.dumpStack() when the connection state is
   * already closed.
   */
  public Runnable getReadTask() {
    if (_state.isClosed()) {
      Thread.dumpStack();
    }

    // XXX: debugging -- setStartThread() can be called here to record
    // where the read task was requested.

    return _readTask;
  }

  /**
   * Returns the resume task.
   */
  public Runnable getResumeTask() {
    return _resumeTask;
  }

  /**
   * Returns the request for the connection.
   */
  public final ServerRequest getRequest() {
    return _request;
  }

  /**
   * Sets the connection's request.
*/  public final void setRequest(ServerRequest request)  {    _request = request;  }  @Override  public boolean isSecure()  {    if (_isClosed)      return false;    else      return _socket.isSecure() || _isSecure;  }    /**   * Returns the connection's socket   */  public QSocket getSocket()  {    return _socket;  }  /**   * Returns the time the comet suspend started   */  public long getSuspendTime()  {    return _suspendTime;  }   /**   * Initialize the socket for a new connection   */  public void initSocket()    throws IOException  {    _idleTimeMax = _port.getKeepaliveTimeout();    getWriteStream().init(_socket.getStream());    getReadStream().init(_socket.getStream(), getWriteStream());    if (log.isLoggable(Level.FINE)) {      log.fine(dbgId() + "starting connection " + this + ", total=" + _port.getConnectionCount());    }  }  /**   * Returns the connection's socket   */  public QSocket startSocket()  {    _isClosed = false;        return _socket;  }  /**   * Returns the local address of the socket.   */  public InetAddress getLocalAddress()  {    // The extra cases handle Kaffe problems.    try {      return _socket.getLocalAddress();    } catch (Exception e) {      try {	return InetAddress.getLocalHost();      } catch (Exception e1) {	try {	  return InetAddress.getByName("127.0.0.1");	} catch (Exception e2) {	  return null;	}      }    }  }  /**   * Returns the socket's local TCP port.   */  public int getLocalPort()  {    return _socket.getLocalPort();  }  /**   * Returns the socket's remote address.   */  public InetAddress getRemoteAddress()  {    return _socket.getRemoteAddress();  }  /**   * Returns the socket's remote host name.   */  @Override  public String getRemoteHost()  {    return _socket.getRemoteHost();  }  /**   * Adds from the socket's remote address.   
*/  @Override  public int getRemoteAddress(byte []buffer, int offset, int length)  {    return _socket.getRemoteAddress(buffer, offset, length);  }  /**   * Returns the socket's remote port   */  public int getRemotePort()  {    return _socket.getRemotePort();  }  /**   * Returns the state string.   */  public final String getState()  {    return _state.toString();  }  /**   * Returns the virtual host.   */  @Override  public String getVirtualHost()  {    return getPort().getVirtualHost();  }  /**   * Returns the thread id.   */  public final long getThreadId()  {    Thread thread = _thread;    if (thread != null)      return thread.getId();    else      return -1;  }  /**   * Returns the time the current request has taken.   */  public final long getRequestActiveTime()  {    if (_requestStartTime > 0)      return Alarm.getCurrentTime() - _requestStartTime;    else      return -1;  }  void setResume()  {    _isWake = false;    _suspendTime = 0;  }  void setWake()  {    _isWake = true;  }  boolean isWake()  {    return _isWake;  }  boolean isComet()  {    return _state.isComet();  }  /**   * Begins an active connection.   */  public final long beginActive()  {    _state = _state.toActive();    _requestStartTime = Alarm.getCurrentTime();    return _requestStartTime;  }  public final long getRequestStartTime()  {    return _requestStartTime;  }  /**   * Ends an active connection.   */  public final void endActive()  {    // state change?    // _requestStartTime = 0;  }  void toSuspend()  {    _suspendTime = Alarm.getCurrentTime();    _keepaliveExpireTime = _suspendTime + _idleTimeMax;  }  public boolean toKeepalive()  {    if (! _isKeepalive) {      _isKeepalive = _port.allowKeepalive(_connectionStartTime);    }        return _isKeepalive;  }  /**   * Starts the connection.   
*/  public void start()  {  }    RequestState handleConnection()  {    Thread thread = Thread.currentThread();    ClassLoader systemLoader = ClassLoader.getSystemClassLoader();     boolean isValid = false;    RequestState result = RequestState.EXIT;   try {     // clear the interrupted flag     Thread.interrupted();     boolean isStatKeepalive = _state.isKeepalive();     boolean isKeepalive;     do {       thread.setContextClassLoader(systemLoader);       _state = _state.toActive();       if (_port.isClosed()) {         return RequestState.EXIT;       }       result = RequestState.EXIT;       synchronized (_requestLock) {         isKeepalive = getRequest().handleRequest();       }       // statistics       if (_requestStartTime > 0) {	 long startTime = _requestStartTime;	 _requestStartTime = 0;	 	 if (isStatKeepalive)	   _port.addLifetimeKeepaliveCount();	 	 _port.addLifetimeRequestCount();	 	 long now = Alarm.getCurrentTime();	 _port.addLifetimeRequestTime(now - startTime);	 ReadStream rs = getReadStream();	 long readCount = rs.getPosition();	 rs.clearPosition();	 _port.addLifetimeReadBytes(readCount);	 WriteStream ws = getWriteStream();	 long writeCount = ws.getPosition();	 ws.clearPosition();	 _port.addLifetimeWriteBytes(writeCount);       }       // duplex (xmpp/hmtp) handling       if (_state == ConnectionState.DUPLEX) {	 isValid = true;	 _readTask.run();	 return RequestState.THREAD_DETACHED;       }       else if (_state == ConnectionState.COMET) {	 if (_port.suspend(this)) {	   isValid = true;	   return RequestState.THREAD_DETACHED;	 }	 else	   return RequestState.EXIT;       }     } while (isKeepalive	      && (result = keepaliveRead()) == RequestState.REQUEST);     isValid = true;     return result;   } catch (ClientDisconnectException e) {     _port.addLifetimeClientDisconnectCount();           if (log.isLoggable(Level.FINER)) {        log.finer(dbgId() + e);      }    } catch (IOException e) {      if (log.isLoggable(Level.FINE)) {        
log.log(Level.FINE, dbgId() + e, e);      }    } finally {      thread.setContextClassLoader(systemLoader);      if (! isValid)        destroy();    }        return RequestState.EXIT;  }   /**   * Starts a keepalive, either returning available data or   * returning false to close the loop   *   * If keepaliveRead() returns true, data is available.   * If it returns false, either the connection is closed,   * or the connection has been registered with the select.   */  private RequestState keepaliveRead()    throws IOException  {    if (waitForKeepalive())      return RequestState.REQUEST;        Port port = _port;        _suspendTime = Alarm.getCurrentTime();    _keepaliveExpireTime = _suspendTime + _idleTimeMax;        if (! _port.keepaliveBegin(this, _connectionStartTime)) {      close();            return RequestState.EXIT;    }    

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?