⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 nonblockingcoordinator.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 */
package org.apache.catalina.tribes.group.interceptors;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelInterceptor;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.group.AbsoluteOrder;
import org.apache.catalina.tribes.group.ChannelInterceptorBase;
import org.apache.catalina.tribes.group.InterceptorPayload;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.membership.MemberImpl;
import org.apache.catalina.tribes.membership.Membership;
import org.apache.catalina.tribes.util.Arrays;
import org.apache.catalina.tribes.util.UUIDGenerator;

/**
 * <p>Title: Auto merging leader election algorithm</p>
 *
 * <p>Description: Implementation of a simple coordinator algorithm that not only selects a coordinator,
 *    it also merges groups automatically when members are discovered that werent part of the 
 *    </p>
 * <p>This algorithm is non blocking meaning it allows for transactions while the coordination phase is going on
 * </p>
 * <p>This implementation is based on a home brewed algorithm that uses the AbsoluteOrder of a membership
 * to pass a token ring of the current membership.<br>
 * This is not the same as just using AbsoluteOrder! Consider the following scenario:<br>
 * Nodes, A,B,C,D,E on a network, in that priority. AbsoluteOrder will only work if all
 * nodes are receiving pings from all the other nodes. 
 * meaning, that node{i} receives pings from node{all}-node{i}<br>
 * but the following could happen if a multicast problem occurs.
 * A has members {B,C,D}<br>
 * B has members {A,C}<br>
 * C has members {D,E}<br>
 * D has members {A,B,C,E}<br>
 * E has members {A,C,D}<br>
 * Because the default Tribes membership implementation, relies on the multicast packets to 
 * arrive at all nodes correctly, there is nothing guaranteeing that it will.<br>
 * <br>
 * To best explain how this algorithm works, lets take the above example:
 * For simplicity we assume that a send operation is O(1) for all nodes, although this algorithm will work
 * where messages overlap, as they all depend on absolute order<br>
 * Scenario 1: A,B,C,D,E all come online at the same time
 * Eval phase, A thinks of itself as leader, B thinks of A as leader,
 * C thinks of itself as leader, D,E think of A as leader<br>
 * Token phase:<br>
 * (1) A sends out a message X{A-ldr, A-src, mbrs-A,B,C,D} to B where X is the id for the message(and the view)<br>
 * (1) C sends out a message Y{C-ldr, C-src, mbrs-C,D,E} to D where Y is the id for the message(and the view)<br>
 * (2) B receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D} to C <br>
 * (2) D receives Y{C-ldr, C-src, mbrs-C,D,E} D is aware of A,B, sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to E<br>
 * (3) C receives X{A-ldr, A-src, mbrs-A,B,C,D}, sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to D<br>
 * (3) E receives Y{A-ldr, C-src, mbrs-A,B,C,D,E} sends Y{A-ldr, C-src, mbrs-A,B,C,D,E} to A<br>
 * (4) D receives X{A-ldr, A-src, mbrs-A,B,C,D,E} sends sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to A<br>
 * (4) A receives Y{A-ldr, C-src, mbrs-A,B,C,D,E}, holds the message, add E to its list of members<br>
 * (5) A receives X{A-ldr, A-src, mbrs-A,B,C,D,E} <br>
 * At this point, the state looks like<br>
 * A - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
 * B - {A-ldr, mbrs-A,B,C,D, id=X}<br>
 * C - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
 * D - {A-ldr, mbrs-A,B,C,D,E, id=X}<br>
 * E - {A-ldr, mbrs-A,B,C,D,E, id=Y}<br>
 * <br>
 * A message doesn't stop until it reaches its original sender, unless its dropped by a higher leader.
 * As you can see, E still thinks the viewId=Y, which is not correct. But at this point we have 
 * arrived at the same membership and all nodes are informed of each other.<br>
 * To synchronize the rest we simply perform the following check at A when A receives X:<br>
 * Original X{A-ldr, A-src, mbrs-A,B,C,D} == Arrived X{A-ldr, A-src, mbrs-A,B,C,D,E}<br>
 * Since the condition is false, A, will resend the token, and A sends X{A-ldr, A-src, mbrs-A,B,C,D,E} to B
 * When A receives X again, the token is complete. <br>
 * Optionally, A can send a message X{A-ldr, A-src, mbrs-A,B,C,D,E confirmed} to A,B,C,D,E who then
 * install and accept the view.
 * </p>
 * <p>
 * Lets assume that C1 arrives, C1 has lower priority than C, but higher priority than D.<br>
 * Lets also assume that C1 sees the following view {B,D,E}<br>
 * C1 waits for a token to arrive. When the token arrives, the same scenario as above will happen.<br>
 * In the scenario where C1 sees {D,E} and A,B,C can not see C1, no token will ever arrive.<br>
 * In this case, C1 sends a Z{C1-ldr, C1-src, mbrs-C1,D,E} to D<br>
 * D receives Z{C1-ldr, C1-src, mbrs-C1,D,E} and sends Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} to E<br>
 * E receives Z{A-ldr, C1-src, mbrs-A,B,C,C1,D,E} and sends it to A<br>
 * A sends Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E} to B and the chain continues until A receives the token again.
 * At that time A optionally sends out Z{A-ldr, A-src, mbrs-A,B,C,C1,D,E, confirmed} to A,B,C,C1,D,E
 * </p>
 * <p>To ensure that the view gets implemented at all nodes at the same time, 
 *    A will send out a VIEW_CONF message, this is the 'confirmed' message that is optional above.
 * <p>Ideally, the interceptor below this one would be the TcpFailureDetector to ensure correct memberships</p>
 *
 * <p>The example above, of course can be simplified with a finite statemachine:<br>
 * But I suck at writing state machines, my head gets all confused. One day I will document this algorithm though.<br>
 * Maybe I'll do a state diagram :)
 * </p>
 * <h2>State Diagrams</h2>
 * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-initiate-election.jpg">Initiate an election</a><br><br>
 * <a href="http://people.apache.org/~fhanik/tribes/docs/leader-election-message-arrives.jpg">Receive an election message</a><br><br>
 * 
 * @author Filip Hanik
 * @version 1.0
 * 
 * 
 * 
 */
public class NonBlockingCoordinator extends ChannelInterceptorBase {
    
    /**
     * header for a coordination message
     */
    protected static final byte[] COORD_HEADER = new byte[] {-86, 38, -34, -29, -98, 90, 65, 63, -81, -122, -6, -110, 99, -54, 13, 63};
    /**
     * Coordination request
     */
    protected static final byte[] COORD_REQUEST = new byte[] {104, -95, -92, -42, 114, -36, 71, -19, -79, 20, 122, 101, -1, -48, -49, 30};
    /**
     * Coordination confirmation, for blocking installations
     */
    protected static final byte[] COORD_CONF = new byte[] {67, 88, 107, -86, 69, 23, 76, -70, -91, -23, -87, -25, -125, 86, 75, 20};
    
    /**
     * Alive message
     */
    protected static final byte[] COORD_ALIVE = new byte[] {79, -121, -25, -15, -59, 5, 64, 94, -77, 113, -119, -88, 52, 114, -56, -46,
                                                            -18, 102, 10, 34, -127, -9, 71, 115, -70, 72, -101, 88, 72, -124, 127, 111,
                                                            74, 76, -116, 50, 111, 103, 65, 3, -77, 51, -35, 0, 119, 117, 9, -26,
                                                            119, 50, -75, -105, -102, 36, 79, 37, -68, -84, -123, 15, -22, -109, 106, -55};
    /**
     * Time to wait for coordination timeout
     */
    protected long waitForCoordMsgTimeout = 15000;
    /**
     * Our current view
     */
    protected Membership view = null;
    /**
     * Out current viewId
     */
    protected UniqueId viewId;

    /**
     * Our nonblocking membership
     */
    protected Membership membership = null;
    
    /**
     * indicates that we are running an election 
     * and this is the one we are running
     */
    protected UniqueId suggestedviewId;
    protected Membership suggestedView;
    
    protected boolean started = false;
    protected final int startsvc = 0xFFFF;
    
    protected Object electionMutex = new Object();
    
    protected AtomicBoolean coordMsgReceived = new AtomicBoolean(false);
    
    public NonBlockingCoordinator() {
        super();
    }
    
//============================================================================================================    
//              COORDINATION HANDLING
//============================================================================================================
    
    public void startElection(boolean force) throws ChannelException {
        synchronized (electionMutex) {
            MemberImpl local = (MemberImpl)getLocalMember(false);
            MemberImpl[] others = (MemberImpl[])membership.getMembers();
            fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_START_ELECT,this,"Election initated"));
            if ( others.length == 0 ) {
                this.viewId = new UniqueId(UUIDGenerator.randomUUID(false));
                this.view = new Membership(local,AbsoluteOrder.comp, true);
                this.handleViewConf(this.createElectionMsg(local,others,local),local,view);
                return; //the only member, no need for an election
            }
            if ( suggestedviewId != null ) {
                
                if ( view != null && Arrays.diff(view,suggestedView,local).length == 0 &&  Arrays.diff(suggestedView,view,local).length == 0) {
                    suggestedviewId = null;
                    suggestedView = null;
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, running election matches view"));
                } else {
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, election running"));
                }
                return; //election already running, I'm not allowed to have two of them
            }
            if ( view != null && Arrays.diff(view,membership,local).length == 0 &&  Arrays.diff(membership,view,local).length == 0) {
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, view matches membership"));
                return; //already have this view installed
            }            
            int prio = AbsoluteOrder.comp.compare(local,others[0]);
            MemberImpl leader = ( prio < 0 )?local:others[0];//am I the leader in my view?
            if ( local.equals(leader) || force ) {
                CoordinationMessage msg = createElectionMsg(local, others, leader);
                suggestedviewId = msg.getId();
                suggestedView = new Membership(local,AbsoluteOrder.comp,true);
                Arrays.fill(suggestedView,msg.getMembers());
                fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_PROCESS_ELECT,this,"Election, sending request"));
                sendElectionMsg(local,others[0],msg);
            } else {
                try {
                    coordMsgReceived.set(false);
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting for request"));
                    electionMutex.wait(waitForCoordMsgTimeout);
                }catch ( InterruptedException x ) {
                    Thread.currentThread().interrupted();
                }
                if ( suggestedviewId == null && (!coordMsgReceived.get())) {
                    //no message arrived, send the coord msg
//                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_WAIT_FOR_MSG,this,"Election, waiting timed out."));
//                    startElection(true);
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, waiting timed out."));
                } else {
                    fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_ELECT_ABANDONED,this,"Election abandoned, received a message"));
                }
            }//end if
            
        }
    }

    private CoordinationMessage createElectionMsg(MemberImpl local, MemberImpl[] others, MemberImpl leader) {
        Membership m = new Membership(local,AbsoluteOrder.comp,true);
        Arrays.fill(m,others);
        MemberImpl[] mbrs = m.getMembers();
        m.reset(); 
        CoordinationMessage msg = new CoordinationMessage(leader, local, mbrs,new UniqueId(UUIDGenerator.randomUUID(true)), this.COORD_REQUEST);
        return msg;
    }

    protected void sendElectionMsg(MemberImpl local, MemberImpl next, CoordinationMessage msg) throws ChannelException {
        fireInterceptorEvent(new CoordinationEvent(CoordinationEvent.EVT_SEND_MSG,this,"Sending election message to("+next.getName()+")"));
        super.sendMessage(new Member[] {next}, createData(msg, local), null);
    }
    
    protected void sendElectionMsgToNextInline(MemberImpl local, CoordinationMessage msg) throws ChannelException { 
        int next = Arrays.nextIndex(local,msg.getMembers());
        int current = next;
        msg.leader = msg.getMembers()[0];
        boolean sent =  false;
        while ( !sent && current >= 0 ) {
            try {
                sendElectionMsg(local, (MemberImpl) msg.getMembers()[current], msg);
                sent = true;
            }catch ( ChannelException x  ) {
                log.warn("Unable to send election message to:"+msg.getMembers()[current]);
                current = Arrays.nextIndex(msg.getMembers()[current],msg.getMembers());
                if ( current == next ) throw x;
            }
        }
    }
    
    public Member getNextInLine(MemberImpl local, MemberImpl[] others) {
        MemberImpl result = null;
        for ( int i=0; i<others.length; i++ ) {
            
        }
        return result;
    }
    
    public ChannelData createData(CoordinationMessage msg, MemberImpl local) {
        msg.write();
        ChannelData data = new ChannelData(true);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -