
📄 GMS.java

📁 JGroups source
💻 JAVA
📖 Page 1 of 2
// $Id: GMS.java,v 1.16 2005/10/26 16:07:33 belaban Exp $

package org.jgroups.protocols;

import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.RpcProtocol;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;

import java.util.Hashtable;
import java.util.Properties;
import java.util.Vector;


/**
 * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views
 * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive
 * any messages until they are members.
 *
 * @author Bela Ban
 */
public class GMS extends RpcProtocol implements Runnable {
    private GmsImpl impl=null;
    public Address local_addr=null;
    public String group_addr=null;
    public final Membership mbrs=new Membership();
    public ViewId view_id=null;
    public long ltime=0;
    public long join_timeout=5000;
    public long join_retry_timeout=2000;
    private long flush_timeout=0;             // 0=wait forever until FLUSH completes
    private long rebroadcast_timeout=0;       // 0=wait forever until REBROADCAST completes
    private long view_change_timeout=10000;   // until all handleViewChange() RPCs have returned
    public long leave_timeout=5000;
    public final Object impl_mutex=new Object();     // synchronizes event entry into impl
    public final Object view_mutex=new Object();     // synchronizes view installations
    private Queue event_queue=new Queue();           // stores SUSPECT, MERGE events
    private Thread evt_thread=null;
    private final Object flush_mutex=new Object();
    private FlushRsp flush_rsp=null;
    private final Object rebroadcast_mutex=new Object();
    private boolean rebroadcast_unstable_msgs=true;
    private boolean print_local_addr=true;
    boolean disable_initial_coord=false;  // can the member become a coord on startup or not?
    private final Hashtable impls=new Hashtable();
    static final String CLIENT="Client";
    static final String COORD="Coordinator";
    static final String PART="Participant";
    public static final String name="GMS";


    public GMS() {
        initState();
    }


    public String getName() {
        return name;
    }


    public Vector requiredDownServices() {
        Vector retval=new Vector();
        retval.addElement(new Integer(Event.FLUSH));
        retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));
        return retval;
    }


    public void setImpl(GmsImpl new_impl) {
        synchronized(impl_mutex) {
            impl=new_impl;
            if(log.isInfoEnabled()) log.info("changed role to " + new_impl.getClass().getName());
        }
    }


    public void start() throws Exception {
        super.start();
        if(checkForViewEnforcer(up_prot) == false) {
            if(warn) log.warn("I need protocol layer " +
                    "VIEW_ENFORCER above me to discard messages sent to me while I'm " +
                    "not yet a group member! Otherwise, these messages will be delivered " +
                    "to the application without checking...\n");
        }

        if(_corr != null)
            _corr.setDeadlockDetection(true);
        else
            throw new Exception("GMS.start(): cannot set deadlock detection in corr, as it is null!");
    }


    public void becomeCoordinator() {
        CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD);
        if(tmp == null) {
            tmp=new CoordGmsImpl(this);
            tmp.leaving=false;
            tmp.received_last_view=false; // +++ ?
            impls.put(COORD, tmp);
        }
        setImpl(tmp);
    }


    public void becomeParticipant() {
        ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART);
        if(tmp == null) {
            tmp=new ParticipantGmsImpl(this);
            tmp.leaving=false;
            tmp.received_final_view=false;
            impls.put(PART, tmp);
        }
        setImpl(tmp);
    }


    public void becomeClient() {
        ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT);
        if(tmp == null) {
            tmp=new ClientGmsImpl(this);
            impls.put(CLIENT, tmp);
        }
        else
            tmp.init();
        setImpl(tmp);
    }


    boolean haveCoordinatorRole() {
        return impl != null && impl instanceof CoordGmsImpl;
    }


    /**
     * Computes the next view. Returns a copy that has <code>old_mbrs</code> and
     * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
     */
    public View getNextView(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) {
        Vector members;
        long vid;
        View v;
        Membership tmp_mbrs;
        Vector mbrs_to_remove=new Vector();

        if(old_mbrs != null && old_mbrs.size() > 0)
            for(int i=0; i < old_mbrs.size(); i++)
                mbrs_to_remove.addElement(old_mbrs.elementAt(i));
        if(suspected_mbrs != null && suspected_mbrs.size() > 0)
            for(int i=0; i < suspected_mbrs.size(); i++)
                if(!mbrs_to_remove.contains(suspected_mbrs.elementAt(i)))
                    mbrs_to_remove.addElement(suspected_mbrs.elementAt(i));

        synchronized(view_mutex) {
            vid=Math.max(view_id.getId(), ltime) + 1;
            ltime=vid;
            tmp_mbrs=this.mbrs.copy();
            tmp_mbrs.merge(new_mbrs, mbrs_to_remove);
            members=(Vector)tmp_mbrs.getMembers().clone();
            v=new View(local_addr, vid, members);
            return v;
        }
    }


    /**
     * Returns a copy of the current membership minus the suspected members: the FLUSH request is
     * not sent to suspected members (because they won't respond), and not to joining members
     * either. It IS sent to leaving members (before they are allowed to leave).
     */
    Vector computeFlushDestination(Vector suspected_mbrs) {
        Vector ret=mbrs.getMembers(); // *copy* of current membership
        if(suspected_mbrs != null && suspected_mbrs.size() > 0)
            for(int i=0; i < suspected_mbrs.size(); i++)
                ret.removeElement(suspected_mbrs.elementAt(i));
        return ret;
    }


    /**
     * Computes the destination set to which to send a VIEW_CHANGE message. This is the current
     * members + the leaving members (old_mbrs) + the joining members (new_mbrs) - the suspected
     * members.
     */
    private Vector computeViewDestination(Vector new_mbrs, Vector suspected_mbrs) {
        Vector ret=mbrs.getMembers(); // *copy* of current membership
        Address mbr;

        // add new members
        if(new_mbrs != null) {
            for(int i=0; i < new_mbrs.size(); i++) {
                mbr=(Address)new_mbrs.elementAt(i);
                if(!ret.contains(mbr))
                    ret.addElement(new_mbrs.elementAt(i));
            }
        }

        // old members are still in the existing membership, no need to add them explicitly

        // remove suspected members
        if(suspected_mbrs != null) {
            for(int i=0; i < suspected_mbrs.size(); i++) {
                mbr=(Address)suspected_mbrs.elementAt(i);
                ret.removeElement(mbr);
            }
        }
        return ret;
    }


    /**
     * FLUSH protocol.
     * Sends to current mbrs - suspected_mbrs (not including new_mbrs, but including old_mbrs).
     * Sends a TMP_VIEW event down; this allows FLUSH/NAKACK to set the membership correctly.
     */
    public void flush(Vector flush_dest, Vector suspected_mbrs) {
        Vector rebroadcast_msgs=new Vector();

        if(suspected_mbrs == null)
            suspected_mbrs=new Vector();

        while(flush_dest.size() > 0) {
            flush_rsp=null;
            synchronized(flush_mutex) {
                passDown(new Event(Event.FLUSH, flush_dest));  // send FLUSH to members in flush_dest
                if(flush_rsp == null) {
                    try {
                        flush_mutex.wait(flush_timeout);
                    }
                    catch(Exception e) {
                    }
                }
            }
            if(flush_rsp == null) {
                break;
            }

            if(rebroadcast_unstable_msgs && flush_rsp.unstable_msgs != null &&
                    flush_rsp.unstable_msgs.size() > 0) {
                Message m;
                for(int i=0; i < flush_rsp.unstable_msgs.size(); i++) {
                    m=(Message)flush_rsp.unstable_msgs.elementAt(i);
                    // just add the msg; NAKACK.RESEND will weed out duplicates based on
                    // <sender:id> before re-broadcasting msgs
                    rebroadcast_msgs.addElement(m);
                }
            }

            if(flush_rsp.result == true)
                break;
            else {
                if(flush_rsp.failed_mbrs != null) {
                    for(int i=0; i < flush_rsp.failed_mbrs.size(); i++) {
                        flush_dest.removeElement(flush_rsp.failed_mbrs.elementAt(i));
                        suspected_mbrs.addElement(flush_rsp.failed_mbrs.elementAt(i));
                    }
                }
            }
        } // while

        if(log.isInfoEnabled()) log.info("flushing completed.");

        // Rebroadcast unstable messages
        if(rebroadcast_unstable_msgs && rebroadcast_msgs.size() > 0) {
            if(log.isInfoEnabled()) log.info("re-broadcasting unstable messages (" +
                    rebroadcast_msgs.size() + ')');
            // the NAKACK layer will rebroadcast the msgs (using the same seqnos assigned earlier)
            synchronized(rebroadcast_mutex) {
                passDown(new Event(Event.REBROADCAST_MSGS, rebroadcast_msgs));
                try {
                    rebroadcast_mutex.wait(rebroadcast_timeout);
                }
                catch(Exception e) {
                }
            }
            if(log.isInfoEnabled()) log.info("re-broadcasting messages completed");
        }
    }


    /**
     * Computes a new view, given the current view, the new members and the suspected/left
     * members. Runs the view update protocol to install the new view in all members (this
     * involves casting the new view to all members). The targets for FLUSH and VIEW mcasts are
     * computed as follows:<p>
     * <pre>
     *                existing       leaving       suspected       joining
     *
     * 1. FLUSH          y              y              n              n
     * 2. new_view       y              n              n              y
     * 3. tmp_view       y              y              n              y
     *    (view_dest)
     * </pre>
     * <p/>
     * <ol>
     * <li>
     * The FLUSH is only sent to the existing and leaving members (they are the only ones that
     * might have old messages not yet seen by the group). The suspected members would not answer
     * anyway (because they have failed) and the joining members certainly have no old messages.
     * <li>
     * The new view to be installed includes the existing members plus the joining ones and
     * excludes the leaving and suspected members.
     * <li>
     * A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer
     * (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared
     * to the new view, leaving members are <em>included</em> since they are waiting for a view
     * in which they are no longer members before they leave. So, if we did not set a temporary
     * view, joining members would not receive the view (signalling that they have been joined
     * successfully). The temporary view is essentially the current view plus the joining members
     * (old members are still part of the current view).
     * </ol>
     */
    public void castViewChange(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) {
        View new_view, tmp_view;
        ViewId new_vid;
        Vector flush_dest=computeFlushDestination(suspected_mbrs);         // members to which FLUSH/VIEW is sent
        Vector view_dest=computeViewDestination(new_mbrs, suspected_mbrs); // dest for view change

        // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs
        new_view=getNextView(new_mbrs, old_mbrs, suspected_mbrs);
        new_vid=new_view.getVid();

        if(log.isInfoEnabled()) log.info("FLUSH phase, flush_dest: " + flush_dest +
                "\n\tview_dest: " + view_dest + "\n\tnew_view: " + new_view + '\n');
        flush(flush_dest, suspected_mbrs);
        if(log.isInfoEnabled())
            log.info("FLUSH phase done");

        /* VIEW protocol. Send to current mbrs + new_mbrs + old_mbrs - suspected_mbrs. Since
           suspected members were removed from view_dest during the previous FLUSH round(s), we
           only need to add the new members. Send TMP_VIEW event down; this allows
           FLUSH/NAKACK to set the membership correctly */
        view_dest=computeViewDestination(new_mbrs, suspected_mbrs);
        tmp_view=new View(null, view_dest);
        Event view_event=new Event(Event.TMP_VIEW, tmp_view); // so the VIEW msg is sent to the correct mbrs
        passDown(view_event); // needed e.g. by failure detector or UDP

        if(log.isInfoEnabled()) log.info("mcasting view {" + new_vid + ", " + view_dest + '}');
        passDown(new Event(Event.SWITCH_NAK_ACK));  // use ACK scheme for view bcast
        Object[] args=new Object[]{new_vid, new_view.getMembers() /* these are the mbrs in the new view */};
        MethodCall call=new MethodCall("handleViewChange", args,
                new String[]{ViewId.class.getName(), Vector.class.getName()});
        callRemoteMethods(view_dest, // send to all members in 'view_dest'
                call,
                GroupRequest.GET_ALL, view_change_timeout);
        if(log.isInfoEnabled()) log.info("mcasting view completed");
        passDown(new Event(Event.SWITCH_NAK));  // back to normal NAKs ...
    }


    /**
     * Assigns the new ltime. Installs view and view_id. Changes role to coordinator if necessary.
     * Sends a VIEW_CHANGE event up and down the stack.
     */
    public void installView(ViewId new_view, Vector mbrs) {
        Object coord;
        int rc;

        synchronized(view_mutex) {                    // serialize access to views
            ltime=Math.max(new_view.getId(), ltime);  // compute Lamport logical time
            if(log.isInfoEnabled()) log.info("received view change, vid=" + new_view);

            /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
               This ensures that messages sent in view V1 are only received by members of V1 */
            if(checkSelfInclusion(mbrs) == false) {
                if(warn) log.warn("I'm not a member of " + mbrs + ", discarding");
                return;
            }

            if(view_id == null) {
                view_id=(ViewId)new_view.clone();
            }
            else {
                rc=new_view.compareTo(view_id);  // rc should always be a positive number
                if(rc <= 0) {  // don't accept a view id lower than our own
                    if(warn) log.warn("received view <= current view; discarding it! " +
                            "(view_id: " + view_id + ", new_view: " + new_view + ')');
                    return;
                }
                else {  // the check for vid equality was okay, assign new_view to view_id
                    if(new_view.getCoordAddress() != null) {
                        view_id=new ViewId(new_view.getCoordAddress(), new_view.getId());
                    }
                    else {
                        view_id=new ViewId(view_id.getCoordAddress(), new_view.getId());
                    }
                }
            }
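
For context, a brief usage sketch: in JGroups releases of this vintage, an application builds the protocol stack from a colon-separated property string and joins a group through a JChannel. The stack string below is an illustrative assumption, not taken from this file; the one constraint this file does impose is that VIEW_ENFORCER sits directly above GMS, which start() checks for.

// Minimal sketch, assuming a classic JGroups 2.x stack string (transport first,
// bottom-up). Protocol parameters are omitted; the exact layer list is an assumption.
import org.jgroups.JChannel;

public class GmsUsageSketch {
    public static void main(String[] args) throws Exception {
        // VIEW_ENFORCER is placed above GMS, as GMS.start() requires
        String props="UDP:PING:FD:STABLE:NAKACK:UNICAST:FRAG:FLUSH:GMS:VIEW_ENFORCER:QUEUE";
        JChannel ch=new JChannel(props);
        ch.connect("demo-group");   // triggers the join protocol (ClientGmsImpl contacts the coordinator)
        System.out.println("current view: " + ch.getView()); // the view installed via installView()
        ch.close();
    }
}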
