⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 channelniosocket.java

📁 业界著名的 Tomcat 服务器 6.0 版的最新源代码。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
                sSocket = ssc.socket();
                sSocket.bind(iddr);
                port=i;
                break;
            } catch( IOException ex ) {
                if(log.isInfoEnabled())
                    log.info("Port busy " + i + " " + ex.toString());
                sSocket = null;
            }
        }

        if( sSocket==null ) {
            log.error("Can't find free port " + startPort + " " + maxPort );
            return;
        }
        if(log.isInfoEnabled())
            log.info("JK: ajp13 listening on " + getAddress() + ":" + port );

        selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        // If this is not the base port and we are the 'main' channleSocket and
        // SHM didn't already set the localId - we'll set the instance id
        if( "channelNioSocket".equals( name ) &&
            port != startPort &&
            (wEnv.getLocalId()==0) ) {
            wEnv.setLocalId(  port - startPort );
        }

        // XXX Reverse it -> this is a notification generator !!
        if( next==null && wEnv!=null ) {
            if( nextName!=null )
                setNext( wEnv.getHandler( nextName ) );
            if( next==null )
                next=wEnv.getHandler( "dispatch" );
            if( next==null )
                next=wEnv.getHandler( "request" );
        }
        JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
        running = true;

        // Run a thread that will accept connections.
        // XXX Try to find a thread first - not sure how...
        if( this.domain != null ) {
            try {
                tpOName=new ObjectName(domain + ":type=ThreadPool,name=" + 
                                       getChannelName());

                Registry.getRegistry(null, null)
                    .registerComponent(tp, tpOName, null);

                rgOName = new ObjectName
                    (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
                Registry.getRegistry(null, null)
                    .registerComponent(global, rgOName, null);
            } catch (Exception e) {
                log.error("Can't register threadpool" );
            }
        }

        tp.start();
        Poller pollAjp = new Poller();
        tp.runIt(pollAjp);
    }

    // JMX name under which the thread pool (tp) is registered; destroy()
    // unregisters it when non-null.
    ObjectName tpOName;
    // JMX name under which the 'global' request group is registered; destroy()
    // unregisters it when non-null.
    ObjectName rgOName;
    // Request group that registerRequest() attaches to each request's
    // RequestInfo via setGlobalProcessor().
    RequestGroupInfo global=new RequestGroupInfo();
    // Note id under which registerRequest() stores a request's JMX ObjectName
    // on its MsgContext.
    int JMXRequestNote;

    /**
     * Starts the channel: runs init() when no server socket exists yet,
     * then calls resume().
     *
     * @throws IOException propagated from init() or resume()
     */
    public void start() throws IOException {
        final boolean needsInit = (sSocket == null);
        if (needsInit) {
            init();
        }
        resume();
    }

    /**
     * Stops the channel by delegating to {@link #destroy()}.
     *
     * @throws IOException propagated from destroy()
     */
    public void stop() throws IOException {
        this.destroy();
    }

    /**
     * Registers a request's RequestInfo as a JMX component.
     * <p>
     * Does nothing when no JMX domain is configured. Otherwise the request's
     * RequestInfo is attached to the channel-wide request group, an
     * ObjectName of type RequestProcessor is built from the domain, the
     * channel name and <code>count</code>, that name is stored on the context
     * under the JMXRequestNote note, and the RequestInfo is registered under it.
     * <p>
     * Failures are logged as warnings (with the causing exception) and are
     * never propagated to the caller.
     *
     * @param req   request whose RequestInfo is registered
     * @param ep    context that receives the generated ObjectName as a note
     * @param count number appended to "JkRequest" to form the name key
     */
    public void registerRequest(Request req, MsgContext ep, int count) {
        if (this.domain == null) {
            return;
        }
        try {
            RequestInfo rp = req.getRequestProcessor();
            rp.setGlobalProcessor(global);
            ObjectName roname = new ObjectName(
                getDomain() + ":type=RequestProcessor,worker=" +
                getChannelName() + ",name=JkRequest" + count);
            ep.setNote(JMXRequestNote, roname);

            Registry.getRegistry(null, null).registerComponent(rp, roname, null);
        } catch (Exception ex) {
            // Pass the exception along so the reason for the failure
            // (malformed name, duplicate registration, ...) is not lost.
            log.warn("Error registering request", ex);
        }
    }

    /**
     * Opens the channel for the given context. This implementation does
     * nothing.
     *
     * @param ep the message context (unused here)
     * @throws IOException declared by the signature; never thrown by this body
     */
    public void open(MsgContext ep) throws IOException {
    }

    
    /**
     * Closes the socket stored on the context under the socketNote note.
     * The socket channel's key for this channel's selector, if one exists,
     * is cancelled first.
     *
     * @param ep context holding the socket to close
     * @throws IOException if closing the socket fails
     */
    public void close(MsgContext ep) throws IOException {
        final Socket sock = (Socket) ep.getNote(socketNote);
        final SelectionKey registration = sock.getChannel().keyFor(selector);
        if (registration != null) {
            registration.cancel();
        }
        sock.close();
    }

    /**
     * Shuts the channel down.
     * <p>
     * Clears the running flag, then - unless the port is 0 (channel
     * disabled) - shuts down the thread pool, wakes up and closes the
     * selector, closes the server socket and unregisters the JMX names that
     * were recorded in tpOName and rgOName.
     * <p>
     * Every cleanup step is attempted independently, so a failure in one step
     * (for example the thread pool shutdown) no longer prevents the selector
     * and the server socket from being closed. Failures are logged at info
     * level (stack trace at debug level) and are not propagated.
     *
     * @throws IOException declared by the signature; failures of the
     *         individual steps are caught and logged
     */
    public void destroy() throws IOException {
        running = false;

        /* If we disabled the channel return */
        if (port == 0) {
            return;
        }

        try {
            tp.shutdown();
        } catch (Exception e) {
            logDestroyFailure(e);
        }

        try {
            selector.wakeup().close();
        } catch (Exception e) {
            logDestroyFailure(e);
        }

        try {
            sSocket.close(); // XXX?
        } catch (Exception e) {
            logDestroyFailure(e);
        }

        try {
            if (tpOName != null) {
                Registry.getRegistry(null, null).unregisterComponent(tpOName);
            }
        } catch (Exception e) {
            logDestroyFailure(e);
        }

        try {
            if (rgOName != null) {
                Registry.getRegistry(null, null).unregisterComponent(rgOName);
            }
        } catch (Exception e) {
            logDestroyFailure(e);
        }
    }

    /** Logs a failure of one shutdown step without interrupting the others. */
    private void logDestroyFailure(Exception e) {
        log.info("Error shutting down the channel " + port + " " +
                e.toString());
        if (log.isDebugEnabled()) {
            log.debug("Trace", e);
        }
    }

    /**
     * Finishes the message and writes it to the output stream stored on the
     * context under the osNote note. The stream is not flushed here.
     *
     * @param msg message to send; end() is called on it first
     * @param ep  context holding the output stream
     * @return the message length reported by msg.getLen()
     * @throws IOException if the write fails
     */
    public int send( Msg msg, MsgContext ep)
        throws IOException    {
        msg.end(); // Write the packet header
        final byte[] packet = msg.getBuffer();
        final int packetLen = msg.getLen();

        if (log.isTraceEnabled()) {
            log.trace("send() " + packetLen + " " + packet[4]);
        }

        final OutputStream out = (OutputStream) ep.getNote(osNote);
        out.write(packet, 0, packetLen);
        return packetLen;
    }

    /**
     * Flushes the output stream stored on the context under the osNote note.
     *
     * @param msg not used by this method
     * @param ep  context holding the output stream
     * @return always 0
     * @throws IOException if the flush fails
     */
    public int flush( Msg msg, MsgContext ep)
        throws IOException    {
        final OutputStream out = (OutputStream) ep.getNote(osNote);
        out.flush();
        return 0;
    }

    /**
     * Reads one packet (header, then body) into the message buffer.
     *
     * @param msg message whose buffer is filled and whose header is processed
     * @param ep  context the bytes are read from
     * @return the number of body bytes read; the negative value returned by
     *         read() when the header cannot be read; -1 when a non-empty body
     *         cannot be read; -2 when the body length read differs from the
     *         length announced in the header
     * @throws IOException if the underlying read fails
     */
    public int receive( Msg msg, MsgContext ep )
        throws IOException    {
        if (log.isTraceEnabled()) {
            log.trace("receive() ");
        }

        final byte[] data = msg.getBuffer();
        final int headerLen = msg.getHeaderLength();

        // XXX If the length in the packet header doesn't agree with the
        // actual number of bytes read, it should probably return an error
        // value.  Also, callers of this method never use the length
        // returned -- should probably return true/false instead.
        final int headerRead = this.read(ep, data, 0, headerLen);
        if (headerRead < 0) {
            // Most likely normal apache restart.
            return headerRead;
        }

        // The body length is only known once the header has been processed.
        msg.processHeader();
        final int bodyLen = msg.getLen();

        // XXX check if enough space - it's assert()-ed !!!
        final int bodyRead = this.read(ep, data, headerLen, bodyLen);

        if (bodyLen > 0 && bodyRead <= 0) {
            log.warn("can't read body, waited #" + bodyLen);
            return -1;
        }

        if (bodyRead != bodyLen) {
            log.warn("incomplete read, waited #" + bodyLen +
                     " got only " + bodyRead);
            return -2;
        }

        return bodyRead;
    }
    
    /**
     * Read N bytes from the InputStream, and ensure we got them all
     * Under heavy load we could experience many fragmented packets
     * just read Unix Network Programming to recall that a call to
     * read didn't ensure you got all the data you want
     *
     * from read() Linux manual
     *
     * On success, the number of bytes read is returned (zero indicates end
     * of file),and the file position is advanced by this number.
     * It is not an error if this number is smaller than the number of bytes
     * requested; this may happen for example because fewer bytes
     * are actually available right now (maybe because we were close to
     * end-of-file, or because we are reading from a pipe, or  from  a
     * terminal),  or  because  read()  was interrupted by a signal.
     * On error, -1 is returned, and errno is set appropriately. In this
     * case it is left unspecified whether the file position (if any) changes.
     *
     **/
    public int read( MsgContext ep, byte[] b, int offset, int len)
        throws IOException
    {
        final InputStream in = (InputStream) ep.getNote(isNote);
        int filled = 0;

        // Keep reading until the requested number of bytes has arrived;
        // a single read may deliver fewer bytes than asked for.
        while (filled < len) {
            int chunk;
            try {
                chunk = in.read(b, offset + filled, len - filled);
            } catch (ClosedChannelException cce) {
                if (filled > 0) {
                    log.info("Error reading data after "+filled+"bytes",cce);
                } else {
                    log.debug("Error reading data", cce);
                }
                // Treat a closed channel like end of stream.
                chunk = -1;
            }

            if (log.isTraceEnabled()) {
                log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
                          offset + " " + len + " = " + chunk );
            }

            // Nothing was read: the connection was closed by the remote side.
            // This happens periodically, as apache restarts periodically.
            if (chunk <= 0) {
                return -3;
            }

            filled += chunk;
        }
        return filled;
    }
    
    // Cleared by destroy(). acceptConnections() checks it before accepting,
    // again after accept() returns, and before logging an accept failure.
    protected boolean running=true;
    
    /** Accept incoming connections, dispatch to the thread pool
     */
    void acceptConnections() {
        if (!running) {
            return;
        }
        try {
            final MsgContext ctx = createMsgContext();
            ctx.setSource(this);
            ctx.setWorkerEnv(wEnv);
            this.accept(ctx);

            // running is re-checked once accept() has returned; the accepted
            // context is only registered while the channel is still running.
            if (running) {
                // Since this is a long-running connection, we don't care
                // about the small GC
                final SocketConnection connection = new SocketConnection(ctx);
                connection.register(ctx);
            }
        } catch (Exception ex) {
            // Failures are only reported while the channel is running.
            if (running) {
                log.warn("Exception executing accept" ,ex);
            }
        }
    }


    // XXX This should become handleNotification
    /**
     * Dispatches on the context's type.
     * <p>
     * Receive-packet, send-packet and flush requests are served by this
     * channel's own receive(), send() and flush(). Any other type causes a
     * notification to be sent (when a broadcaster exists) and is then passed
     * to the next handler.
     *
     * @param msg message being processed
     * @param ep  context whose type selects the action
     * @return the result of the selected operation or of the next handler;
     *         OK when there is no next handler
     * @throws IOException propagated from the selected operation
     */
    public int invoke( Msg msg, MsgContext ep ) throws IOException {
        final int type = ep.getType();

        if (type == JkHandler.HANDLE_RECEIVE_PACKET) {
            if (log.isDebugEnabled()) {
                log.debug("RECEIVE_PACKET ?? ");
            }
            return receive(msg, ep);
        }
        if (type == JkHandler.HANDLE_SEND_PACKET) {
            return send(msg, ep);
        }
        if (type == JkHandler.HANDLE_FLUSH) {
            return flush(msg, ep);
        }

        if (log.isTraceEnabled()) {
            log.trace("Call next " + type + " " + next);
        }

        // Send notification; the Notification object is created once per
        // context and kept on it as a note.
        if (nSupport != null) {
            Notification cached = (Notification) ep.getNote(notifNote);
            if (cached == null) {
                cached = new Notification("channelNioSocket.message", ep, requestCount );
                ep.setNote(notifNote, cached);
            }
            nSupport.sendNotification(cached);
        }

        if (next == null) {
            log.info("No next ");
            return OK;
        }
        return next.invoke(msg, ep);
    }
    
    /**
     * Tells whether the local and remote addresses of the socket stored on
     * the context are the same, as decided by
     * {@link #isSameAddress(InetAddress, InetAddress)}.
     *
     * @param ep context holding the socket under the socketNote note
     * @return <code>true</code> if both ends have the same address
     */
    public boolean isSameAddress(MsgContext ep) {
        final Socket sock = (Socket) ep.getNote(socketNote);
        final InetAddress localEnd = sock.getLocalAddress();
        final InetAddress remoteEnd = sock.getInetAddress();
        return isSameAddress(localEnd, remoteEnd);
    }
    
    /**
     * Builds the channel name, which is used as part of the JMX object names
     * of this channel.
     * <p>
     * The name is <code>jk-&lt;port&gt;</code>. When a specific address is
     * configured (anything other than 0.0.0.0), the URL-encoded address
     * followed by a dash is inserted before the port; a leading "/" in the
     * address string is dropped first.
     * <p>
     * The address is encoded as UTF-8 explicitly, so the resulting name does
     * not depend on the platform default encoding.
     *
     * @return the channel name
     */
    public String getChannelName() {
        String encodedAddr = "";
        if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
            encodedAddr = getAddress();
            if (encodedAddr.startsWith("/")) {
                encodedAddr = encodedAddr.substring(1);
            }
            try {
                encodedAddr = URLEncoder.encode(encodedAddr, "UTF-8") + "-";
            } catch (java.io.UnsupportedEncodingException e) {
                // Every Java platform is required to support UTF-8.
                throw new IllegalStateException("UTF-8 encoding is not supported", e);
            }
        }
        return ("jk-" + encodedAddr + port);
    }
    
    /**
     * Return <code>true</code> if the specified client and server addresses
     * are the same.  This method works around a bug in the IBM 1.1.8 JVM on
     * Linux, where the address bytes are returned reversed in some
     * circumstances.
     *
     * @param server The server's InetAddress
     * @param client The client's InetAddress
     */
    public static boolean isSameAddress(InetAddress server, InetAddress client)
    {
        final byte[] serverBytes = server.getAddress();
        final byte[] clientBytes = client.getAddress();
        final int size = serverBytes.length;

        // Addresses of different families can never match.
        if (size != clientBytes.length) {
            return false;
        }

        // One pass checks both the straight and the byte-reversed ordering.
        boolean straight = true;
        boolean reversed = true;
        for (int i = 0; i < size; i++) {
            if (serverBytes[i] != clientBytes[i]) {
                straight = false;
            }
            if (serverBytes[i] != clientBytes[size - 1 - i]) {
                reversed = false;
            }
        }
        return straight || reversed;
    }

    /**
     * Sends the notification through the broadcaster, if one has been
     * created; otherwise does nothing.
     *
     * @param notification the notification to send
     */
    public void sendNewMessageNotification(Notification notification) {
        if (nSupport == null) {
            return;
        }
        nSupport.sendNotification(notification);
    }

    // Notification broadcaster; stays null until addNotificationListener()
    // creates it, and senders skip notifications while it is null.
    private NotificationBroadcasterSupport nSupport= null;

    /**
     * Adds a notification listener, creating the broadcaster on first use.
     *
     * @param listener the listener to add
     * @param filter   filter passed through to the broadcaster
     * @param handback handback object passed through to the broadcaster
     * @throws IllegalArgumentException propagated from the broadcaster
     */
    public void addNotificationListener(NotificationListener listener,
                                        NotificationFilter filter,
                                        Object handback)
            throws IllegalArgumentException
    {
        if (nSupport == null) {
            nSupport = new NotificationBroadcasterSupport();
        }
        nSupport.addNotificationListener(listener, filter, handback);
    }

    /**
     * Removes a notification listener. Does nothing when no broadcaster has
     * been created yet.
     *
     * @param listener the listener to remove
     * @throws ListenerNotFoundException propagated from the broadcaster
     */
    public void removeNotificationListener(NotificationListener listener)
            throws ListenerNotFoundException
    {
        if (nSupport == null) {
            return;
        }
        nSupport.removeNotificationListener(listener);
    }

    // Notification metadata; empty by default and replaced as a whole by
    // setNotificationInfo().
    MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];

    public void setNotificationInfo( MBeanNotificationInfo info[]) {
        this.notifInfo=info;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -