hmuxrequest.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,842 行 · 第 1/3 页

JAVA
1,842
字号
    int code;    int len;    while (true) {      code = is.read();      switch (code) {      case -1:        if (isLoggable)          log.fine(dbgId() + "end of file");        return false;      case HMUX_CHANNEL:	int channel = (is.read() << 8) + is.read();	        if (isLoggable)          log.fine(dbgId() + "channel " + channel);	break;      case HMUX_QUIT:        if (isLoggable)          log.fine(dbgId() + (char) code + ": end of request");                return hasURI;      case HMUX_EXIT:        if (isLoggable)          log.fine(dbgId() + (char) code + ": end of socket");        killKeepalive();                return hasURI;      case HMUX_PROTOCOL:        len = (is.read() << 8) + is.read();        if (len != 4) {          log.fine(dbgId() + (char) code + ": protocol length (" + len + ") must be 4.");          killKeepalive();          return false;        }                int value = ((is.read() << 24)		     + (is.read() << 16)		     + (is.read() << 8)		     + (is.read()));	int result = HMUX_EXIT;	boolean isKeepalive = false;        if (value == HMUX_CLUSTER_PROTOCOL) {          if (isLoggable)            log.fine(dbgId() + (char) code + ": cluster protocol");          _filter.setClientClosed(true);	  	  if (_server == null || _server.isDestroyed()) {	    return false;	  }	            result = _clusterRequest.handleRequest(is, _rawWrite);        }        else if (value == HMUX_DISPATCH_PROTOCOL) {          if (isLoggable)            log.fine(dbgId() + (char) code + ": dispatch protocol");          _filter.setClientClosed(true);	  	  if (_server == null || _server.isDestroyed()) {	    return false;	  }	            isKeepalive = _dispatchRequest.handleRequest(is, _rawWrite);	  if (isKeepalive)	    result = HMUX_QUIT;	  else	    result = HMUX_EXIT;        }	else {	  if (_server == null || _server.isDestroyed()) {	    return false;	  }	  	  HmuxExtension ext = _hmuxProtocol.getExtension(value);	  if (ext != null) {	    if (isLoggable)	      log.fine(dbgId() + (char) code + ": extension " + ext);	    _filter.setClientClosed(true);	    result = ext.handleRequest(this, is, _rawWrite);	  }	  else {	    log.fine(dbgId() + (char) code + ": unknown protocol (" + value + ")");	    result = HMUX_EXIT;	  }	}	if (result == HMUX_YIELD)	  break;	else {	  if (result == HMUX_QUIT && ! allowKeepalive())	    result = HMUX_EXIT;		  if (result == HMUX_QUIT) {	    _rawWrite.write(HMUX_QUIT);	    _rawWrite.flush();	  }	  else {	    _rawWrite.write(HMUX_EXIT);	    _rawWrite.close();	  }	  return result == HMUX_QUIT;	}      case HMUX_URI:        hasURI = true;        len = (is.read() << 8) + is.read();        _uri.setLength(len);	_rawRead.readAll(_uri.getBuffer(), 0, len);	if (isLoggable)	  log.fine(dbgId() + (char) code + ":uri " + _uri);	break;	      case HMUX_METHOD:        len = (is.read() << 8) + is.read();	is.readAll(_method, len);	if (isLoggable)	  log.fine(dbgId() +                    (char) code + ":method " + _method);	break;      case CSE_REAL_PATH:        len = (is.read() << 8) + is.read();	_cb1.clear();	_rawRead.readAll(_cb1, len);	code = _rawRead.read();	if (code != HMUX_STRING)	  throw new IOException("protocol expected HMUX_STRING");	_cb2.clear();	_rawRead.readAll(_cb2, readLength());	//http.setRealPath(cb1.toString(), cb2.toString());	if (isLoggable)	  log.fine(dbgId() + (char) code + " " +                   _cb1.toString() + "->" + _cb2.toString());	//throw new RuntimeException();	break;      case CSE_REMOTE_HOST:        len = (is.read() << 8) + is.read();	_rawRead.readAll(_remoteHost, len);	if (isLoggable)	  log.fine(dbgId() + (char) code + " " + _remoteHost);	break;      case CSE_REMOTE_ADDR:        len = (is.read() << 8) + is.read();	_rawRead.readAll(_remoteAddr, len);	if (isLoggable)	  log.fine(dbgId() + (char) code + " " + _remoteAddr);	break;      case HMUX_SERVER_NAME:        len = (is.read() << 8) + is.read();	_rawRead.readAll(_serverName, len);	if (isLoggable)	  log.fine(dbgId() + (char) code + " server-host: " + _serverName);	break;      case CSE_REMOTE_PORT:        len = (is.read() << 8) + is.read();	_rawRead.readAll(_remotePort, len);	if (isLoggable)	  log.fine(dbgId() + (char) code +                  " remote-port: " + _remotePort);	break;      case CSE_SERVER_PORT:        len = (is.read() << 8) + is.read();	_rawRead.readAll(_serverPort, len);	if (isLoggable)	  log.fine(dbgId() + (char) code +                  " server-port: " + _serverPort);	break;	      case CSE_QUERY_STRING:        len = (is.read() << 8) + is.read();        if (len > 0) {          _uri.add('?');          _uri.ensureCapacity(_uri.getLength() + len);          _rawRead.readAll(_uri.getBuffer(), _uri.getLength(), len);          _uri.setLength(_uri.getLength() + len);        }	break;      case CSE_PROTOCOL:        len = (is.read() << 8) + is.read();	_rawRead.readAll(_protocol, len);	if (isLoggable)	  log.fine(dbgId() + (char) code + " protocol: " + _protocol);	for (int i = 0; i < len; i++) {	  char ch = _protocol.charAt(i);	  if (ch >= '0' && ch <= '9')	    _version = 16 * _version + ch - '0';	  else if (ch == '.')	    _version = 16 * _version;	}	break;      case HMUX_HEADER:        len = (is.read() << 8) + is.read();		int headerSize = _headerSize;		CharBuffer key = _headerKeys[headerSize];	key.clear();		CharBuffer valueCb = _headerValues[headerSize];	valueCb.clear();		_rawRead.readAll(key, len);	code = _rawRead.read();	if (code != HMUX_STRING)	  throw new IOException("protocol expected HMUX_STRING at " + (char) code);	_rawRead.readAll(valueCb, readLength());	if (isLoggable)	  log.fine(dbgId() + "H " + key + "=" + valueCb);	if (addHeaderInt(key.getBuffer(), 0, key.length(), valueCb)) {          _headerSize++;        }	break;      case CSE_CONTENT_LENGTH:        len = (is.read() << 8) + is.read();	if (_headerKeys.length <= _headerSize)	  resizeHeaders();	_headerKeys[_headerSize].clear();	_headerKeys[_headerSize].append("Content-Length");	_headerValues[_headerSize].clear();	_rawRead.readAll(_headerValues[_headerSize], len);	if (isLoggable)	  log.fine(dbgId() + (char) code + " content-length=" +                   _headerValues[_headerSize]);	_headerSize++;	break;      case CSE_CONTENT_TYPE:        len = (is.read() << 8) + is.read();	if (_headerKeys.length <= _headerSize)	  resizeHeaders();	_headerKeys[_headerSize].clear();	_headerKeys[_headerSize].append("Content-Type");	_headerValues[_headerSize].clear();	_rawRead.readAll(_headerValues[_headerSize], len);	if (isLoggable)	  log.fine(dbgId() + (char) code + " content-type=" +                   _headerValues[_headerSize]);	_headerSize++;	break;      case CSE_IS_SECURE:        len = (is.read() << 8) + is.read();	_isSecure = true;	if (isLoggable)	  log.fine(dbgId() + "secure");	_rawRead.skip(len);	break;      case CSE_CLIENT_CERT:        len = (is.read() << 8) + is.read();	_clientCert.clear();        _clientCert.setLength(len);	_rawRead.readAll(_clientCert.getBuffer(), 0, len);	if (isLoggable)	  log.fine(dbgId() + (char) code + " cert=" + _clientCert +                   " len:" + len);	break;      case CSE_SERVER_TYPE:        len = (is.read() << 8) + is.read();	_cb1.clear();	_rawRead.readAll(_cb1, len);	if (isLoggable)	  log.fine(dbgId() + (char) code + " server=" + _cb1);        if (_cb1.length() > 0)          _serverType = _cb1.charAt(0);	break;      case CSE_REMOTE_USER:        len = (is.read() << 8) + is.read();        _cb.clear();	_rawRead.readAll(_cb, len);	if (isLoggable)	  log.fine(dbgId() + (char) code + " " + _cb);        setAttribute(com.caucho.server.security.AbstractAuthenticator.LOGIN_NAME,                     new com.caucho.security.BasicPrincipal(_cb.toString()));	break;	      case CSE_DATA:        len = (is.read() << 8) + is.read();	_pendingData = len;	if (isLoggable)	  log.fine(dbgId() + (char) code + " post-data: " + len);	return hasURI;      case HMTP_MESSAGE:	{	  len = (is.read() << 8) + is.read();	  readHmtpMessage(is);	  hasURI = true;	  break;	}      case HMTP_QUERY_GET:	{	  len = (is.read() << 8) + is.read();	  long id = readLong(is);	  readHmtpQueryGet(is, id);	  hasURI = true;	  break;	}      case HMTP_QUERY_SET:	{	  len = (is.read() << 8) + is.read();	  long id = readLong(is);	  readHmtpQuerySet(is, id);	  hasURI = true;	  break;	}      case HMTP_QUERY_RESULT:	{	  len = (is.read() << 8) + is.read();	  long id = readLong(is);	  readHmtpQueryResult(is, id);	  hasURI = true;	  break;	}      default:        len = (is.read() << 8) + is.read();	if (isLoggable)	  log.fine(dbgId() + (char) code + " " + len);	is.skip(len);	break;      }    }    // _filter.setClientClosed(true);    // return false;  }  private void resizeHeaders()  {    CharBuffer []newKeys = new CharBuffer[_headerSize * 2];    CharBuffer []newValues = new CharBuffer[_headerSize * 2];    for (int i = 0; i < _headerSize; i++) {      newKeys[i] = _headerKeys[i];      newValues[i] = _headerValues[i];    }        for (int i = _headerSize; i < newKeys.length; i++) {      newKeys[i] = new CharBuffer();      newValues[i] = new CharBuffer();    }    _headerKeys = newKeys;    _headerValues = newValues;  }  private int readLength()    throws IOException  {    return ((_rawRead.read() << 8) + _rawRead.read());  }  private void readHmtpMessage(ReadStream is)    throws IOException  {    String to = readString(is);    String from = readString(is);    try {      Serializable query = (Serializable) readObject();      BamBroker broker = _server.getBroker();      BamStream brokerStream = broker.getBrokerStream();      if (log.isLoggable(Level.FINER))	log.fine(dbgId() + (char) HMTP_QUERY_GET + " hmtp message"		 + " to=" + to + " from=" + from + " " + query);      if (brokerStream != null) {	brokerStream.message(to, from, query);      }    } catch (Exception e) {      e.printStackTrace();      throw new RuntimeException(e);    }  }  private void readHmtpQueryGet(ReadStream is, long id)    throws IOException  {    String to = readString(is);    String from = readString(is);    Serializable query = (Serializable) readObject();    BamBroker broker = _server.getBroker();    BamStream brokerStream = broker.getBrokerStream();    if (log.isLoggable(Level.FINER))      log.fine(dbgId() + (char) HMTP_QUERY_GET + " queryGet id=" + id	       + " to=" + to + " from=" + from + " " + query);    if (from != null && ! "".equals(from)) {      brokerStream.queryGet(id, to, from, query);    }    else {      // from=null means the client needs to block and return the data      // on the same stream            Serializable result = null;      BamConnection conn = broker.getConnection("hmux", null);      try {	result = conn.queryGet(to, query);      } finally {	conn.close();      }      WriteStream out = _rawWrite;      out.write(HMTP_QUERY_RESULT);      out.write(0);      out.write(8);      writeLong(out, id);      writeString(HMUX_STRING, from);      writeString(HMUX_STRING, to);      writeObject(out, result);      if (log.isLoggable(Level.FINER))	log.finer(this + " queryResult to=" + from + " from=" + to +		  " result=" + (result != null ? result.getClass() : null));    }  }  private void readHmtpQuerySet(ReadStream is, long id)    throws IOException  {    String to = readString(is);    String from = readString(is);    Serializable query = (Serializable) readObject();    BamBroker broker = _server.getBroker();    BamStream brokerStream = broker.getBrokerStream();    if (log.isLoggable(Level.FINER))      log.fine(dbgId() + (char) HMTP_QUERY_SET + " querySet id=" + id	       + " to=" + to + " from=" + from + " " + query);    if (from != null && ! "".equals(from)) {      brokerStream.querySet(id, to, from, query);    }    else {      // from=null means the client needs to block and return the data      // on the same stream            Serializable result = null;      BamConnection conn = broker.getConnection("hmux", null);      try {	result = conn.querySet(to, query);      } finally {	conn.close();      }      WriteStream out = _rawWrite;      out.write(HMTP_QUERY_RESULT);      out.write(0);      out.write(8);      writeLong(out, id);      writeString(HMUX_STRING, from);      writeString(HMUX_STRING, to);      writeObject(out, result);      if (log.isLoggable(Level.FINER))	log.finer(this + " queryResult to=" + from + " from=" + to +		  " result=" + (result != null ? result.getClass() : null));    }  }  private void readHmtpQueryResult(ReadStream is, long id)    throws IOException  {    String to = readString(is);    String from = readString(is);    Serializable value = (Serializable) readObject();    BamStream hmtpStream = _server.getHmtpStream();    if (log.isLoggable(Level.FINER))      log.fine(dbgId() + (char) HMTP_QUERY_RESULT + ": hmtp queryResult id=" + id	       + " to=" + to + " from=" + from + " " + value);    if (hmtpStream != null) {      hmtpStream.queryResult(id, to, from, value);    }  }  private Object readObject()    throws IOException  {    if (_in == null)      _in = new Hessian2StreamingInput(_rawRead);    return _in.readObject();  }  private String readString(ReadStream is)    throws IOException  {    int code = is.read();    if (code != HMUX_STRING)      throw new IOException(L.l("expected string at " + (char) code));        int len = (is.read() << 8) + is.read();    _cb1.clear();    _rawRead.readAll(_cb1, len);    return _cb1.toString();  }  private long readLong(ReadStream is)    throws IOException  {    return (((long) is.read() << 56)	    + ((long) is.read() << 48)	    + ((long) is.read() << 40)	    + ((long) is.read() << 32)	    + ((long) is.read() << 24)	    + ((long) is.read() << 16)	    + ((long) is.read() << 8)	    + ((long) is.read()));  }  /**   * Returns the header.   */  public String getMethod()  {    if (_methodString == null) {      CharSegment cb = getMethodBuffer();      if (cb.length() == 0) {        _methodString = "GET";        return _methodString;      }            switch (cb.charAt(0)) {      case 'G':        _methodString = cb.equals(_getCb) ? "GET" : cb.toString();        break;              case 'H':        _methodString = cb.equals(_headCb) ? "HEAD" : cb.toString();        break;              case 'P':        _methodString = cb.equals(_postCb) ? "POST" : cb.toString();        break;      default:        _methodString = cb.toString();      }    }    return _methodString;      }  public CharSegment getMethodBuffer()  {    return _method;  }  /**   * Returns a char buffer containing the host.   */  @Override  protected CharBuffer getHost()  {    if (_host.length() > 0)      return _host;        _host.append(_serverName);    _host.toLowerCase();    return _host;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?