⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingcoordinator.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            MemberImpl local = (MemberImpl)getLocalMember(false);
            if ( view != null && (Arrays.diff(view,membership,local).length != 0 ||  Arrays.diff(membership,view,local).length != 0) ) {
                if ( isHighest() ) {
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT, this,
                                                               "Heartbeat found inconsistency, restart election"));
                    startElection(true);
                }            
            }
        } catch ( Exception x  ){
            log.error("Unable to perform heartbeat.",x);
        } finally {
            super.heartbeat();
        }
    }

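    /**
     * Reports whether the current membership has members, as answered by
     * {@code membership.hasMembers()}.
     *
     * @return the value returned by the membership
     */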
    public boolean hasMembers() {
        
        return membership.hasMembers();
    }

    /**
     * Get all current cluster members
     * @return all members or empty array
     */
    public Member[] getMembers() {
        
        return membership.getMembers();
    }

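    /**
     * Delegates the lookup of the given member to the current membership.
     *
     * @param mbr the member to look up
     * @return the value returned by {@code membership.getMember(mbr)}
     */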
    public Member getMember(Member mbr) {
        
        return membership.getMember(mbr);
    }

    /**
     * Return the member that represents this node.
     *
     * @param incAlive passed through unchanged to the superclass implementation
     * @return Member
     */
    public Member getLocalMember(boolean incAlive) {
        Member local = super.getLocalMember(incAlive);
        if ( view == null && (local != null)) setupMembership();
        return local;
    }
    
    protected synchronized void setupMembership() {
        if ( membership == null ) {
            membership  = new Membership((MemberImpl)super.getLocalMember(true),AbsoluteOrder.comp,false);
        }
    }
    
    
//============================================================================================================    
//              HELPER CLASSES FOR COORDINATION
//============================================================================================================
    
    
   
    
    public static class CoordinationMessage {
        //X{A-ldr, A-src, mbrs-A,B,C,D}
        protected XByteBuffer buf;
        protected MemberImpl leader;
        protected MemberImpl source;
        protected MemberImpl[] view;
        protected UniqueId id;
        protected byte[] type;
        protected long timestamp = System.currentTimeMillis();
        
        public CoordinationMessage(XByteBuffer buf) {
            this.buf = buf;
            parse();
        }

        public CoordinationMessage(MemberImpl leader,
                                   MemberImpl source, 
                                   MemberImpl[] view,
                                   UniqueId id,
                                   byte[] type) {
            this.buf = new XByteBuffer(4096,false);
            this.leader = leader;
            this.source = source;
            this.view = view;
            this.id = id;
            this.type = type;
            this.write();
        }
        

        public byte[] getHeader() {
            return NonBlockingCoordinator.COORD_HEADER;
        }
        
        public MemberImpl getLeader() {
            if ( leader == null ) parse();
            return leader;
        }
        
        public MemberImpl getSource() {
            if ( source == null ) parse();
            return source;
        }
        
        public UniqueId getId() {
            if ( id == null ) parse();
            return id;
        }
        
        public MemberImpl[] getMembers() {
            if ( view == null ) parse();
            return view;
        }
        
        public byte[] getType() {
            if (type == null ) parse();
            return type;
        }
        
        public XByteBuffer getBuffer() {
            return this.buf;
        }
        
        public void parse() {
            //header
            int offset = 16;
            //leader
            int ldrLen = buf.toInt(buf.getBytesDirect(),offset);
            offset += 4;
            byte[] ldr = new byte[ldrLen];
            System.arraycopy(buf.getBytesDirect(),offset,ldr,0,ldrLen);
            leader = MemberImpl.getMember(ldr);
            offset += ldrLen;
            //source
            int srcLen = buf.toInt(buf.getBytesDirect(),offset);
            offset += 4;
            byte[] src = new byte[srcLen];
            System.arraycopy(buf.getBytesDirect(),offset,src,0,srcLen);
            source = MemberImpl.getMember(src);
            offset += srcLen;
            //view
            int mbrCount = buf.toInt(buf.getBytesDirect(),offset);
            offset += 4;
            view = new MemberImpl[mbrCount];
            for (int i=0; i<view.length; i++ ) {
                int mbrLen = buf.toInt(buf.getBytesDirect(),offset);
                offset += 4;
                byte[] mbr = new byte[mbrLen];
                System.arraycopy(buf.getBytesDirect(), offset, mbr, 0, mbrLen);
                view[i] = MemberImpl.getMember(mbr);
                offset += mbrLen;
            }
            //id
            this.id = new UniqueId(buf.getBytesDirect(),offset,16);
            offset += 16;
            type = new byte[16];
            System.arraycopy(buf.getBytesDirect(), offset, type, 0, type.length);
            offset += 16;
            
        }
        
        public void write() {
            buf.reset();
            //header
            buf.append(COORD_HEADER,0,COORD_HEADER.length);
            //leader
            byte[] ldr = leader.getData(false,false);
            buf.append(ldr.length);
            buf.append(ldr,0,ldr.length);
            ldr = null;
            //source
            byte[] src = source.getData(false,false);
            buf.append(src.length);
            buf.append(src,0,src.length);
            src = null;
            //view
            buf.append(view.length);
            for (int i=0; i<view.length; i++ ) {
                byte[] mbr = view[i].getData(false,false);
                buf.append(mbr.length);
                buf.append(mbr,0,mbr.length);
            }
            //id
            buf.append(id.getBytes(),0,id.getBytes().length);
            buf.append(type,0,type.length);
        }
    }
    
    public void fireInterceptorEvent(InterceptorEvent event) {
        if (event instanceof CoordinationEvent &&
            ((CoordinationEvent)event).type == CoordinationEvent.EVT_CONF_RX) 
            log.info(event);
    }
    
    public static class CoordinationEvent implements InterceptorEvent {
        public static final int EVT_START = 1;
        public static final int EVT_MBR_ADD = 2;
        public static final int EVT_MBR_DEL = 3;
        public static final int EVT_START_ELECT = 4;
        public static final int EVT_PROCESS_ELECT = 5;
        public static final int EVT_MSG_ARRIVE = 6;
        public static final int EVT_PRE_MERGE = 7;
        public static final int EVT_POST_MERGE = 8;
        public static final int EVT_WAIT_FOR_MSG = 9;
        public static final int EVT_SEND_MSG = 10;
        public static final int EVT_STOP = 11;
        public static final int EVT_CONF_RX = 12;
        public static final int EVT_ELECT_ABANDONED = 13;
        
        int type;
        ChannelInterceptor interceptor;
        Member coord; 
        Member[] mbrs;
        String info;
        Membership view;
        Membership suggestedView;
        public CoordinationEvent(int type,ChannelInterceptor interceptor, String info) {
            this.type = type;
            this.interceptor = interceptor;
            this.coord = ((NonBlockingCoordinator)interceptor).getCoordinator();
            this.mbrs = ((NonBlockingCoordinator)interceptor).membership.getMembers();
            this.info = info;
            this.view = ((NonBlockingCoordinator)interceptor).view;
            this.suggestedView = ((NonBlockingCoordinator)interceptor).suggestedView;
        }
        
        public int getEventType() {
            return type;
        }
        
        public String getEventTypeDesc() {
            switch (type) {
                case  EVT_START: return "EVT_START:"+info;
                case  EVT_MBR_ADD: return "EVT_MBR_ADD:"+info;
                case  EVT_MBR_DEL: return "EVT_MBR_DEL:"+info;
                case  EVT_START_ELECT: return "EVT_START_ELECT:"+info;
                case  EVT_PROCESS_ELECT: return "EVT_PROCESS_ELECT:"+info;
                case  EVT_MSG_ARRIVE: return "EVT_MSG_ARRIVE:"+info;
                case  EVT_PRE_MERGE: return "EVT_PRE_MERGE:"+info;
                case  EVT_POST_MERGE: return "EVT_POST_MERGE:"+info;
                case  EVT_WAIT_FOR_MSG: return "EVT_WAIT_FOR_MSG:"+info;
                case  EVT_SEND_MSG: return "EVT_SEND_MSG:"+info;
                case  EVT_STOP: return "EVT_STOP:"+info;
                case  EVT_CONF_RX: return "EVT_CONF_RX:"+info;
                case EVT_ELECT_ABANDONED: return "EVT_ELECT_ABANDONED:"+info;
                default: return "Unknown";
            }
        }
        
        public ChannelInterceptor getInterceptor() {
            return interceptor;
        }
        
        public String toString() {
            StringBuffer buf = new StringBuffer("CoordinationEvent[type=");
            buf.append(type).append("\n\tLocal:");
            Member local = interceptor.getLocalMember(false);
            buf.append(local!=null?local.getName():"").append("\n\tCoord:");
            buf.append(coord!=null?coord.getName():"").append("\n\tView:");
            buf.append(Arrays.toNameString(view!=null?view.getMembers():null)).append("\n\tSuggested View:");
            buf.append(Arrays.toNameString(suggestedView!=null?suggestedView.getMembers():null)).append("\n\tMembers:");
            buf.append(Arrays.toNameString(mbrs)).append("\n\tInfo:");
            buf.append(info).append("]");
            return buf.toString();
        }
    }

    



}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -