connection.java
来自「一款Java实现的HTTP代理服务器」· Java 代码 · 共 1,060 行 · 第 1/2 页
JAVA
1,060 行
package rabbit.proxy;import java.io.ByteArrayOutputStream;import java.io.FileNotFoundException;import java.io.IOException;import java.io.PrintWriter;import java.io.StringWriter;import java.net.URL;import java.net.MalformedURLException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Date;import java.util.List;import rabbit.cache.Cache;import rabbit.cache.CacheEntry;import rabbit.filter.HttpFilter;import rabbit.handler.BaseHandler;import rabbit.handler.Handler;import rabbit.handler.HandlerFactory;import rabbit.handler.MultiPartHandler;import rabbit.handler.ResourceSource;import rabbit.http.HttpDateParser;import rabbit.http.HttpHeader;import rabbit.io.WebConnection;import rabbit.util.Counter;import rabbit.util.Logger;import rabbit.util.TrafficLogger;/** The base connection class for rabbit. * * This is the class that handle the http protocoll for proxies. * * @author <a href="mailto:robo@khelekore.org">Robert Olofsson</a> */public class Connection { /** The id of this connection. */ private ConnectionId id; /** The client channel */ private SocketChannel channel; /** The current request */ private HttpHeader request; /** The current byte buffer */ private ByteBuffer requestBuffer; /** The selector to use */ private Selector selector; /** The proxy we are serving */ private HttpProxy proxy; /** The current status of this connection. */ private String status; /** The time this connection was started. */ private long started; private boolean keepalive = true; private boolean meta = false; private boolean chunk = true; private boolean mayUseCache = true; private boolean mayCache = true; private boolean mayFilter = true; private boolean mustRevalidate = false; private boolean addedINM = false; private boolean addedIMS = false; /** If the user has authenticated himself */ private String userName = null; private String password = null; /* Current status information */ private String requestVersion = null; private String requestLine = null; private String statusCode = null; private String extraInfo = null; private String contentLength = null; private ClientResourceHandler clientResourceHandler; private StandardResponseHeaders responseHandler = new StandardResponseHeaders (this); private TrafficLoggerHandler tlh = new TrafficLoggerHandler (); public Connection (ConnectionId id, SocketChannel channel, Selector selector, HttpProxy proxy) { this.id = id; this.channel = channel; this.selector = selector; this.proxy = proxy; proxy.addCurrentConnection (this); } public ConnectionId getId () { return id; } /** Read a request. */ public void readRequest () { clearStatuses (); try { channel.socket ().setTcpNoDelay (true); HttpHeaderListener clientListener = new RequestListener (); HttpHeaderReader requestReader = new HttpHeaderReader (channel, requestBuffer, selector, getLogger (), tlh.getClient (), true, proxy.getStrictHttp (), clientListener); } catch (Throwable ex) { handleFailedRequestRead (ex); } } private void handleFailedRequestRead (Throwable t) { if (t instanceof RequestLineTooLongException) { HttpHeader err = getHttpGenerator ().get414 (); sendAndClose (err); } else { getLogger ().logInfo ("Exception when reading request: " + getStackTrace (t)); closeDown (); } } private class RequestListener implements HttpHeaderListener { public void httpHeaderRead (HttpHeader header, ByteBuffer buffer, boolean keepalive, boolean isChunked, long dataSize) { setKeepalive (keepalive); requestRead (header, buffer, isChunked, dataSize); } public void closed () { closeDown (); } public void timeout () { getLogger ().logInfo ("Timeout when reading client request"); closeDown (); } public void failed (Exception e) { handleFailedRequestRead (e); } } private String getStackTrace (Throwable t) { StringWriter sw = new StringWriter (); PrintWriter ps = new PrintWriter (sw); t.printStackTrace (ps); return sw.toString (); } private void handleInternalError (Throwable t) { extraInfo = extraInfo != null ? extraInfo + t.toString () : t.toString (); String message = getStackTrace (t); getLogger ().logError ("Internal Error: " + message); HttpHeader internalError = getHttpGenerator ().get500 (t); sendAndClose (internalError); } private void requestRead (HttpHeader request, ByteBuffer buffer, boolean isChunked, long dataSize) { status = "Request read, processing"; this.request = request; this.requestBuffer = buffer; requestVersion = request.getHTTPVersion (); if (requestVersion == null) { // TODO: fix http/0.9 handling. getLogger ().logInfo ("bad header read: " + request); closeDown (); return; } requestVersion = requestVersion.toUpperCase (); request.addHeader ("Via", requestVersion + " RabbIT"); requestLine = request.getRequestLine (); getCounter ().inc ("Requests"); try { // SSL requests are special in a way... // Don't depend upon being able to build URLs from the header... if (request.isSSLRequest ()) { status = "Handling ssl request"; checkAndHandleSSL (requestBuffer); return; } // Read in any posted data. if (isChunked) { setMayUseCache (false); setMayCache (false); status = "Request read, reading chunked data"; setupChunkedContent (); } String ct = null; ct = request.getHeader ("Content-Type"); if (request.getContent () == null && (ct == null || !ct.startsWith ("multipart/byteranges")) && dataSize > -1) { setupClientResourceHandler (dataSize); } if (ct != null) { status = "Request read, reading multipart data"; readMultiPart (ct); } filterAndHandleRequest (); } catch (Throwable t) { handleInternalError (t); } } /** Filter the request and handle it. * @param header the request */ // TODO: filtering here may block! be prepared to run filters in a // TODO: separate thread. private void filterAndHandleRequest () { // Filter the request based on the header. // A response means that the request is blocked. // For ad blocking, bad header configuration (http/1.1 correctness) ... HttpHeaderFilterer filterer = proxy.getHttpHeaderFilterer (); HttpHeader badresponse = filterer.filterHttpIn (this, channel, request); if (badresponse != null) { statusCode = badresponse.getStatusCode (); sendAndClose (badresponse); } else { status = "Handling request"; if (getMeta ()) handleMeta (); else handleRequest (); } } /** Handle a meta page. */ public void handleMeta () { status = "Handling meta page"; MetaHandlerHandler mhh = new MetaHandlerHandler (); try { mhh.handleMeta (this, request, tlh.getProxy (), tlh.getClient ()); } catch (IOException ex) { logAndClose (null); } } /** A container to send around less parameters.*/ class RequestHandler { public ResourceSource content = null; public ByteBuffer webBuffer = null; public HttpHeader webHeader = null; public CacheEntry<HttpHeader, HttpHeader> entry = null; public HttpHeader dataHook = null; // the entrys datahook if any. public HandlerFactory handlerFactory = null; public long size = -1; public WebConnection wc = null; public long requestTime = -1; public ConditionalChecker cond = new ConditionalChecker (getLogger ()); public boolean conditional; } private void checkNoStore (CacheEntry<HttpHeader, HttpHeader> entry) { if (entry == null) return; for (String cc : request.getHeaders ("Cache-Control")) if (cc.equals ("no-store")) proxy.getCache ().remove (entry.getKey ()); } private boolean checkMaxAge (RequestHandler rh) { return rh.cond.checkMaxAge (this, rh.dataHook, rh); } /** Handle a request by getting the datastream (from the cache or the web). * After getting the handler for the mimetype, send it. */ public void handleRequest () { // TODO: move this method to separate thread, // TODO: it may block in many places. RequestHandler rh = new RequestHandler (); Cache<HttpHeader, HttpHeader> cache = proxy.getCache (); String method = request.getMethod (); if (!method.equals ("GET") && !method.equals ("HEAD")) cache.remove (request); rh.entry = cache.getEntry (request); if (rh.entry != null) rh.dataHook = rh.entry.getDataHook (proxy.getCache ()); checkNoStore (rh.entry); if (!rh.cond.checkMaxStale (request, rh) && checkMaxAge (rh)) setMayUseCache (false); rh.conditional = rh.cond.checkConditional (this, request, rh); if (partialContent (rh)) fillupContent (rh); checkIfRange (rh); boolean mc = getMayCache (); if (getMayUseCache ()) { // in cache? if (rh.entry != null) { CacheChecker cc = new CacheChecker (); if (cc.checkCachedEntry (this, request, rh)) { return; } } } if (rh.content == null) { // Ok cache did not have a usable resource, // so get the resource from the net. // reset value to one before we thought we could use cache... mayCache = mc; SWC swc = new SWC (this, proxy.getOffset (), request, requestBuffer, tlh, clientResourceHandler, rh); swc.establish (); } else { resourceEstablished (rh); } } void webConnectionSetupFailed (RequestHandler rh, Exception cause) { getLogger ().logWarn ("strange error setting up web connection: " + cause.toString ()); tryStaleEntry (rh, cause); } private void setMayCacheFromCC (RequestHandler rh) { HttpHeader resp = rh.webHeader; for (String val : resp.getHeaders ("Cache-Control")) { if ("public".equals (val) || "must-revalidate".equals (val) || val.startsWith ("s-maxage=")) { String auth = request.getHeader ("Authorization"); if (auth != null) { // TODO this ignores no-store and a few other things... mayCache = true; break; } } } } /** Check if we must tunnel a request. * Currently will only check if the Authorization starts with NTLM or Negotiate. * @param rh the request handler. */ protected boolean mustTunnel (RequestHandler rh) { String auth = request.getHeader ("Authorization"); if (auth != null) { if (auth.startsWith ("NTLM") || auth.startsWith ("Negotiate")) return true; } return false; } void webConnectionEstablished (RequestHandler rh) { getProxy ().markForPipelining (rh.wc); setMayCacheFromCC (rh); resourceEstablished (rh); } private void tunnel (RequestHandler rh) { try { TunnelDoneListener tdl = new TDL (rh); Tunnel tunnel = new Tunnel (selector, getLogger (), channel, requestBuffer, tlh.getClient (), rh.wc.getChannel (), rh.webBuffer, tlh.getNetwork (), tdl); } catch (IOException ex) { logAndClose (rh); } } private void resourceEstablished (RequestHandler rh) { try { // and now we filter the response header if any. if (!request.isDot9Request ()) { if (mustTunnel (rh)) { tunnel (rh); return; } String status = rh.webHeader.getStatusCode ().trim (); rh.cond.checkStaleCache (request, this, rh); CacheChecker cc = new CacheChecker (); cc.removeOtherStaleCaches (request, rh.webHeader, proxy.getCache (), getLogger ()); if (status.equals ("304")) { NotModifiedHandler nmh = new NotModifiedHandler (); nmh.updateHeader (rh, getLogger ()); if (rh.entry != null) { proxy.getCache ().entryChanged (rh.entry, request, rh.dataHook); } } HttpHeader bad = cc.checkExpectations (this, request, rh.webHeader); if (bad == null) { HttpHeaderFilterer filterer = proxy.getHttpHeaderFilterer (); bad = filterer.filterHttpOut (this, channel, rh.webHeader); } if (bad != null) { rh.content.release (this); sendAndClose (bad); return; } rh.entry = proxy.getCache ().getEntry (request); if (rh.conditional && rh.entry != null && status.equals ("304")) { if (handleConditional (rh)) { return; } } else if (status != null && status.length () > 0) { if (status.equals ("304") || status.equals ("204") || status.charAt (0) == '1') { rh.content.release (this); sendAndClose (rh.webHeader); return; } } } setHandlerFactory (rh); Handler handler = rh.handlerFactory.getNewInstance (this, tlh, request, requestBuffer, rh.webHeader, rh.content, getMayCache (), getMayFilter (), rh.size); if (handler == null) { doError (500, "Something fishy with that handler...."); } else { finalFixesOnWebHeader (rh, handler); // HTTP/0.9 does not support HEAD, so webheader should be valid. if (request.isHeadOnlyRequest ()) { rh.content.release (this); sendAndRestart (rh.webHeader); } else { handler.handle (); } } } catch (Throwable t) { handleInternalError (t); } } private void finalFixesOnWebHeader (RequestHandler rh, Handler handler) { if (chunk) { if (rh.size < 0 || handler.changesContentSize ()) { rh.webHeader.removeHeader ("Content-Length"); rh.webHeader.setHeader ("Transfer-Encoding", "chunked"); } else { setChunking (false); } } else { if (getKeepalive ()) { rh.webHeader.setHeader ("Proxy-Connection", "Keep-Alive"); rh.webHeader.setHeader ("Connection", "Keep-Alive"); } else { rh.webHeader.setHeader ("Proxy-Connection", "close"); rh.webHeader.setHeader ("Connection", "close"); } } } private void setHandlerFactory (RequestHandler rh) { if (rh.handlerFactory == null) { if (rh.webHeader != null) { String ct = rh.webHeader.getHeader ("Content-Type"); if (ct != null) { ct = ct.toLowerCase (); if (getMayFilter ()) rh.handlerFactory = proxy.getHandlerFactory (ct.toLowerCase ()); if (rh.handlerFactory == null && ct.startsWith ("multipart/byteranges")) rh.handlerFactory = new MultiPartHandler (); } } if (rh.handlerFactory == null) { // still null rh.handlerFactory = new BaseHandler (); // fallback... } } } private boolean handleConditional (RequestHandler rh) throws IOException { HttpHeader cachedHeader = rh.dataHook; proxy.releaseWebConnection (rh.wc); if (addedINM) request.removeHeader ("If-None-Match"); if (addedIMS) request.removeHeader ("If-Modified-Since"); if (checkWeakEtag (cachedHeader, rh.webHeader)) { NotModifiedHandler nmh = new NotModifiedHandler (); nmh.updateHeader (rh, getLogger ()); setMayCache (false); try { HttpHeader res304 = nmh.is304 (request, this, rh); if (res304 != null) { sendAndClose (res304); return true; } else { if (rh.content != null) rh.content.release (this); setupCachedEntry (rh); } } catch (IOException e) { getLogger ().logWarn ("Conditional request: IOException (" + request.getRequestURI () + ",: " + e); } } else { // retry... request.removeHeader ("If-None-Match"); proxy.getCache ().remove (request);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?