⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingcoordinator.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        data.setAddress(local);
        data.setMessage(msg.getBuffer());
        data.setOptions(Channel.SEND_OPTIONS_USE_ACK);
        data.setTimestamp(System.currentTimeMillis());
        return data;
    }
    
    /**
     * Hook called by {@code handleViewConf} once a confirmed view has been
     * stored in the {@code view} and {@code viewId} fields. The body is empty
     * in this class.
     *
     * @param viewId id of the view that was just stored
     * @param view   members of that view
     */
    protected void viewChange(UniqueId viewId, Member[] view) {
        //invoke any listeners
    }
    
    /**
     * Asks {@code TcpFailureDetector.memberAlive} whether the given member
     * responds, sending {@code COORD_ALIVE} as the payload, passing
     * {@code false} for both boolean flags and {@code waitForCoordMsgTimeout}
     * for both trailing numeric arguments.
     *
     * @param mbr the member to probe
     * @return whatever the failure detector reports for {@code mbr}
     */
    protected boolean alive(Member mbr) {
        final boolean reachable = TcpFailureDetector.memberAlive(
                mbr, COORD_ALIVE, false, false,
                waitForCoordMsgTimeout, waitForCoordMsgTimeout, getOptionFlag());
        return reachable;
    }
    
    /**
     * Builds a membership containing the members carried by {@code msg} plus
     * the members returned by {@code getMembers()}, then reconciles it with
     * the {@code membership} field: every member reported by
     * {@code Arrays.diff(merged, membership, local)} is probed with
     * {@link #alive(Member)}; responsive ones are passed to
     * {@code memberAdded(member, false)}, the others are removed from the
     * merged result. Pre-merge and post-merge interceptor events are fired
     * around the work.
     *
     * @param msg    the coordination message whose member list is merged in
     * @param sender the member the message came from (not used here)
     * @return the merged membership
     */
    protected Membership mergeOnArrive(CoordinationMessage msg, Member sender) {
        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PRE_MERGE,this,"Pre merge"));

        final MemberImpl self = (MemberImpl)getLocalMember(false);
        final Membership merged = new Membership(self,AbsoluteOrder.comp,true);
        Arrays.fill(merged,msg.getMembers());
        Arrays.fill(merged,getMembers());

        for (Member candidate : Arrays.diff(merged,membership,self)) {
            if (alive(candidate)) {
                // Responsive: register it, but do not trigger an election from here.
                memberAdded(candidate,false);
            } else {
                merged.removeMember((MemberImpl)candidate);
            }
        }

        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_POST_MERGE,this,"Post merge"));
        return merged;
    }
    
    /**
     * Entry point for an incoming coordination message.
     * <p>
     * On the first message seen (while {@code coordMsgReceived} is still
     * false) the flag is set and all threads waiting on {@code electionMutex}
     * are notified. The message timestamp is then overwritten with the current
     * time, the membership is merged via {@code mergeOnArrive}, and the message
     * is dispatched to {@code handleViewConf} when {@code isViewConf(msg)} is
     * true, otherwise to {@code handleToken}.
     *
     * @param msg    the decoded coordination message
     * @param sender the member the message came from
     * @throws ChannelException propagated from the handlers
     */
    protected void processCoordMessage(CoordinationMessage msg, Member sender) throws ChannelException {
        if ( !coordMsgReceived.get() ) {
            coordMsgReceived.set(true);
            synchronized (electionMutex) { electionMutex.notifyAll();}
        }
        msg.timestamp = System.currentTimeMillis();
        Membership merged = mergeOnArrive(msg, sender);
        if (isViewConf(msg)) {
            handleViewConf(msg, sender, merged);
        } else {
            handleToken(msg, sender, merged);
        }
    }
    
    /**
     * Routes a token message: if the local member equals
     * {@code msg.getSource()} the message goes to {@code handleMyToken},
     * otherwise to {@code handleOtherToken}.
     *
     * @param msg    the token message
     * @param sender the member the message came from
     * @param merged the membership produced by the merge step
     * @throws ChannelException propagated from the selected handler
     */
    protected void handleToken(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
        final MemberImpl self = (MemberImpl)getLocalMember(false);
        if ( !self.equals(msg.getSource()) ) {
            handleOtherToken(self, msg, sender,merged);
            return;
        }
        // msg.src == local: this token was created by the local member
        handleMyToken(self, msg, sender,merged);
    }
    
    /**
     * Handles a token whose source is the local member.
     * <ul>
     *   <li>Leader in the message is not the local member: the suggested view
     *       is cleared, {@code msg.view} is set to the merged members and the
     *       message is passed to {@code sendElectionMsgToNextInline}.</li>
     *   <li>Leader is the local member but the message members differ from the
     *       merged members: a new suggested view is recorded from the merged
     *       members and the message is passed on the same way.</li>
     *   <li>Leader is the local member and the member sets match: the message
     *       type becomes {@code COORD_CONF}, it is sent to every message member
     *       except the local one, and {@code handleViewConf} is invoked
     *       locally.</li>
     * </ul>
     *
     * @param local  the local member
     * @param msg    the token message
     * @param sender the member the message came from (not used here)
     * @param merged the membership produced by the merge step
     * @throws ChannelException propagated from sending or view handling
     */
    protected void handleMyToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
        if ( !local.equals(msg.getLeader()) ) {
            // leadership change
            suggestedView = null;
            suggestedviewId = null;
            msg.view = (MemberImpl[])merged.getMembers();
            sendElectionMsgToNextInline(local,msg);
            return;
        }

        if ( !Arrays.sameMembers(msg.getMembers(),merged.getMembers()) ) {
            // same leader, but the membership changed
            suggestedView = new Membership(local,AbsoluteOrder.comp,true);
            suggestedviewId = msg.getId();
            Arrays.fill(suggestedView,merged.getMembers());
            msg.view = (MemberImpl[])merged.getMembers();
            sendElectionMsgToNextInline(local,msg);
            return;
        }

        // same leader, same members: confirm the view
        msg.type = COORD_CONF;
        super.sendMessage(Arrays.remove(msg.getMembers(),local),createData(msg,local),null);
        handleViewConf(msg,local,merged);
    }
    
    /**
     * Handles a token whose source is another member. When the local member
     * is named as leader in the message nothing is done (the
     * {@code startElection} call is commented out); otherwise
     * {@code msg.view} is set to the merged members and the message is passed
     * to {@code sendElectionMsgToNextInline}.
     *
     * @param local  the local member
     * @param msg    the token message
     * @param sender the member the message came from (not used here)
     * @param merged the membership produced by the merge step
     * @throws ChannelException propagated from forwarding the message
     */
    protected void handleOtherToken(MemberImpl local, CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
        if ( local.equals(msg.getLeader()) ) {
            //I am the new leader
            //startElection(false);
            return;
        }
        msg.view = (MemberImpl[])merged.getMembers();
        sendElectionMsgToNextInline(local,msg);
    }
    
    /**
     * Installs the view carried by a confirmation message.
     * <p>
     * Returns immediately if the message id equals the current
     * {@code viewId}. Otherwise a new {@code view} is built from the message
     * members and {@code viewId} is updated. The suggested view is cleared
     * when its id equals the new view id, or when the comparator orders the
     * suggested view's first member before the merged membership's first
     * member. Then {@code viewChange} is invoked, an {@code EVT_CONF_RX} event
     * is fired and, if no suggestion is pending and
     * {@code hasHigherPriority(merged, membership)} is true, a new election is
     * started.
     *
     * @param msg    the view confirmation message
     * @param sender the member the message came from (not used here)
     * @param merged the membership produced by the merge step
     * @throws ChannelException propagated from {@code startElection}
     */
    protected void handleViewConf(CoordinationMessage msg, Member sender,Membership merged) throws ChannelException {
        if ( viewId != null && msg.getId().equals(viewId) ) {
            return; // we already have this view
        }

        view = new Membership((MemberImpl)getLocalMember(false),AbsoluteOrder.comp,true);
        Arrays.fill(view,msg.getMembers());
        viewId = msg.getId();

        // Both conditions lead to the same reset, so they share one branch.
        if ( viewId.equals(suggestedviewId)
                || (suggestedView != null
                    && AbsoluteOrder.comp.compare(suggestedView.getMembers()[0],merged.getMembers()[0]) < 0) ) {
            suggestedView = null;
            suggestedviewId = null;
        }

        viewChange(viewId,view.getMembers());
        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_CONF_RX,this,"Accepted View"));

        if ( suggestedviewId == null && hasHigherPriority(merged.getMembers(),membership.getMembers()) ) {
            startElection(false);
        }
    }
    
    /**
     * Tells whether the message type matches {@code COORD_CONF}, using
     * {@code Arrays.contains} over the first {@code COORD_CONF.length} bytes.
     *
     * @param msg the coordination message to inspect
     * @return the result of the {@code Arrays.contains} comparison
     */
    protected boolean isViewConf(CoordinationMessage msg) {
        final boolean confirmation = Arrays.contains(msg.getType(),0,COORD_CONF,0,COORD_CONF.length);
        return confirmation;
    }
    
    /**
     * Compares the head of two member arrays after ordering them.
     * <p>
     * Returns {@code false} when {@code local} is null or empty, and
     * {@code true} when {@code local} has members but {@code complete} is null
     * or empty. Otherwise both arrays are passed to
     * {@code AbsoluteOrder.absoluteOrder} (presumably an in-place sort, which
     * would reorder the caller's arrays; verify) and the result is whether the
     * comparator ranks {@code complete[0]} greater than {@code local[0]}.
     *
     * @param complete the first member array
     * @param local    the second member array
     * @return {@code true} if {@code compare(complete[0], local[0]) > 0}, subject to the empty-array rules above
     */
    protected boolean hasHigherPriority(Member[] complete, Member[] local) {
        final boolean localEmpty = (local == null) || (local.length == 0);
        if (localEmpty) {
            return false;
        }
        final boolean completeEmpty = (complete == null) || (complete.length == 0);
        if (completeEmpty) {
            return true;
        }
        AbsoluteOrder.absoluteOrder(complete);
        AbsoluteOrder.absoluteOrder(local);
        final int order = AbsoluteOrder.comp.compare(complete[0],local[0]);
        return order > 0;
    }

    
    /**
     * Returns the coordinator if one is available, i.e. the first member of
     * the current view.
     * <p>
     * The {@code view} field is read once into a local because {@code stop()}
     * assigns {@code null} to it; re-reading the field between the null check
     * and the dereference could otherwise throw a NullPointerException.
     *
     * @return the first member of the view, or {@code null} when there is no
     *         view or the view has no members
     */
    public Member getCoordinator() {
        final Membership current = view;
        if (current == null || !current.hasMembers()) {
            return null;
        }
        final Member[] members = current.getMembers();
        // Guard the index in case the membership changed after hasMembers().
        return members.length > 0 ? members[0] : null;
    }
    
    /**
     * Returns the members of the current view.
     * <p>
     * The {@code view} field is read once into a local because {@code stop()}
     * assigns {@code null} to it; re-reading the field between the null check
     * and the dereference could otherwise throw a NullPointerException.
     *
     * @return the view's members, or an empty array when there is no view or
     *         the view has no members; never {@code null} from this method's
     *         own branches
     */
    public Member[] getView() {
        final Membership current = view;
        if (current != null && current.hasMembers()) {
            return current.getMembers();
        }
        return new Member[0];
    }
    
    /**
     * Returns the id stored in the {@code viewId} field.
     *
     * @return the current view id; may be {@code null} ({@code stop()} resets
     *         the field to {@code null})
     */
    public UniqueId getViewId() {
        return viewId;
    }
    
    /**
    * Block in/out messages while an election is going on.
    * The body is empty in this class, so calling it has no effect here.
    */
   protected void halt() {

   }

   /**
    * Release the lock for in/out messages once the election is completed.
    * The body is empty in this class, so calling it has no effect here.
    */
   protected void release() {

   }

   /**
    * Wait for an election to end.
    * The body is empty in this class, so the call returns immediately.
    */
   protected void waitForRelease() {

   }

    
//============================================================================================================    
//              OVERRIDDEN METHODS FROM CHANNEL INTERCEPTOR BASE    
//============================================================================================================
    /**
     * Starts the interceptor. The membership is created first if it does not
     * exist yet; if the interceptor is already started the call returns.
     * Otherwise a "Before start" event is fired, the superclass is started,
     * the {@code started} flag is set, an initial view is created when none
     * exists, an "After start" event is fired and an election is started.
     *
     * @param svc requested service flags
     * @throws ChannelException propagated from the superclass start or from
     *         {@code startElection}
     */
    public void start(int svc) throws ChannelException {
        if (membership == null) {
            setupMembership();
        }
        if (started) {
            return;
        }

        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "Before start"));
        // NOTE(review): the startsvc field is forwarded, not the svc argument; confirm this is intended.
        super.start(startsvc);
        started = true;

        if (view == null) {
            view = new Membership( (MemberImpl)super.getLocalMember(true), AbsoluteOrder.comp, true);
        }
        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START, this, "After start"));
        startElection(false);
    }

    /**
     * Stops the interceptor. {@code halt()} is called first and
     * {@code release()} is always called on the way out. While holding
     * {@code electionMutex}, and only if the interceptor is started, the
     * {@code started} flag is cleared, the superclass is stopped, the view
     * and suggested-view fields are set to {@code null} and the membership is
     * reset, with "Before stop" and "After stop" events fired around the work.
     *
     * @param svc requested service flags
     * @throws ChannelException propagated from the superclass stop
     */
    public void stop(int svc) throws ChannelException {
        try {
            halt();
            synchronized (electionMutex) {
                if (started) {
                    started = false;
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "Before stop"));
                    // NOTE(review): the startsvc field is forwarded, not the svc argument; confirm this is intended.
                    super.stop(startsvc);
                    view = null;
                    viewId = null;
                    suggestedView = null;
                    suggestedviewId = null;
                    membership.reset();
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_STOP, this, "After stop"));
                }
            }
        } finally {
            release();
        }
    }
    
    
    /**
     * Calls {@code waitForRelease()} and then delegates the send to the
     * superclass unchanged.
     *
     * @param destination the members to send to
     * @param msg         the message to send
     * @param payload     the interceptor payload passed through to the superclass
     * @throws ChannelException propagated from the superclass send
     */
    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
        waitForRelease();
        super.sendMessage(destination, msg, payload);
    }

    /**
     * Classifies an incoming message by its leading bytes.
     * <ul>
     *   <li>Matches {@code COORD_ALIVE}: only an "Alive Message" event is
     *       fired; the message is not passed on.</li>
     *   <li>Matches {@code COORD_HEADER}: the payload is decoded into a
     *       {@code CoordinationMessage}, an arrival event is fired and the
     *       message is handed to {@code processCoordMessage}. A
     *       {@code ChannelException} from processing is logged, not
     *       rethrown.</li>
     *   <li>Anything else is delegated to the superclass.</li>
     * </ul>
     *
     * @param msg the received channel message
     */
    public void messageReceived(ChannelMessage msg) {
        if ( Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_ALIVE,0,COORD_ALIVE.length) ) {
            // alive probe: record the arrival and drop the message
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Alive Message"));
            return;
        }

        if ( !Arrays.contains(msg.getMessage().getBytesDirect(),0,COORD_HEADER,0,COORD_HEADER.length) ) {
            // not a coordination message
            super.messageReceived(msg);
            return;
        }

        try {
            final CoordinationMessage coordMsg = new CoordinationMessage(msg.getMessage());
            final Member[] carried = coordMsg.getMembers();
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MSG_ARRIVE,this,"Coord Msg Arrived("+Arrays.toNameString(carried)+")"));
            processCoordMessage(coordMsg, msg.getAddress());
        } catch ( ChannelException x ) {
            log.error("Error processing coordination message. Could be fatal.",x);
        }
    }

    /**
     * Delegates the decision to the superclass without any additional checks.
     *
     * @param msg the message to test
     * @return the superclass result
     */
    public boolean accept(ChannelMessage msg) {
        return super.accept(msg);
    }

    /**
     * Convenience overload: equivalent to {@code memberAdded(member, true)}.
     *
     * @param member the member that was added
     */
    public void memberAdded(Member member) {
        memberAdded(member,true);
    }

    /**
     * Registers an added member. The membership is created first if it does
     * not exist yet. The superclass is notified only when
     * {@code membership.memberAlive(member)} returns true. An
     * {@code EVT_MBR_ADD} event is then fired and, if the interceptor is
     * started and {@code elect} is true, an election is started. A
     * {@code ChannelException} from that step is logged, not rethrown.
     *
     * @param member the member that was added
     * @param elect  whether an election may be started as a result
     */
    public void memberAdded(Member member,boolean elect) {
        if ( membership == null ) {
            setupMembership();
        }
        if ( membership.memberAlive((MemberImpl)member) ) {
            super.memberAdded(member);
        }
        try {
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_ADD,this,"Member add("+member.getName()+")"));
            if (started && elect) {
                startElection(false);
            }
        } catch ( ChannelException x ) {
            log.error("Unable to start election when member was added.",x);
        }
    }

    /**
     * Handles the loss of a member. The membership is created first if it
     * does not exist yet (the same lazy setup used when a member is added), so
     * a disappearance reported before any other callback does not fail with a
     * NullPointerException. The member is then removed from the membership
     * and the superclass is notified. An {@code EVT_MBR_DEL} event is fired
     * and, if the interceptor is started and the local member is the
     * coordinator or ranks highest, an election is started. A
     * {@code ChannelException} from that step is logged, not rethrown.
     *
     * @param member the member that disappeared
     */
    public void memberDisappeared(Member member) {
        if ( membership == null ) {
            setupMembership();
        }
        membership.removeMember((MemberImpl)member);
        super.memberDisappeared(member);
        try {
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_MBR_DEL,this,"Member remove("+member.getName()+")"));
            if ( started && (isCoordinator() || isHighest()) ) {
                startElection(true); //to do, if a member disappears, only the coordinator can start
            }
        } catch ( ChannelException x ) {
            log.error("Unable to start election when member was removed.",x);
        }
    }
    
    /**
     * Tells whether the local member ranks at or before the first member of
     * the membership according to {@code AbsoluteOrder.comp}.
     *
     * @return {@code true} if the membership has no members, or if
     *         {@code compare(local, members[0]) <= 0}
     */
    public boolean isHighest() {
        final Member self = getLocalMember(false);
        return membership.getMembers().length == 0
                || AbsoluteOrder.comp.compare(self,membership.getMembers()[0]) <= 0;
    }
    
    /**
     * Tells whether the local member is the current coordinator.
     *
     * @return {@code true} if {@code getCoordinator()} returns a non-null
     *         member that the local member equals
     */
    public boolean isCoordinator() {
        final Member current = getCoordinator();
        if (current == null) {
            return false;
        }
        return getLocalMember(false).equals(current);
    }

    public void heartbeat() {
        try {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -