📄 channelsocket.java
字号:
if( serverTimeout > 0 ) sSocket.setSoTimeout( serverTimeout ); if( next==null ) { if( nextName!=null ) setNext( wEnv.getHandler( nextName ) ); if( next==null ) next=wEnv.getHandler( "dispatch" ); if( next==null ) next=wEnv.getHandler( "request" ); } running = true; // Run a thread that will accept connections. tp.start(); SocketAcceptor acceptAjp=new SocketAcceptor( this ); tp.runIt( acceptAjp); } public void open(MsgContext ep) throws IOException { } public void close(MsgContext ep) throws IOException { Socket s=(Socket)ep.getNote( socketNote ); s.close(); } public void destroy() throws IOException { running = false; try { /* If we disabled the channel return */ if (port == 0) return; tp.shutdown(); // Need to create a connection to unlock the accept(); Socket s; if (inet == null) { s=new Socket("127.0.0.1", port ); }else{ s=new Socket(inet, port ); // setting soLinger to a small value will help shutdown the // connection quicker s.setSoLinger(true, 0); } s.close(); sSocket.close(); // XXX? } catch(Exception e) { e.printStackTrace(); } } public int send( Msg msg, MsgContext ep) throws IOException { msg.end(); // Write the packet header byte buf[]=msg.getBuffer(); int len=msg.getLen(); if(log.isTraceEnabled() ) log.trace("send() " + len + " " + buf[4] ); OutputStream os=(OutputStream)ep.getNote( osNote ); os.write( buf, 0, len ); return len; } public int flush( Msg msg, MsgContext ep) throws IOException { if( BUFFER_WRITE ) { OutputStream os=(OutputStream)ep.getNote( osNote ); os.flush(); } return 0; } public int receive( Msg msg, MsgContext ep ) throws IOException { if (log.isDebugEnabled()) { log.debug("receive() "); } byte buf[]=msg.getBuffer(); int hlen=msg.getHeaderLength(); // XXX If the length in the packet header doesn't agree with the // actual number of bytes read, it should probably return an error // value. Also, callers of this method never use the length // returned -- should probably return true/false instead. int rd = this.read(ep, buf, 0, hlen ); if(rd < 0) { // Most likely normal apache restart. // log.warn("Wrong message " + rd ); return rd; } msg.processHeader(); /* After processing the header we know the body length */ int blen=msg.getLen(); // XXX check if enough space - it's assert()-ed !!! int total_read = 0; total_read = this.read(ep, buf, hlen, blen); if ((total_read <= 0) && (blen > 0)) { log.warn("can't read body, waited #" + blen); return -1; } if (total_read != blen) { log.warn( "incomplete read, waited #" + blen + " got only " + total_read); return -2; } return total_read; } /** * Read N bytes from the InputStream, and ensure we got them all * Under heavy load we could experience many fragmented packets * just read Unix Network Programming to recall that a call to * read didn't ensure you got all the data you want * * from read() Linux manual * * On success, the number of bytes read is returned (zero indicates end * of file),and the file position is advanced by this number. * It is not an error if this number is smaller than the number of bytes * requested; this may happen for example because fewer bytes * are actually available right now (maybe because we were close to * end-of-file, or because we are reading from a pipe, or from a * terminal), or because read() was interrupted by a signal. * On error, -1 is returned, and errno is set appropriately. In this * case it is left unspecified whether the file position (if any) changes. * **/ public int read( MsgContext ep, byte[] b, int offset, int len) throws IOException { InputStream is=(InputStream)ep.getNote( isNote ); int pos = 0; int got; while(pos < len) { got = is.read(b, pos + offset, len - pos); if (log.isTraceEnabled()) { log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " + offset + " " + len + " = " + got ); } // connection just closed by remote. if (got <= 0) { // This happens periodically, as apache restarts // periodically. // It should be more gracefull ! - another feature for Ajp14 // log.warn( "server has closed the current connection (-1)" ); return -3; } pos += got; } return pos; } protected boolean running=true; /** Accept incoming connections, dispatch to the thread pool */ void acceptConnections() { if( log.isDebugEnabled() ) log.debug("Accepting ajp connections on " + port); while( running ) { try { MsgContext ep=new MsgContext(); ep.setSource(this); ep.setWorkerEnv( wEnv ); this.accept(ep); if( !running ) break; SocketConnection ajpConn= new SocketConnection(this, ep); tp.runIt( ajpConn ); } catch( Exception ex ) { if (running) ex.printStackTrace(); } } } /** Process a single ajp connection. */ void processConnection(MsgContext ep) { try { MsgAjp recv=new MsgAjp(); while( running ) { int status= this.receive( recv, ep ); if( status <= 0 ) { if( status==-3) log.info( "server has been restarted or reset this connection" ); else log.warn("Closing ajp connection " + status ); break; } ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis()); ep.setType( 0 ); status= this.invoke( recv, ep ); if( status!= JkHandler.OK ) { log.warn("processCallbacks status " + status ); break; } } } catch( Exception ex ) { if( ex.getMessage().indexOf( "Connection reset" ) >= 0) log.debug( "Server has been restarted or reset this connection"); else if (ex.getMessage().indexOf( "Read timed out" ) >=0 ) log.info( "connection timeout reached"); else log.error( "Error, processing connection", ex); } finally { /* * Whatever happened to this connection (remote closed it, timeout, read error) * the socket SHOULD be closed, or we may be in situation where the webserver * will continue to think the socket is still open and will forward request * to tomcat without receiving ever a reply */ try { this.close( ep ); } catch( Exception e) { log.error( "Error, closing connection", e); } } } public int invoke( Msg msg, MsgContext ep ) throws IOException { int type=ep.getType(); switch( type ) { case JkHandler.HANDLE_RECEIVE_PACKET: return receive( msg, ep ); case JkHandler.HANDLE_SEND_PACKET: return send( msg, ep ); case JkHandler.HANDLE_FLUSH: return flush( msg, ep ); } return next.invoke( msg, ep ); } public boolean isSameAddress(MsgContext ep) { Socket s=(Socket)ep.getNote( socketNote ); return isSameAddress( s.getLocalAddress(), s.getInetAddress()); } /** * Return <code>true</code> if the specified client and server addresses * are the same. This method works around a bug in the IBM 1.1.8 JVM on * Linux, where the address bytes are returned reversed in some * circumstances. * * @param server The server's InetAddress * @param client The client's InetAddress */ public static boolean isSameAddress(InetAddress server, InetAddress client) { // Compare the byte array versions of the two addresses byte serverAddr[] = server.getAddress(); byte clientAddr[] = client.getAddress(); if (serverAddr.length != clientAddr.length) return (false); boolean match = true; for (int i = 0; i < serverAddr.length; i++) { if (serverAddr[i] != clientAddr[i]) { match = false; break; } } if (match) return (true); // Compare the reversed form of the two addresses for (int i = 0; i < serverAddr.length; i++) { if (serverAddr[i] != clientAddr[(serverAddr.length-1)-i]) return (false); } return (true); }}class SocketAcceptor implements ThreadPoolRunnable { ChannelSocket wajp; SocketAcceptor(ChannelSocket wajp ) { this.wajp=wajp; } public Object[] getInitData() { return null; } public void runIt(Object thD[]) { wajp.acceptConnections(); }}class SocketConnection implements ThreadPoolRunnable { ChannelSocket wajp; MsgContext ep; SocketConnection(ChannelSocket wajp, MsgContext ep) { this.wajp=wajp; this.ep=ep; } public Object[] getInitData() { return null; } public void runIt(Object perTh[]) { wajp.processConnection(ep); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -