📄 gms.java
字号:
// $Id: GMS.java,v 1.16 2005/10/26 16:07:33 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.blocks.GroupRequest;import org.jgroups.blocks.MethodCall;import org.jgroups.stack.Protocol;import org.jgroups.stack.RpcProtocol;import org.jgroups.util.Queue;import org.jgroups.util.QueueClosedException;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive * any messages until they are members. * * @author Bela Ban */public class GMS extends RpcProtocol implements Runnable { private GmsImpl impl=null; public Address local_addr=null; public String group_addr=null; public final Membership mbrs=new Membership(); public ViewId view_id=null; public long ltime=0; public long join_timeout=5000; public long join_retry_timeout=2000; private long flush_timeout=0; // 0=wait forever until FLUSH completes private long rebroadcast_timeout=0; // 0=wait forever until REBROADCAST completes private long view_change_timeout=10000; // until all handleViewChange() RPCs have returned public long leave_timeout=5000; public final Object impl_mutex=new Object(); // synchronizes event entry into impl public final Object view_mutex=new Object(); // synchronizes view installations private Queue event_queue=new Queue(); // stores SUSPECT, MERGE events private Thread evt_thread=null; private final Object flush_mutex=new Object(); private FlushRsp flush_rsp=null; private final Object rebroadcast_mutex=new Object(); private boolean rebroadcast_unstable_msgs=true; private boolean print_local_addr=true; boolean disable_initial_coord=false; // can the member become a coord on startup or not ? private final Hashtable impls=new Hashtable(); static final String CLIENT="Client"; static final String COORD="Coordinator"; static final String PART="Participant"; public static final String name="GMS"; public GMS() { initState(); } public String getName() { return name; } public Vector requiredDownServices() { Vector retval=new Vector(); retval.addElement(new Integer(Event.FLUSH)); retval.addElement(new Integer(Event.FIND_INITIAL_MBRS)); return retval; } public void setImpl(GmsImpl new_impl) { synchronized(impl_mutex) { impl=new_impl; if(log.isInfoEnabled()) log.info("changed role to " + new_impl.getClass().getName()); } } public void start() throws Exception { super.start(); if(checkForViewEnforcer(up_prot) == false) { if(warn) log.warn("I need protocol layer " + "VIEW_ENFORCER above me to discard messages sent to me while I'm " + "not yet a group member ! Otherwise, these messages will be delivered " + "to the application without checking...\n"); } if(_corr != null) _corr.setDeadlockDetection(true); else throw new Exception("GMS.start(): cannot set deadlock detection in corr, as it is null !"); } public void becomeCoordinator() { CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD); if(tmp == null) { tmp=new CoordGmsImpl(this); tmp.leaving=false; tmp.received_last_view=false; // +++ ? impls.put(COORD, tmp); } setImpl(tmp); } public void becomeParticipant() { ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART); if(tmp == null) { tmp=new ParticipantGmsImpl(this); tmp.leaving=false; tmp.received_final_view=false; impls.put(PART, tmp); } setImpl(tmp); } public void becomeClient() { ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT); if(tmp == null) { tmp=new ClientGmsImpl(this); impls.put(CLIENT, tmp); } else tmp.init(); setImpl(tmp); } boolean haveCoordinatorRole() { return impl != null && impl instanceof CoordGmsImpl; } /** * Computes the next view. Returns a copy that has <code>old_mbrs</code> and * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added. */ public View getNextView(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) { Vector members; long vid; View v; Membership tmp_mbrs; Vector mbrs_to_remove=new Vector(); if(old_mbrs != null && old_mbrs.size() > 0) for(int i=0; i < old_mbrs.size(); i++) mbrs_to_remove.addElement(old_mbrs.elementAt(i)); if(suspected_mbrs != null && suspected_mbrs.size() > 0) for(int i=0; i < suspected_mbrs.size(); i++) if(!mbrs_to_remove.contains(suspected_mbrs.elementAt(i))) mbrs_to_remove.addElement(suspected_mbrs.elementAt(i)); synchronized(view_mutex) { vid=Math.max(view_id.getId(), ltime) + 1; ltime=vid; tmp_mbrs=this.mbrs.copy(); tmp_mbrs.merge(new_mbrs, mbrs_to_remove); members=(Vector)tmp_mbrs.getMembers().clone(); v=new View(local_addr, vid, members); return v; } } /** * Return a copy of the current membership minus the suspected members: FLUSH request is not sent * to suspected members (because they won't respond, and not to joining members either. * It IS sent to leaving members (before they are allowed to leave). */ Vector computeFlushDestination(Vector suspected_mbrs) { Vector ret=mbrs.getMembers(); // *copy* of current membership if(suspected_mbrs != null && suspected_mbrs.size() > 0) for(int i=0; i < suspected_mbrs.size(); i++) ret.removeElement(suspected_mbrs.elementAt(i)); return ret; } /** * Compute the destination set to which to send a VIEW_CHANGE message. This is the current * members + the leaving members (old_mbrs) + the joining members (new_mbrs) - the suspected * members. */ private Vector computeViewDestination(Vector new_mbrs, Vector suspected_mbrs) { Vector ret=mbrs.getMembers(); // **copy* of current membership Address mbr; // add new members if(new_mbrs != null) { for(int i=0; i < new_mbrs.size(); i++) { mbr=(Address)new_mbrs.elementAt(i); if(!ret.contains(mbr)) ret.addElement(new_mbrs.elementAt(i)); } } // old members are still in existing membership, don't need to add them explicitely // remove suspected members if(suspected_mbrs != null) { for(int i=0; i < suspected_mbrs.size(); i++) { mbr=(Address)suspected_mbrs.elementAt(i); ret.removeElement(mbr); } } return ret; } /** * FLUSH protocol. * Send to current mbrs - suspected_mbrs (not including new_mbrs, but including old_mbr) * Send TMP_VIEW event down, * this allows FLUSH/NAKACK to set membership correctly */ public void flush(Vector flush_dest, Vector suspected_mbrs) { Vector rebroadcast_msgs=new Vector(); if(suspected_mbrs == null) suspected_mbrs=new Vector(); while(flush_dest.size() > 0) { flush_rsp=null; synchronized(flush_mutex) { passDown(new Event(Event.FLUSH, flush_dest)); // send FLUSH to members in flush_dest if(flush_rsp == null) { try { flush_mutex.wait(flush_timeout); } catch(Exception e) { } } } if(flush_rsp == null) { break; } if(rebroadcast_unstable_msgs && flush_rsp.unstable_msgs != null && flush_rsp.unstable_msgs.size() > 0) { Message m; for(int i=0; i < flush_rsp.unstable_msgs.size(); i++) { m=(Message)flush_rsp.unstable_msgs.elementAt(i); // just add msg, NAKACK.RESEND will weed out duplicates based on // <sender:id> before re-broadcasting msgs rebroadcast_msgs.addElement(m); } } if(flush_rsp.result == true) break; else { if(flush_rsp.failed_mbrs != null) { for(int i=0; i < flush_rsp.failed_mbrs.size(); i++) { flush_dest.removeElement(flush_rsp.failed_mbrs.elementAt(i)); suspected_mbrs.addElement(flush_rsp.failed_mbrs.elementAt(i)); } } } } // while if(log.isInfoEnabled()) log.info("flushing completed."); // Rebroadcast unstable messages if(rebroadcast_unstable_msgs && rebroadcast_msgs.size() > 0) { if(log.isInfoEnabled()) log.info("re-broadcasting unstable messages (" + rebroadcast_msgs.size() + ')'); // NAKACK layer will rebroadcast the msgs (using the same seqnos assigned earlier) synchronized(rebroadcast_mutex) { passDown(new Event(Event.REBROADCAST_MSGS, rebroadcast_msgs)); try { rebroadcast_mutex.wait(rebroadcast_timeout); } catch(Exception e) { } } if(log.isInfoEnabled()) log.info("re-broadcasting messages completed"); } } /** * Compute a new view, given the current view, the new members and the suspected/left * members. Run view update protocol to install a new view in all members (this involves * casting the new view to all members). The targets for FLUSH and VIEW mcasts are * computed as follows:<p> * <pre> * existing leaving suspected joining * <p/> * 1. FLUSH y y n n * 2. new_view y n n y * 3. tmp_view y y n y * (view_dest) * </pre> * <p/> * <ol> * <li> * The FLUSH is only sent to the existing and leaving members (they are the only ones that might have * old messages not yet seen by the group. The suspected members would not answer anyway (because they * have failed) and the joining members have certainly no old messages. * <li> * The new view to be installed includes the existing members plus the joining ones and * excludes the leaving and suspected members. * <li> * A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer * (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared * to the new view, leaving members are <em>included</em> since they have are waiting for a * view in which they are not members any longer before they leave. So, if we did not set a * temporary view, joining members would not receive the view (signalling that they have been * joined successfully). The temporary view is essentially the current view plus the joining * members (old members are still part of the current view). * </ol> */ public void castViewChange(Vector new_mbrs, Vector old_mbrs, Vector suspected_mbrs) { View new_view, tmp_view; ViewId new_vid; Vector flush_dest=computeFlushDestination(suspected_mbrs); // members to which FLUSH/VIEW is sent Vector view_dest=computeViewDestination(new_mbrs, suspected_mbrs); // dest for view change // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs new_view=getNextView(new_mbrs, old_mbrs, suspected_mbrs); new_vid=new_view.getVid(); if(log.isInfoEnabled()) log.info("FLUSH phase, flush_dest: " + flush_dest + "\n\tview_dest: " + view_dest + "\n\tnew_view: " + new_view + '\n'); flush(flush_dest, suspected_mbrs); if(log.isInfoEnabled()) log.info("FLUSH phase done"); /* VIEW protocol. Send to current mbrs + new_mbrs + old_mbrs - suspected_mbrs. Since suspected members were removed from view_dest during the previous FLUSH round(s), we only need to add the new members. Send TMP_VIEW event down, this allows FLUSH/NAKACK to set membership correctly */ view_dest=computeViewDestination(new_mbrs, suspected_mbrs); tmp_view=new View(null, view_dest); Event view_event=new Event(Event.TMP_VIEW, tmp_view); // so the VIEW msg is sent to the correct mbrs passDown(view_event); // needed e.g. by failure detector or UDP if(log.isInfoEnabled()) log.info("mcasting view {" + new_vid + ", " + view_dest + '}'); passDown(new Event(Event.SWITCH_NAK_ACK)); // use ACK scheme for view bcast Object[] args=new Object[]{new_vid, new_view.getMembers() /* these are the mbrs in the new view */}; MethodCall call=new MethodCall("handleViewChange", args, new String[]{ViewId.class.getName(), Vector.class.getName()}); callRemoteMethods(view_dest, // send to all members in 'view_dest' call, GroupRequest.GET_ALL, view_change_timeout); if(log.isInfoEnabled()) log.info("mcasting view completed"); passDown(new Event(Event.SWITCH_NAK)); // back to normal NAKs ... } /** * Assigns the new ltime. Installs view and view_id. Changes role to coordinator if necessary. * Sends VIEW_CHANGE event up and down the stack. */ public void installView(ViewId new_view, Vector mbrs) { Object coord; int rc; synchronized(view_mutex) { // serialize access to views ltime=Math.max(new_view.getId(), ltime); // compute Lamport logical time if(log.isInfoEnabled()) log.info("received view change, vid=" + new_view); /* Check for self-inclusion: if I'm not part of the new membership, I just discard it. This ensures that messages sent in view V1 are only received by members of V1 */ if(checkSelfInclusion(mbrs) == false) { if(warn) log.warn("I'm not member of " + mbrs + ", discarding"); return; } if(view_id == null) { view_id=(ViewId)new_view.clone(); } else { rc=new_view.compareTo(view_id); // rc should always be a positive number if(rc <= 0) { // don't accept view id lower than our own if(warn) log.warn("received view <= current view; discarding it ! " + "(view_id: " + view_id + ", new_view: " + new_view + ')'); return; } else { // the check for vid equality was okay, assign new_view to view_id if(new_view.getCoordAddress() != null) { view_id=new ViewId(new_view.getCoordAddress(), new_view.getId()); } else { view_id=new ViewId(view_id.getCoordAddress(), new_view.getId()); } } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -