⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channelsocket.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:

    public void registerRequest(Request req, MsgContext ep, int count) {
        if(this.domain != null) {
            try {
                RequestInfo rp=req.getRequestProcessor();
                rp.setGlobalProcessor(global);
                ObjectName roname = new ObjectName
                    (getDomain() + ":type=RequestProcessor,worker="+
                     getChannelName()+",name=JkRequest" +count);
                ep.setNote(JMXRequestNote, roname);
                        
                Registry.getRegistry(null, null).registerComponent( rp, roname, null);
            } catch( Exception ex ) {
                log.warn("Error registering request");
            }
        }
    }

    public void open(MsgContext ep) throws IOException {
    }

    
    public void close(MsgContext ep) throws IOException {
        Socket s=(Socket)ep.getNote( socketNote );
        s.close();
    }

    private void unLockSocket() throws IOException {
        // Need to create a connection to unlock the accept();
        Socket s;
        InetAddress ladr = inet;

        if(port == 0)
            return;
        if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
            ladr = InetAddress.getLocalHost();
        }
        s=new Socket(ladr, port );
        // setting soLinger to a small value will help shutdown the
        // connection quicker
        s.setSoLinger(true, 0);

	s.close();
    }

    public void destroy() throws IOException {
        running = false;
        try {
            /* If we disabled the channel return */
            if (port == 0)
                return;
            tp.shutdown();

	    if(!paused) {
		unLockSocket();
	    }

            sSocket.close(); // XXX?
            
            if( tpOName != null )  {
                Registry.getRegistry(null, null).unregisterComponent(tpOName);
            }
            if( rgOName != null ) {
                Registry.getRegistry(null, null).unregisterComponent(rgOName);
            }
        } catch(Exception e) {
            log.info("Error shutting down the channel " + port + " " +
                    e.toString());
            if( log.isDebugEnabled() ) log.debug("Trace", e);
        }
    }

    public int send( Msg msg, MsgContext ep)
        throws IOException    {
        msg.end(); // Write the packet header
        byte buf[]=msg.getBuffer();
        int len=msg.getLen();
        
        if(log.isTraceEnabled() )
            log.trace("send() " + len + " " + buf[4] );

        OutputStream os=(OutputStream)ep.getNote( osNote );
        os.write( buf, 0, len );
        return len;
    }

    public int flush( Msg msg, MsgContext ep)
        throws IOException    {
        if( bufferSize > 0 ) {
            OutputStream os=(OutputStream)ep.getNote( osNote );
            os.flush();
        }
        return 0;
    }

    public int receive( Msg msg, MsgContext ep )
        throws IOException    {
        if (log.isDebugEnabled()) {
            log.debug("receive() ");
        }

        byte buf[]=msg.getBuffer();
        int hlen=msg.getHeaderLength();
        
	// XXX If the length in the packet header doesn't agree with the
	// actual number of bytes read, it should probably return an error
	// value.  Also, callers of this method never use the length
	// returned -- should probably return true/false instead.

        int rd = this.read(ep, buf, 0, hlen );
        
        if(rd < 0) {
            // Most likely normal apache restart.
            // log.warn("Wrong message " + rd );
            return rd;
        }

        msg.processHeader();

        /* After processing the header we know the body
           length
        */
        int blen=msg.getLen();
        
	// XXX check if enough space - it's assert()-ed !!!
        
 	int total_read = 0;
        
        total_read = this.read(ep, buf, hlen, blen);
        
        if ((total_read <= 0) && (blen > 0)) {
            log.warn("can't read body, waited #" + blen);
            return  -1;
        }
        
        if (total_read != blen) {
             log.warn( "incomplete read, waited #" + blen +
                        " got only " + total_read);
            return -2;
        }
        
	return total_read;
    }
    
    /**
     * Read N bytes from the InputStream, and ensure we got them all
     * Under heavy load we could experience many fragmented packets
     * just read Unix Network Programming to recall that a call to
     * read didn't ensure you got all the data you want
     *
     * from read() Linux manual
     *
     * On success, the number of bytes read is returned (zero indicates end
     * of file),and the file position is advanced by this number.
     * It is not an error if this number is smaller than the number of bytes
     * requested; this may happen for example because fewer bytes
     * are actually available right now (maybe because we were close to
     * end-of-file, or because we are reading from a pipe, or  from  a
     * terminal),  or  because  read()  was interrupted by a signal.
     * On error, -1 is returned, and errno is set appropriately. In this
     * case it is left unspecified whether the file position (if any) changes.
     *
     **/
    public int read( MsgContext ep, byte[] b, int offset, int len)
        throws IOException    {
        InputStream is=(InputStream)ep.getNote( isNote );
        int pos = 0;
        int got;

        while(pos < len) {
            try {
                got = is.read(b, pos + offset, len - pos);
            } catch(SocketException sex) {
                if(pos > 0) {
                    log.info("Error reading data after "+pos+"bytes",sex);
                } else {
                    log.debug("Error reading data", sex);
                }
                got = -1;
            }
            if (log.isTraceEnabled()) {
                log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
                          offset + " " + len + " = " + got );
            }

            // connection just closed by remote. 
            if (got <= 0) {
                // This happens periodically, as apache restarts
                // periodically.
                // It should be more gracefull ! - another feature for Ajp14
                // log.warn( "server has closed the current connection (-1)" );
                return -3;
            }

            pos += got;
        }
        return pos;
    }
    
    protected boolean running=true;
    
    /** Accept incoming connections, dispatch to the thread pool
     */
    void acceptConnections() {
        if( log.isDebugEnabled() )
            log.debug("Accepting ajp connections on " + port);
        while( running ) {
	    try{
                MsgContext ep=createMsgContext(packetSize);
                ep.setSource(this);
                ep.setWorkerEnv( wEnv );
                this.accept(ep);

                if( !running ) break;
                
                // Since this is a long-running connection, we don't care
                // about the small GC
                SocketConnection ajpConn=
                    new SocketConnection(this, ep);
                tp.runIt( ajpConn );
	    }catch(Exception ex) {
                if (running)
                    log.warn("Exception executing accept" ,ex);
	    }
        }
    }

    /** Process a single ajp connection.
     */
    void processConnection(MsgContext ep) {
        try {
            MsgAjp recv=new MsgAjp(packetSize);
            while( running ) {
                if(paused) { // Drop the connection on pause
                    break;
                }
                int status= this.receive( recv, ep );
                if( status <= 0 ) {
                    if( status==-3)
                        log.debug( "server has been restarted or reset this connection" );
                    else 
                        log.warn("Closing ajp connection " + status );
                    break;
                }
                ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
                
                ep.setType( 0 );
                // Will call next
                status= this.invoke( recv, ep );
                if( status!= JkHandler.OK ) {
                    log.warn("processCallbacks status " + status );
                    break;
                }
            }
        } catch( Exception ex ) {
            String msg = ex.getMessage();
            if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
                log.debug( "Server has been restarted or reset this connection");
            else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
                log.debug( "connection timeout reached");            
            else
                log.error( "Error, processing connection", ex);
        } finally {
	    	/*
	    	 * Whatever happened to this connection (remote closed it, timeout, read error)
	    	 * the socket SHOULD be closed, or we may be in situation where the webserver
	    	 * will continue to think the socket is still open and will forward request
	    	 * to tomcat without receiving ever a reply
	    	 */
            try {
                this.close( ep );
            }
            catch( Exception e) {
                log.error( "Error, closing connection", e);
            }
            try{
                Request req = (Request)ep.getRequest();
                if( req != null ) {
                    ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
                    if( roname != null ) {
                        Registry.getRegistry(null, null).unregisterComponent(roname);
                    }
                    req.getRequestProcessor().setGlobalProcessor(null);
                }
            } catch( Exception ee) {
                log.error( "Error, releasing connection",ee);
            }
        }
    }

    // XXX This should become handleNotification
    public int invoke( Msg msg, MsgContext ep ) throws IOException {
        int type=ep.getType();

        switch( type ) {
        case JkHandler.HANDLE_RECEIVE_PACKET:
            if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? ");
            return receive( msg, ep );
        case JkHandler.HANDLE_SEND_PACKET:
            return send( msg, ep );
        case JkHandler.HANDLE_FLUSH:
            return flush( msg, ep );
        }

        if( log.isDebugEnabled() )
            log.debug("Call next " + type + " " + next);

        // Send notification
        if( nSupport!=null ) {
            Notification notif=(Notification)ep.getNote(notifNote);
            if( notif==null ) {
                notif=new Notification("channelSocket.message", ep, requestCount );
                ep.setNote( notifNote, notif);
            }
            nSupport.sendNotification(notif);
        }

        if( next != null ) {
            return next.invoke( msg, ep );
        } else {
            log.info("No next ");
        }

        return OK;
    }
    
    public boolean isSameAddress(MsgContext ep) {
        Socket s=(Socket)ep.getNote( socketNote );
        return isSameAddress( s.getLocalAddress(), s.getInetAddress());
    }
    
    public String getChannelName() {
        String encodedAddr = "";
        if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
            encodedAddr = getAddress();
            if (encodedAddr.startsWith("/"))
                encodedAddr = encodedAddr.substring(1);
	    encodedAddr = URLEncoder.encode(encodedAddr) + "-";
        }
        return ("jk-" + encodedAddr + port);
    }
    
    /**
     * Return <code>true</code> if the specified client and server addresses
     * are the same.  This method works around a bug in the IBM 1.1.8 JVM on
     * Linux, where the address bytes are returned reversed in some
     * circumstances.
     *
     * @param server The server's InetAddress
     * @param client The client's InetAddress
     */
    public static boolean isSameAddress(InetAddress server, InetAddress client)
    {
	// Compare the byte array versions of the two addresses
	byte serverAddr[] = server.getAddress();
	byte clientAddr[] = client.getAddress();
	if (serverAddr.length != clientAddr.length)
	    return (false);
	boolean match = true;
	for (int i = 0; i < serverAddr.length; i++) {
	    if (serverAddr[i] != clientAddr[i]) {
		match = false;
		break;
	    }
	}
	if (match)
	    return (true);

	// Compare the reversed form of the two addresses
	for (int i = 0; i < serverAddr.length; i++) {
	    if (serverAddr[i] != clientAddr[(serverAddr.length-1)-i])
		return (false);
	}
	return (true);
    }

    public void sendNewMessageNotification(Notification notification) {
        if( nSupport!= null )
            nSupport.sendNotification(notification);
    }

    private NotificationBroadcasterSupport nSupport= null;

    public void addNotificationListener(NotificationListener listener,
                                        NotificationFilter filter,
                                        Object handback)
            throws IllegalArgumentException
    {
        if( nSupport==null ) nSupport=new NotificationBroadcasterSupport();
        nSupport.addNotificationListener(listener, filter, handback);
    }

    public void removeNotificationListener(NotificationListener listener)
            throws ListenerNotFoundException
    {
        if( nSupport!=null)
            nSupport.removeNotificationListener(listener);
    }

    MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];

    public void setNotificationInfo( MBeanNotificationInfo info[]) {
        this.notifInfo=info;
    }

    public MBeanNotificationInfo[] getNotificationInfo() {
        return notifInfo;
    }

    static class SocketAcceptor implements ThreadPoolRunnable {
	ChannelSocket wajp;
    
	SocketAcceptor(ChannelSocket wajp ) {
	    this.wajp=wajp;
	}
	
	public Object[] getInitData() {
	    return null;
	}
	
	public void runIt(Object thD[]) {
	    wajp.acceptConnections();
	}
    }

    static class SocketConnection implements ThreadPoolRunnable {
	ChannelSocket wajp;
	MsgContext ep;

	SocketConnection(ChannelSocket wajp, MsgContext ep) {
	    this.wajp=wajp;
	    this.ep=ep;
	}


	public Object[] getInitData() {
	    return null;
	}
	
	public void runIt(Object perTh[]) {
	    wajp.processConnection(ep);
	    ep = null;
	}
    }

}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -