hmuxrequest.java
来自「RESIN 3.2 最新源码」· Java 代码 · 共 1,842 行 · 第 1/3 页
JAVA
1,842 行
int code; int len; while (true) { code = is.read(); switch (code) { case -1: if (isLoggable) log.fine(dbgId() + "end of file"); return false; case HMUX_CHANNEL: int channel = (is.read() << 8) + is.read(); if (isLoggable) log.fine(dbgId() + "channel " + channel); break; case HMUX_QUIT: if (isLoggable) log.fine(dbgId() + (char) code + ": end of request"); return hasURI; case HMUX_EXIT: if (isLoggable) log.fine(dbgId() + (char) code + ": end of socket"); killKeepalive(); return hasURI; case HMUX_PROTOCOL: len = (is.read() << 8) + is.read(); if (len != 4) { log.fine(dbgId() + (char) code + ": protocol length (" + len + ") must be 4."); killKeepalive(); return false; } int value = ((is.read() << 24) + (is.read() << 16) + (is.read() << 8) + (is.read())); int result = HMUX_EXIT; boolean isKeepalive = false; if (value == HMUX_CLUSTER_PROTOCOL) { if (isLoggable) log.fine(dbgId() + (char) code + ": cluster protocol"); _filter.setClientClosed(true); if (_server == null || _server.isDestroyed()) { return false; } result = _clusterRequest.handleRequest(is, _rawWrite); } else if (value == HMUX_DISPATCH_PROTOCOL) { if (isLoggable) log.fine(dbgId() + (char) code + ": dispatch protocol"); _filter.setClientClosed(true); if (_server == null || _server.isDestroyed()) { return false; } isKeepalive = _dispatchRequest.handleRequest(is, _rawWrite); if (isKeepalive) result = HMUX_QUIT; else result = HMUX_EXIT; } else { if (_server == null || _server.isDestroyed()) { return false; } HmuxExtension ext = _hmuxProtocol.getExtension(value); if (ext != null) { if (isLoggable) log.fine(dbgId() + (char) code + ": extension " + ext); _filter.setClientClosed(true); result = ext.handleRequest(this, is, _rawWrite); } else { log.fine(dbgId() + (char) code + ": unknown protocol (" + value + ")"); result = HMUX_EXIT; } } if (result == HMUX_YIELD) break; else { if (result == HMUX_QUIT && ! allowKeepalive()) result = HMUX_EXIT; if (result == HMUX_QUIT) { _rawWrite.write(HMUX_QUIT); _rawWrite.flush(); } else { _rawWrite.write(HMUX_EXIT); _rawWrite.close(); } return result == HMUX_QUIT; } case HMUX_URI: hasURI = true; len = (is.read() << 8) + is.read(); _uri.setLength(len); _rawRead.readAll(_uri.getBuffer(), 0, len); if (isLoggable) log.fine(dbgId() + (char) code + ":uri " + _uri); break; case HMUX_METHOD: len = (is.read() << 8) + is.read(); is.readAll(_method, len); if (isLoggable) log.fine(dbgId() + (char) code + ":method " + _method); break; case CSE_REAL_PATH: len = (is.read() << 8) + is.read(); _cb1.clear(); _rawRead.readAll(_cb1, len); code = _rawRead.read(); if (code != HMUX_STRING) throw new IOException("protocol expected HMUX_STRING"); _cb2.clear(); _rawRead.readAll(_cb2, readLength()); //http.setRealPath(cb1.toString(), cb2.toString()); if (isLoggable) log.fine(dbgId() + (char) code + " " + _cb1.toString() + "->" + _cb2.toString()); //throw new RuntimeException(); break; case CSE_REMOTE_HOST: len = (is.read() << 8) + is.read(); _rawRead.readAll(_remoteHost, len); if (isLoggable) log.fine(dbgId() + (char) code + " " + _remoteHost); break; case CSE_REMOTE_ADDR: len = (is.read() << 8) + is.read(); _rawRead.readAll(_remoteAddr, len); if (isLoggable) log.fine(dbgId() + (char) code + " " + _remoteAddr); break; case HMUX_SERVER_NAME: len = (is.read() << 8) + is.read(); _rawRead.readAll(_serverName, len); if (isLoggable) log.fine(dbgId() + (char) code + " server-host: " + _serverName); break; case CSE_REMOTE_PORT: len = (is.read() << 8) + is.read(); _rawRead.readAll(_remotePort, len); if (isLoggable) log.fine(dbgId() + (char) code + " remote-port: " + _remotePort); break; case CSE_SERVER_PORT: len = (is.read() << 8) + is.read(); _rawRead.readAll(_serverPort, len); if (isLoggable) log.fine(dbgId() + (char) code + " server-port: " + _serverPort); break; case CSE_QUERY_STRING: len = (is.read() << 8) + is.read(); if (len > 0) { _uri.add('?'); _uri.ensureCapacity(_uri.getLength() + len); _rawRead.readAll(_uri.getBuffer(), _uri.getLength(), len); _uri.setLength(_uri.getLength() + len); } break; case CSE_PROTOCOL: len = (is.read() << 8) + is.read(); _rawRead.readAll(_protocol, len); if (isLoggable) log.fine(dbgId() + (char) code + " protocol: " + _protocol); for (int i = 0; i < len; i++) { char ch = _protocol.charAt(i); if (ch >= '0' && ch <= '9') _version = 16 * _version + ch - '0'; else if (ch == '.') _version = 16 * _version; } break; case HMUX_HEADER: len = (is.read() << 8) + is.read(); int headerSize = _headerSize; CharBuffer key = _headerKeys[headerSize]; key.clear(); CharBuffer valueCb = _headerValues[headerSize]; valueCb.clear(); _rawRead.readAll(key, len); code = _rawRead.read(); if (code != HMUX_STRING) throw new IOException("protocol expected HMUX_STRING at " + (char) code); _rawRead.readAll(valueCb, readLength()); if (isLoggable) log.fine(dbgId() + "H " + key + "=" + valueCb); if (addHeaderInt(key.getBuffer(), 0, key.length(), valueCb)) { _headerSize++; } break; case CSE_CONTENT_LENGTH: len = (is.read() << 8) + is.read(); if (_headerKeys.length <= _headerSize) resizeHeaders(); _headerKeys[_headerSize].clear(); _headerKeys[_headerSize].append("Content-Length"); _headerValues[_headerSize].clear(); _rawRead.readAll(_headerValues[_headerSize], len); if (isLoggable) log.fine(dbgId() + (char) code + " content-length=" + _headerValues[_headerSize]); _headerSize++; break; case CSE_CONTENT_TYPE: len = (is.read() << 8) + is.read(); if (_headerKeys.length <= _headerSize) resizeHeaders(); _headerKeys[_headerSize].clear(); _headerKeys[_headerSize].append("Content-Type"); _headerValues[_headerSize].clear(); _rawRead.readAll(_headerValues[_headerSize], len); if (isLoggable) log.fine(dbgId() + (char) code + " content-type=" + _headerValues[_headerSize]); _headerSize++; break; case CSE_IS_SECURE: len = (is.read() << 8) + is.read(); _isSecure = true; if (isLoggable) log.fine(dbgId() + "secure"); _rawRead.skip(len); break; case CSE_CLIENT_CERT: len = (is.read() << 8) + is.read(); _clientCert.clear(); _clientCert.setLength(len); _rawRead.readAll(_clientCert.getBuffer(), 0, len); if (isLoggable) log.fine(dbgId() + (char) code + " cert=" + _clientCert + " len:" + len); break; case CSE_SERVER_TYPE: len = (is.read() << 8) + is.read(); _cb1.clear(); _rawRead.readAll(_cb1, len); if (isLoggable) log.fine(dbgId() + (char) code + " server=" + _cb1); if (_cb1.length() > 0) _serverType = _cb1.charAt(0); break; case CSE_REMOTE_USER: len = (is.read() << 8) + is.read(); _cb.clear(); _rawRead.readAll(_cb, len); if (isLoggable) log.fine(dbgId() + (char) code + " " + _cb); setAttribute(com.caucho.server.security.AbstractAuthenticator.LOGIN_NAME, new com.caucho.security.BasicPrincipal(_cb.toString())); break; case CSE_DATA: len = (is.read() << 8) + is.read(); _pendingData = len; if (isLoggable) log.fine(dbgId() + (char) code + " post-data: " + len); return hasURI; case HMTP_MESSAGE: { len = (is.read() << 8) + is.read(); readHmtpMessage(is); hasURI = true; break; } case HMTP_QUERY_GET: { len = (is.read() << 8) + is.read(); long id = readLong(is); readHmtpQueryGet(is, id); hasURI = true; break; } case HMTP_QUERY_SET: { len = (is.read() << 8) + is.read(); long id = readLong(is); readHmtpQuerySet(is, id); hasURI = true; break; } case HMTP_QUERY_RESULT: { len = (is.read() << 8) + is.read(); long id = readLong(is); readHmtpQueryResult(is, id); hasURI = true; break; } default: len = (is.read() << 8) + is.read(); if (isLoggable) log.fine(dbgId() + (char) code + " " + len); is.skip(len); break; } } // _filter.setClientClosed(true); // return false; } private void resizeHeaders() { CharBuffer []newKeys = new CharBuffer[_headerSize * 2]; CharBuffer []newValues = new CharBuffer[_headerSize * 2]; for (int i = 0; i < _headerSize; i++) { newKeys[i] = _headerKeys[i]; newValues[i] = _headerValues[i]; } for (int i = _headerSize; i < newKeys.length; i++) { newKeys[i] = new CharBuffer(); newValues[i] = new CharBuffer(); } _headerKeys = newKeys; _headerValues = newValues; } private int readLength() throws IOException { return ((_rawRead.read() << 8) + _rawRead.read()); } private void readHmtpMessage(ReadStream is) throws IOException { String to = readString(is); String from = readString(is); try { Serializable query = (Serializable) readObject(); BamBroker broker = _server.getBroker(); BamStream brokerStream = broker.getBrokerStream(); if (log.isLoggable(Level.FINER)) log.fine(dbgId() + (char) HMTP_QUERY_GET + " hmtp message" + " to=" + to + " from=" + from + " " + query); if (brokerStream != null) { brokerStream.message(to, from, query); } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } private void readHmtpQueryGet(ReadStream is, long id) throws IOException { String to = readString(is); String from = readString(is); Serializable query = (Serializable) readObject(); BamBroker broker = _server.getBroker(); BamStream brokerStream = broker.getBrokerStream(); if (log.isLoggable(Level.FINER)) log.fine(dbgId() + (char) HMTP_QUERY_GET + " queryGet id=" + id + " to=" + to + " from=" + from + " " + query); if (from != null && ! "".equals(from)) { brokerStream.queryGet(id, to, from, query); } else { // from=null means the client needs to block and return the data // on the same stream Serializable result = null; BamConnection conn = broker.getConnection("hmux", null); try { result = conn.queryGet(to, query); } finally { conn.close(); } WriteStream out = _rawWrite; out.write(HMTP_QUERY_RESULT); out.write(0); out.write(8); writeLong(out, id); writeString(HMUX_STRING, from); writeString(HMUX_STRING, to); writeObject(out, result); if (log.isLoggable(Level.FINER)) log.finer(this + " queryResult to=" + from + " from=" + to + " result=" + (result != null ? result.getClass() : null)); } } private void readHmtpQuerySet(ReadStream is, long id) throws IOException { String to = readString(is); String from = readString(is); Serializable query = (Serializable) readObject(); BamBroker broker = _server.getBroker(); BamStream brokerStream = broker.getBrokerStream(); if (log.isLoggable(Level.FINER)) log.fine(dbgId() + (char) HMTP_QUERY_SET + " querySet id=" + id + " to=" + to + " from=" + from + " " + query); if (from != null && ! "".equals(from)) { brokerStream.querySet(id, to, from, query); } else { // from=null means the client needs to block and return the data // on the same stream Serializable result = null; BamConnection conn = broker.getConnection("hmux", null); try { result = conn.querySet(to, query); } finally { conn.close(); } WriteStream out = _rawWrite; out.write(HMTP_QUERY_RESULT); out.write(0); out.write(8); writeLong(out, id); writeString(HMUX_STRING, from); writeString(HMUX_STRING, to); writeObject(out, result); if (log.isLoggable(Level.FINER)) log.finer(this + " queryResult to=" + from + " from=" + to + " result=" + (result != null ? result.getClass() : null)); } } private void readHmtpQueryResult(ReadStream is, long id) throws IOException { String to = readString(is); String from = readString(is); Serializable value = (Serializable) readObject(); BamStream hmtpStream = _server.getHmtpStream(); if (log.isLoggable(Level.FINER)) log.fine(dbgId() + (char) HMTP_QUERY_RESULT + ": hmtp queryResult id=" + id + " to=" + to + " from=" + from + " " + value); if (hmtpStream != null) { hmtpStream.queryResult(id, to, from, value); } } private Object readObject() throws IOException { if (_in == null) _in = new Hessian2StreamingInput(_rawRead); return _in.readObject(); } private String readString(ReadStream is) throws IOException { int code = is.read(); if (code != HMUX_STRING) throw new IOException(L.l("expected string at " + (char) code)); int len = (is.read() << 8) + is.read(); _cb1.clear(); _rawRead.readAll(_cb1, len); return _cb1.toString(); } private long readLong(ReadStream is) throws IOException { return (((long) is.read() << 56) + ((long) is.read() << 48) + ((long) is.read() << 40) + ((long) is.read() << 32) + ((long) is.read() << 24) + ((long) is.read() << 16) + ((long) is.read() << 8) + ((long) is.read())); } /** * Returns the header. */ public String getMethod() { if (_methodString == null) { CharSegment cb = getMethodBuffer(); if (cb.length() == 0) { _methodString = "GET"; return _methodString; } switch (cb.charAt(0)) { case 'G': _methodString = cb.equals(_getCb) ? "GET" : cb.toString(); break; case 'H': _methodString = cb.equals(_headCb) ? "HEAD" : cb.toString(); break; case 'P': _methodString = cb.equals(_postCb) ? "POST" : cb.toString(); break; default: _methodString = cb.toString(); } } return _methodString; } public CharSegment getMethodBuffer() { return _method; } /** * Returns a char buffer containing the host. */ @Override protected CharBuffer getHost() { if (_host.length() > 0) return _host; _host.append(_serverName); _host.toLowerCase(); return _host;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?