📄 gms.java
字号:
package org.jgroups.protocols.pbcast;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.*;import org.jgroups.util.Queue;import org.apache.commons.logging.Log;import java.io.*;import java.util.*;import java.util.List;/** * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive * any messages until they are members * @author Bela Ban * @version $Id: GMS.java,v 1.68 2006/10/30 11:19:20 belaban Exp $ */public class GMS extends Protocol { private GmsImpl impl=null; Address local_addr=null; final Membership members=new Membership(); // real membership private final Membership tmp_members=new Membership(); // base for computing next view /** Members joined but for which no view has been received yet */ private final Vector joining=new Vector(7); /** Members excluded from group, but for which no view has been received yet */ private final Vector leaving=new Vector(7); View view=null; ViewId view_id=null; private long ltime=0; long join_timeout=5000; long join_retry_timeout=2000; long flush_timeout=4000; long leave_timeout=5000; private long digest_timeout=0; // time to wait for a digest (from PBCAST). should be fast long merge_timeout=10000; // time to wait for all MERGE_RSPS private final Object impl_mutex=new Object(); // synchronizes event entry into impl private final Object digest_mutex=new Object(); private final Promise digest_promise=new Promise(); // holds result of GET_DIGEST event private final Promise flush_promise = new Promise(); boolean use_flush=false; private final Hashtable impls=new Hashtable(3); private boolean shun=false; boolean merge_leader=false; // can I initiate a merge ? private boolean print_local_addr=true; boolean disable_initial_coord=false; // can the member become a coord on startup or not ? /** Setting this to false disables concurrent startups. This is only used by unit testing code * for testing merging. To everybody else: don't change it to false ! */ boolean handle_concurrent_startup=true; /** Whether view bundling (http://jira.jboss.com/jira/browse/JGRP-144) should be enabled or not. Setting this to * false forces each JOIN/LEAVE/SUPSECT request to be handled separately. By default these requests are processed * together if they are queued at approximately the same time */ private boolean view_bundling=true; private long max_bundling_time=50; // 50ms max to wait for other JOIN, LEAVE or SUSPECT requests static final String CLIENT="Client"; static final String COORD="Coordinator"; static final String PART="Participant"; TimeScheduler timer=null; /** Max number of old members to keep in history */ protected int num_prev_mbrs=50; /** Keeps track of old members (up to num_prev_mbrs) */ BoundedList prev_members=null; int num_views=0; /** Stores the last 20 views */ BoundedList prev_views=new BoundedList(20); /** Class to process JOIN, LEAVE and MERGE requests */ private final ViewHandler view_handler=new ViewHandler(); /** To collect VIEW_ACKs from all members */ final AckCollector ack_collector=new AckCollector(); /** Time in ms to wait for all VIEW acks (0 == wait forever) */ long view_ack_collection_timeout=2000; /** How long should a Resumer wait until resuming the ViewHandler */ long resume_task_timeout=20000; public static final String name="GMS"; public GMS() { initState(); } public String getName() { return name; } public String getView() {return view_id != null? view_id.toString() : "null";} public int getNumberOfViews() {return num_views;} public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";} public String getMembers() {return members != null? members.toString() : "[]";} public int getNumMembers() {return members != null? members.size() : 0;} public long getJoinTimeout() {return join_timeout;} public void setJoinTimeout(long t) {join_timeout=t;} public long getJoinRetryTimeout() {return join_retry_timeout;} public void setJoinRetryTimeout(long t) {join_retry_timeout=t;} public boolean isShun() {return shun;} public void setShun(boolean s) {shun=s;} public String printPreviousMembers() { StringBuffer sb=new StringBuffer(); if(prev_members != null) { for(Enumeration en=prev_members.elements(); en.hasMoreElements();) { sb.append(en.nextElement()).append("\n"); } } return sb.toString(); } public int viewHandlerSize() {return view_handler.size();} public boolean isViewHandlerSuspended() {return view_handler.suspended();} public String dumpViewHandlerQueue() { return view_handler.dumpQueue(); } public String dumpViewHandlerHistory() { return view_handler.dumpHistory(); } public void suspendViewHandler() { view_handler.suspend(null); } public void resumeViewHandler() { view_handler.resumeForce(); } Log getLog() {return log;} ViewHandler getViewHandler() {return view_handler;} public String printPreviousViews() { StringBuffer sb=new StringBuffer(); for(Enumeration en=prev_views.elements(); en.hasMoreElements();) { sb.append(en.nextElement()).append("\n"); } return sb.toString(); } public boolean isCoordinator() { Address coord=determineCoordinator(); return coord != null && local_addr != null && local_addr.equals(coord); } public void resetStats() { super.resetStats(); num_views=0; prev_views.removeAll(); } public Vector requiredDownServices() { Vector retval=new Vector(3); retval.addElement(new Integer(Event.GET_DIGEST)); retval.addElement(new Integer(Event.SET_DIGEST)); retval.addElement(new Integer(Event.FIND_INITIAL_MBRS)); return retval; } public Vector requiredUpServices() { Vector retval=new Vector(2); if(use_flush) { retval.addElement(new Integer(Event.SUSPEND)); retval.addElement(new Integer(Event.RESUME)); } return retval; } public void setImpl(GmsImpl new_impl) { synchronized(impl_mutex) { if(impl == new_impl) // superfluous return; impl=new_impl; if(log.isDebugEnabled()) { String msg=(local_addr != null? local_addr.toString()+" " : "") + "changed role to " + new_impl.getClass().getName(); log.debug(msg); } } } public GmsImpl getImpl() { return impl; } public void init() throws Exception { prev_members=new BoundedList(num_prev_mbrs); timer=stack != null? stack.timer : null; if(timer == null) throw new Exception("GMS.init(): timer is null"); if(impl != null) impl.init(); } public void start() throws Exception { if(impl != null) impl.start(); } public void stop() { view_handler.stop(true); if(impl != null) impl.stop(); if(prev_members != null) prev_members.removeAll(); } public void becomeCoordinator() { CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD); if(tmp == null) { tmp=new CoordGmsImpl(this); impls.put(COORD, tmp); } try { tmp.init(); } catch(Exception e) { log.error("exception switching to coordinator role", e); } setImpl(tmp); } public void becomeParticipant() { ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART); if(tmp == null) { tmp=new ParticipantGmsImpl(this); impls.put(PART, tmp); } try { tmp.init(); } catch(Exception e) { log.error("exception switching to participant", e); } setImpl(tmp); } public void becomeClient() { ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT); if(tmp == null) { tmp=new ClientGmsImpl(this); impls.put(CLIENT, tmp); } try { tmp.init(); } catch(Exception e) { log.error("exception switching to client role", e); } setImpl(tmp); } boolean haveCoordinatorRole() { return impl != null && impl instanceof CoordGmsImpl; } /** * Computes the next view. Returns a copy that has <code>old_mbrs</code> and * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added. */ public View getNextView(Collection new_mbrs, Collection old_mbrs, Collection suspected_mbrs) { Vector mbrs; long vid; View v; Membership tmp_mbrs; Address tmp_mbr; synchronized(members) { if(view_id == null) { log.error("view_id is null"); return null; // this should *never* happen ! } vid=Math.max(view_id.getId(), ltime) + 1; ltime=vid; tmp_mbrs=tmp_members.copy(); // always operate on the temporary membership tmp_mbrs.remove(suspected_mbrs); tmp_mbrs.remove(old_mbrs); tmp_mbrs.add(new_mbrs); mbrs=tmp_mbrs.getMembers(); v=new View(local_addr, vid, mbrs); // Update membership (see DESIGN for explanation): tmp_members.set(mbrs); // Update joining list (see DESIGN for explanation) if(new_mbrs != null) { for(Iterator it=new_mbrs.iterator(); it.hasNext();) { tmp_mbr=(Address)it.next(); if(!joining.contains(tmp_mbr)) joining.addElement(tmp_mbr); } } // Update leaving list (see DESIGN for explanations) if(old_mbrs != null) { for(Iterator it=old_mbrs.iterator(); it.hasNext();) { Address addr=(Address)it.next(); if(!leaving.contains(addr)) leaving.add(addr); } } if(suspected_mbrs != null) { for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) { Address addr=(Address)it.next(); if(!leaving.contains(addr)) leaving.add(addr); } } return v; } } /** Compute a new view, given the current view, the new members and the suspected/left members. Then simply mcast the view to all members. This is different to the VS GMS protocol, in which we run a FLUSH protocol which tries to achive consensus on the set of messages mcast in the current view before proceeding to install the next view. The members for the new view are computed as follows: <pre> existing leaving suspected joining 1. new_view y n n y 2. tmp_view y y n y (view_dest) </pre> <ol> <li> The new view to be installed includes the existing members plus the joining ones and excludes the leaving and suspected members. <li> A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared to the new view, leaving members are <em>included</em> since they have are waiting for a view in which they are not members any longer before they leave. So, if we did not set a temporary view, joining members would not receive the view (signalling that they have been joined successfully). The temporary view is essentially the current view plus the joining members (old members are still part of the current view). </ol> */ public void castViewChange(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) { View new_view; // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs new_view=getNextView(new_mbrs, old_mbrs, suspected_mbrs); castViewChange(new_view, null); } public void castViewChange(View new_view, Digest digest) { castViewChangeWithDest(new_view, digest, null); } /** * Broadcasts the new view and digest, and waits for acks from all members in the list given as argument. * If the list is null, we take the members who are part of new_view * @param new_view * @param digest
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -