📄 coordgmsimpl.java
字号:
// $Id: CoordGmsImpl.java,v 1.52 2006/10/23 16:12:23 vlada Exp $package org.jgroups.protocols.pbcast;import org.jgroups.*;import org.jgroups.util.TimeScheduler;import java.util.*;/** * Coordinator role of the Group MemberShip (GMS) protocol. Accepts JOIN and LEAVE requests and emits view changes * accordingly. * @author Bela Ban */public class CoordGmsImpl extends GmsImpl { private boolean merging=false; private final MergeTask merge_task=new MergeTask(); private final Vector merge_rsps=new Vector(11); // for MERGE_REQ/MERGE_RSP correlation, contains MergeData elements private ViewId merge_id=null; private Address merge_leader=null; private MergeCanceller merge_canceller=null; private final Object merge_canceller_mutex=new Object(); /** the max time in ms to suspend message garbage collection */ private final Long MAX_SUSPEND_TIMEOUT=new Long(30000); public CoordGmsImpl(GMS g) { super(g); } private void setMergeId(ViewId merge_id) { this.merge_id=merge_id; synchronized(merge_canceller_mutex) { if(this.merge_id != null) { stopMergeCanceller(); merge_canceller=new MergeCanceller(this.merge_id, gms.merge_timeout); gms.timer.add(merge_canceller); } else { // merge completed stopMergeCanceller(); } } } private void stopMergeCanceller() { synchronized(merge_canceller_mutex) { if(merge_canceller != null) { merge_canceller.cancel(); merge_canceller=null; } } } public void init() throws Exception { super.init(); cancelMerge(); } public void join(Address mbr) { wrongMethod("join"); } /** The coordinator itself wants to leave the group */ public void leave(Address mbr) { if(mbr == null) { if(log.isErrorEnabled()) log.error("member's address is null !"); return; } if(mbr.equals(gms.local_addr)) leaving=true; gms.getViewHandler().add(new GMS.Request(GMS.Request.LEAVE, mbr, false, null)); gms.getViewHandler().stop(true); // wait until all requests have been processed, then close the queue and leave gms.getViewHandler().waitUntilCompleted(gms.leave_timeout); } public void handleJoinResponse(JoinRsp join_rsp) { } public void handleLeaveResponse() { } public void suspect(Address mbr) { if(mbr.equals(gms.local_addr)) { if(warn) log.warn("I am the coord and I'm being am suspected -- will probably leave shortly"); return; } Collection emptyVector=new LinkedHashSet(0); Collection suspected=new LinkedHashSet(1); suspected.add(mbr); handleMembershipChange(emptyVector, emptyVector, suspected); } public void unsuspect(Address mbr) { } /** * Invoked upon receiving a MERGE event from the MERGE layer. Starts the merge protocol. * See description of protocol in DESIGN. * @param other_coords A list of coordinators (including myself) found by MERGE protocol */ public void merge(Vector other_coords) { Membership tmp; if(merging) { if(warn) log.warn("merge already in progress, discarded MERGE event (I am " + gms.local_addr + ")"); return; } merge_leader=null; if(other_coords == null) { if(warn) log.warn("list of other coordinators is null. Will not start merge."); return; } if(other_coords.size() <= 1) { if(log.isErrorEnabled()) log.error("number of coordinators found is " + other_coords.size() + "; will not perform merge"); return; } /* Establish deterministic order, so that coords can elect leader */ tmp=new Membership(other_coords); tmp.sort(); merge_leader=(Address)tmp.elementAt(0); // if(log.isDebugEnabled()) log.debug("coordinators in merge protocol are: " + tmp); if(merge_leader.equals(gms.local_addr) || gms.merge_leader) { if(trace) log.trace("I (" + gms.local_addr + ") will be the leader. Starting the merge task"); startMergeTask(other_coords); } else { if(trace) log.trace("I (" + gms.local_addr + ") am not the merge leader, " + "waiting for merge leader (" + merge_leader + ")to initiate merge"); } } /** * Get the view and digest and send back both (MergeData) in the form of a MERGE_RSP to the sender. * If a merge is already in progress, send back a MergeData with the merge_rejected field set to true. */ public void handleMergeRequest(Address sender, ViewId merge_id) { Digest digest; View view; if(sender == null) { if(log.isErrorEnabled()) log.error("sender == null; cannot send back a response"); return; } if(merging) { if(log.isErrorEnabled()) log.error("merge already in progress"); sendMergeRejectedResponse(sender, merge_id); return; } merging=true; /* Clears the view handler queue and discards all JOIN/LEAVE/MERGE requests until after the MERGE */ gms.getViewHandler().suspend(merge_id); setMergeId(merge_id); if(log.isDebugEnabled()) log.debug("sender=" + sender + ", merge_id=" + merge_id); digest=gms.getDigest(); view=new View(gms.view_id.copy(), gms.members.getMembers()); gms.passDown(new Event(Event.ENABLE_UNICASTS_TO, sender)); sendMergeResponse(sender, view, digest); } private MergeData getMergeResponse(Address sender, ViewId merge_id) { Digest digest; View view; MergeData retval; if(sender == null) { if(log.isErrorEnabled()) log.error("sender == null; cannot send back a response"); return null; } if(merging) { if(log.isErrorEnabled()) log.error("merge already in progress"); retval=new MergeData(sender, null, null); retval.merge_rejected=true; return retval; } merging=true; setMergeId(merge_id); if(log.isDebugEnabled()) log.debug("sender=" + sender + ", merge_id=" + merge_id); try { digest=gms.getDigest(); view=new View(gms.view_id.copy(), gms.members.getMembers()); retval=new MergeData(sender, view, digest); retval.view=view; retval.digest=digest; } catch(NullPointerException null_ex) { return null; } return retval; } public void handleMergeResponse(MergeData data, ViewId merge_id) { if(data == null) { if(log.isErrorEnabled()) log.error("merge data is null"); return; } if(merge_id == null || this.merge_id == null) { if(log.isErrorEnabled()) log.error("merge_id (" + merge_id + ") or this.merge_id (" + this.merge_id + ") is null (sender=" + data.getSender() + ")."); return; } if(!this.merge_id.equals(merge_id)) { if(log.isErrorEnabled()) log.error("this.merge_id (" + this.merge_id + ") is different from merge_id (" + merge_id + ')'); return; } synchronized(merge_rsps) { if(!merge_rsps.contains(data)) { merge_rsps.addElement(data); merge_rsps.notifyAll(); } } } /** * If merge_id is not equal to this.merge_id then discard. * Else cast the view/digest to all members of this group. */ public void handleMergeView(MergeData data, ViewId merge_id) { if(merge_id == null || this.merge_id == null || !this.merge_id.equals(merge_id)) { if(log.isErrorEnabled()) log.error("merge_ids don't match (or are null); merge view discarded"); return; } java.util.List my_members=gms.view != null? gms.view.getMembers() : null; // only send to our *current* members, if we have A and B being merged (we are B), then we would *not* // receive a VIEW_ACK from A because A doesn't see us in the pre-merge view yet and discards the view GMS.Request req=new GMS.Request(GMS.Request.VIEW); req.view=data.view; req.digest=data.digest; req.target_members=my_members; gms.getViewHandler().add(req, true, // at head so it is processed next true); // un-suspend the queue merging=false; } public void handleMergeCancelled(ViewId merge_id) { if(merge_id != null && this.merge_id != null && this.merge_id.equals(merge_id)) { if(log.isDebugEnabled()) log.debug("merge was cancelled (merge_id=" + merge_id + ", local_addr=" + gms.local_addr +")"); setMergeId(null); this.merge_leader=null; merging=false; gms.getViewHandler().resume(merge_id); } } private void cancelMerge() { Object tmp=merge_id; if(merge_id != null && log.isDebugEnabled()) log.debug("cancelling merge (merge_id=" + merge_id + ')'); setMergeId(null); this.merge_leader=null; stopMergeTask(); merging=false; synchronized(merge_rsps) { merge_rsps.clear(); } gms.getViewHandler().resume(tmp); } /** * Computes the new view (including the newly joined member) and get the digest from PBCAST. * Returns both in the form of a JoinRsp */ /*private synchronized void handleJoin(Address mbr) { View v; Digest d, tmp; JoinRsp join_rsp; if(mbr == null) { if(log.isErrorEnabled()) log.error("mbr is null"); return; } if(gms.local_addr.equals(mbr)) { if(log.isErrorEnabled()) log.error("cannot join myself !"); return; } if(log.isDebugEnabled()) log.debug("mbr=" + mbr); if(gms.members.contains(mbr)) { // already joined: return current digest and membership if(log.isWarnEnabled()) log.warn(mbr + " already present; returning existing view " + gms.view); join_rsp=new JoinRsp(new View(gms.view_id, gms.members.getMembers()), gms.getDigest()); sendJoinResponse(join_rsp, mbr); return; } try { // we cannot garbage collect during joining a new member *if* we're the only member // Example: {A}, B joins, after returning JoinRsp to B, A garbage collects messages higher than those in the // digest returned to the client, so the client will *not* be able to ask for retransmission of those // messages if he misses them gms.passDown(new Event(Event.SUSPEND_STABLE, MAX_SUSPEND_TIMEOUT)); Vector new_mbrs=new Vector(1); new_mbrs.addElement(mbr); tmp=gms.getDigest(); // get existing digest if(tmp == null) { if(log.isErrorEnabled()) log.error("received null digest from GET_DIGEST: will cause JOIN to fail"); return; } d=new Digest(tmp.size() + 1); // create a new digest, which contains 1 more member d.add(tmp); // add the existing digest to the new one
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -