⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gms.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 4 页
字号:
package org.jgroups.protocols.pbcast;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.*;import org.jgroups.util.Queue;import org.apache.commons.logging.Log;import java.io.*;import java.util.*;import java.util.List;/** * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive * any messages until they are members * @author Bela Ban * @version $Id: GMS.java,v 1.68 2006/10/30 11:19:20 belaban Exp $ */public class GMS extends Protocol {    private GmsImpl           impl=null;    Address                   local_addr=null;    final Membership          members=new Membership();     // real membership    private final Membership  tmp_members=new Membership(); // base for computing next view    /** Members joined but for which no view has been received yet */    private final Vector      joining=new Vector(7);    /** Members excluded from group, but for which no view has been received yet */    private final Vector      leaving=new Vector(7);    View                      view=null;    ViewId                    view_id=null;    private long              ltime=0;    long                      join_timeout=5000;    long                      join_retry_timeout=2000;    long 					  flush_timeout=4000;    long                      leave_timeout=5000;    private long              digest_timeout=0;              // time to wait for a digest (from PBCAST). should be fast    long                      merge_timeout=10000;           // time to wait for all MERGE_RSPS    private final Object      impl_mutex=new Object();       // synchronizes event entry into impl    private final Object      digest_mutex=new Object();    private final Promise     digest_promise=new Promise();  // holds result of GET_DIGEST event        private final Promise     flush_promise = new Promise();    boolean 				  use_flush=false;	    private final Hashtable   impls=new Hashtable(3);    private boolean           shun=false;    boolean                   merge_leader=false;         // can I initiate a merge ?    private boolean           print_local_addr=true;    boolean                   disable_initial_coord=false; // can the member become a coord on startup or not ?    /** Setting this to false disables concurrent startups. This is only used by unit testing code     * for testing merging. To everybody else: don't change it to false ! */    boolean                   handle_concurrent_startup=true;    /** Whether view bundling (http://jira.jboss.com/jira/browse/JGRP-144) should be enabled or not. Setting this to     * false forces each JOIN/LEAVE/SUPSECT request to be handled separately. By default these requests are processed     * together if they are queued at approximately the same time */    private boolean           view_bundling=true;    private long              max_bundling_time=50; // 50ms max to wait for other JOIN, LEAVE or SUSPECT requests    static final String       CLIENT="Client";    static final String       COORD="Coordinator";    static final String       PART="Participant";    TimeScheduler             timer=null;    /** Max number of old members to keep in history */    protected int             num_prev_mbrs=50;    /** Keeps track of old members (up to num_prev_mbrs) */    BoundedList               prev_members=null;    int num_views=0;    /** Stores the last 20 views */    BoundedList               prev_views=new BoundedList(20);    /** Class to process JOIN, LEAVE and MERGE requests */    private final ViewHandler  view_handler=new ViewHandler();    /** To collect VIEW_ACKs from all members */    final AckCollector ack_collector=new AckCollector();    /** Time in ms to wait for all VIEW acks (0 == wait forever) */    long                      view_ack_collection_timeout=2000;    /** How long should a Resumer wait until resuming the ViewHandler */    long                      resume_task_timeout=20000;    public static final String       name="GMS";    public GMS() {        initState();    }    public String getName() {        return name;    }    public String getView() {return view_id != null? view_id.toString() : "null";}    public int getNumberOfViews() {return num_views;}    public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}    public String getMembers() {return members != null? members.toString() : "[]";}    public int getNumMembers() {return members != null? members.size() : 0;}    public long getJoinTimeout() {return join_timeout;}    public void setJoinTimeout(long t) {join_timeout=t;}    public long getJoinRetryTimeout() {return join_retry_timeout;}    public void setJoinRetryTimeout(long t) {join_retry_timeout=t;}    public boolean isShun() {return shun;}    public void setShun(boolean s) {shun=s;}    public String printPreviousMembers() {        StringBuffer sb=new StringBuffer();        if(prev_members != null) {            for(Enumeration en=prev_members.elements(); en.hasMoreElements();) {                sb.append(en.nextElement()).append("\n");            }        }        return sb.toString();    }    public int viewHandlerSize() {return view_handler.size();}    public boolean isViewHandlerSuspended() {return view_handler.suspended();}    public String dumpViewHandlerQueue() {        return view_handler.dumpQueue();    }    public String dumpViewHandlerHistory() {        return view_handler.dumpHistory();    }    public void suspendViewHandler() {        view_handler.suspend(null);    }    public void resumeViewHandler() {        view_handler.resumeForce();    }    Log getLog() {return log;}    ViewHandler getViewHandler() {return view_handler;}    public String printPreviousViews() {        StringBuffer sb=new StringBuffer();        for(Enumeration en=prev_views.elements(); en.hasMoreElements();) {            sb.append(en.nextElement()).append("\n");        }        return sb.toString();    }    public boolean isCoordinator() {        Address coord=determineCoordinator();        return coord != null && local_addr != null && local_addr.equals(coord);    }    public void resetStats() {        super.resetStats();        num_views=0;        prev_views.removeAll();    }    public Vector requiredDownServices() {        Vector retval=new Vector(3);        retval.addElement(new Integer(Event.GET_DIGEST));        retval.addElement(new Integer(Event.SET_DIGEST));        retval.addElement(new Integer(Event.FIND_INITIAL_MBRS));        return retval;    }    public Vector requiredUpServices() {        Vector retval=new Vector(2);        if(use_flush) {        	retval.addElement(new Integer(Event.SUSPEND));        	retval.addElement(new Integer(Event.RESUME));        }        return retval;    }    public void setImpl(GmsImpl new_impl) {        synchronized(impl_mutex) {            if(impl == new_impl) // superfluous                return;            impl=new_impl;            if(log.isDebugEnabled()) {                String msg=(local_addr != null? local_addr.toString()+" " : "") + "changed role to " + new_impl.getClass().getName();                log.debug(msg);            }        }    }    public GmsImpl getImpl() {        return impl;    }    public void init() throws Exception {        prev_members=new BoundedList(num_prev_mbrs);        timer=stack != null? stack.timer : null;        if(timer == null)            throw new Exception("GMS.init(): timer is null");        if(impl != null)            impl.init();    }    public void start() throws Exception {        if(impl != null) impl.start();    }    public void stop() {        view_handler.stop(true);        if(impl != null) impl.stop();        if(prev_members != null)            prev_members.removeAll();    }    public void becomeCoordinator() {        CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD);        if(tmp == null) {            tmp=new CoordGmsImpl(this);            impls.put(COORD, tmp);        }        try {            tmp.init();        }        catch(Exception e) {            log.error("exception switching to coordinator role", e);        }        setImpl(tmp);    }    public void becomeParticipant() {        ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART);        if(tmp == null) {            tmp=new ParticipantGmsImpl(this);            impls.put(PART, tmp);        }        try {            tmp.init();        }        catch(Exception e) {            log.error("exception switching to participant", e);        }        setImpl(tmp);    }    public void becomeClient() {        ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT);        if(tmp == null) {            tmp=new ClientGmsImpl(this);            impls.put(CLIENT, tmp);        }        try {            tmp.init();        }        catch(Exception e) {            log.error("exception switching to client role", e);        }        setImpl(tmp);    }    boolean haveCoordinatorRole() {        return impl != null && impl instanceof CoordGmsImpl;    }    /**     * Computes the next view. Returns a copy that has <code>old_mbrs</code> and     * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.     */    public View getNextView(Collection new_mbrs, Collection old_mbrs, Collection suspected_mbrs) {        Vector mbrs;        long vid;        View v;        Membership tmp_mbrs;        Address tmp_mbr;        synchronized(members) {            if(view_id == null) {                log.error("view_id is null");                return null; // this should *never* happen !            }            vid=Math.max(view_id.getId(), ltime) + 1;            ltime=vid;            tmp_mbrs=tmp_members.copy();  // always operate on the temporary membership            tmp_mbrs.remove(suspected_mbrs);            tmp_mbrs.remove(old_mbrs);            tmp_mbrs.add(new_mbrs);            mbrs=tmp_mbrs.getMembers();            v=new View(local_addr, vid, mbrs);            // Update membership (see DESIGN for explanation):            tmp_members.set(mbrs);            // Update joining list (see DESIGN for explanation)            if(new_mbrs != null) {                for(Iterator it=new_mbrs.iterator(); it.hasNext();) {                    tmp_mbr=(Address)it.next();                    if(!joining.contains(tmp_mbr))                        joining.addElement(tmp_mbr);                }            }            // Update leaving list (see DESIGN for explanations)            if(old_mbrs != null) {                for(Iterator it=old_mbrs.iterator(); it.hasNext();) {                    Address addr=(Address)it.next();                    if(!leaving.contains(addr))                        leaving.add(addr);                }            }            if(suspected_mbrs != null) {                for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) {                    Address addr=(Address)it.next();                    if(!leaving.contains(addr))                        leaving.add(addr);                }            }            return v;        }    }    /**     Compute a new view, given the current view, the new members and the suspected/left     members. Then simply mcast the view to all members. This is different to the VS GMS protocol,     in which we run a FLUSH protocol which tries to achive consensus on the set of messages mcast in     the current view before proceeding to install the next view.     The members for the new view are computed as follows:     <pre>                   existing          leaving        suspected          joining     1. new_view      y                 n               n                 y     2. tmp_view      y                 y               n                 y     (view_dest)     </pre>     <ol>     <li>     The new view to be installed includes the existing members plus the joining ones and     excludes the leaving and suspected members.     <li>     A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer     (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared     to the new view, leaving members are <em>included</em> since they have are waiting for a     view in which they are not members any longer before they leave. So, if we did not set a     temporary view, joining members would not receive the view (signalling that they have been     joined successfully). The temporary view is essentially the current view plus the joining     members (old members are still part of the current view).     </ol>     */    public void castViewChange(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) {        View new_view;        // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs        new_view=getNextView(new_mbrs, old_mbrs, suspected_mbrs);        castViewChange(new_view, null);    }    public void castViewChange(View new_view, Digest digest) {        castViewChangeWithDest(new_view, digest, null);    }    /**     * Broadcasts the new view and digest, and waits for acks from all members in the list given as argument.     * If the list is null, we take the members who are part of new_view     * @param new_view     * @param digest

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -