📄 coordgmsimpl.java
字号:
} catch(Exception ex) { } } if(log.isDebugEnabled()) log.debug("num_rsps_expected=" + num_rsps_expected + ", actual responses=" + merge_rsps.size()); if(merge_rsps.size() >= num_rsps_expected) break; curr_time=System.currentTimeMillis(); } stop=System.currentTimeMillis(); if(trace) log.trace("collected " + merge_rsps.size() + " merge response(s) in " + (stop-start) + "ms"); } } /** * Generates a unique merge id by taking the local address and the current time */ private ViewId generateMergeId() { return new ViewId(gms.local_addr, System.currentTimeMillis()); // we're (ab)using ViewId as a merge id } /** * Merge all MergeData. All MergeData elements should be disjunct (both views and digests). However, * this method is prepared to resolve duplicate entries (for the same member). Resolution strategy for * views is to merge only 1 of the duplicate members. Resolution strategy for digests is to take the higher * seqnos for duplicate digests.<p> * After merging all members into a Membership and subsequent sorting, the first member of the sorted membership * will be the new coordinator. * @param v A list of MergeData items. Elements with merge_rejected=true were removed before. Is guaranteed * not to be null and to contain at least 1 member. */ private MergeData consolidateMergeData(Vector v) { MergeData ret; MergeData tmp_data; long logical_time=0; // for new_vid ViewId new_vid, tmp_vid; MergeView new_view; View tmp_view; Membership new_mbrs=new Membership(); int num_mbrs; Digest new_digest; Address new_coord; Vector subgroups=new Vector(11); // contains a list of Views, each View is a subgroup for(int i=0; i < v.size(); i++) { tmp_data=(MergeData)v.elementAt(i); if(log.isDebugEnabled()) log.debug("merge data is " + tmp_data); tmp_view=tmp_data.getView(); if(tmp_view != null) { tmp_vid=tmp_view.getVid(); if(tmp_vid != null) { // compute the new view id (max of all vids +1) logical_time=Math.max(logical_time, tmp_vid.getId()); } // merge all membership lists into one (prevent duplicates) new_mbrs.add(tmp_view.getMembers()); subgroups.addElement(tmp_view.clone()); } } // the new coordinator is the first member of the consolidated & sorted membership list new_mbrs.sort(); num_mbrs=new_mbrs.size(); new_coord=num_mbrs > 0? (Address)new_mbrs.elementAt(0) : null; if(new_coord == null) { if(log.isErrorEnabled()) log.error("new_coord == null"); return null; } // should be the highest view ID seen up to now plus 1 new_vid=new ViewId(new_coord, logical_time + 1); // determine the new view new_view=new MergeView(new_vid, new_mbrs.getMembers(), subgroups); if(log.isDebugEnabled()) log.debug("new merged view will be " + new_view); // determine the new digest new_digest=consolidateDigests(v, num_mbrs); if(new_digest == null) { if(log.isErrorEnabled()) log.error("digest could not be consolidated"); return null; } if(log.isDebugEnabled()) log.debug("consolidated digest=" + new_digest); ret=new MergeData(gms.local_addr, new_view, new_digest); return ret; } /** * Merge all digests into one. For each sender, the new value is min(low_seqno), max(high_seqno), * max(high_seqno_seen) */ private Digest consolidateDigests(Vector v, int num_mbrs) { MergeData data; Digest tmp_digest, retval=new Digest(num_mbrs); for(int i=0; i < v.size(); i++) { data=(MergeData)v.elementAt(i); tmp_digest=data.getDigest(); if(tmp_digest == null) { if(log.isErrorEnabled()) log.error("tmp_digest == null; skipping"); continue; } retval.merge(tmp_digest); } return retval; } /** * Sends the new view and digest to all subgroup coordinors in coords. Each coord will in turn * <ol> * <li>cast the new view and digest to all the members of its subgroup (MergeView) * <li>on reception of the view, if it is a MergeView, each member will set the digest and install * the new view * </ol> */ private void sendMergeView(Vector coords, MergeData combined_merge_data) { Message msg; GMS.GmsHeader hdr; Address coord; View v; Digest d; if(coords == null || combined_merge_data == null) return; v=combined_merge_data.view; d=combined_merge_data.digest; if(v == null || d == null) { if(log.isErrorEnabled()) log.error("view or digest is null, cannot send consolidated merge view/digest"); return; } if(trace) log.trace("sending merge view " + v.getVid() + " to coordinators " + coords); for(int i=0; i < coords.size(); i++) { coord=(Address)coords.elementAt(i); msg=new Message(coord, null, null); hdr=new GMS.GmsHeader(GMS.GmsHeader.INSTALL_MERGE_VIEW); hdr.view=v; hdr.my_digest=d; hdr.merge_id=merge_id; msg.putHeader(gms.getName(), hdr); gms.passDown(new Event(Event.MSG, msg)); } } /** * Send back a response containing view and digest to sender */ private void sendMergeResponse(Address sender, View view, Digest digest) { Message msg=new Message(sender, null, null); GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_RSP); hdr.merge_id=merge_id; hdr.view=view; hdr.my_digest=digest; msg.putHeader(gms.getName(), hdr); if(log.isDebugEnabled()) log.debug("response=" + hdr); gms.passDown(new Event(Event.MSG, msg)); } private void sendMergeCancelledMessage(Vector coords, ViewId merge_id) { Message msg; GMS.GmsHeader hdr; Address coord; if(coords == null || merge_id == null) { if(log.isErrorEnabled()) log.error("coords or merge_id == null"); return; } for(int i=0; i < coords.size(); i++) { coord=(Address)coords.elementAt(i); msg=new Message(coord, null, null); hdr=new GMS.GmsHeader(GMS.GmsHeader.CANCEL_MERGE); hdr.merge_id=merge_id; msg.putHeader(gms.getName(), hdr); gms.passDown(new Event(Event.MSG, msg)); } } /** Removed rejected merge requests from merge_rsps and coords */ private void removeRejectedMergeRequests(Vector coords) { MergeData data; for(Iterator it=merge_rsps.iterator(); it.hasNext();) { data=(MergeData)it.next(); if(data.merge_rejected) { if(data.getSender() != null && coords != null) coords.removeElement(data.getSender()); it.remove(); if(log.isDebugEnabled()) log.debug("removed element " + data); } } } /* --------------------------------------- End of Private methods ------------------------------------- */ /** * Starts the merge protocol (only run by the merge leader). Essentially sends a MERGE_REQ to all * coordinators of all subgroups found. Each coord receives its digest and view and returns it. * The leader then computes the digest and view for the new group from the return values. Finally, it * sends this merged view/digest to all subgroup coordinators; each coordinator will install it in their * subgroup. */ private class MergeTask implements Runnable { Thread t=null; Vector coords=null; // list of subgroup coordinators to be contacted public void start(Vector coords) { if(t == null || !t.isAlive()) { this.coords=(Vector)(coords != null? coords.clone() : null); t=new Thread(this, "MergeTask"); t.setDaemon(true); t.start(); } } public void stop() { Thread tmp=t; if(isRunning()) { t=null; tmp.interrupt(); } t=null; coords=null; } public boolean isRunning() { return t != null && t.isAlive(); } /** * Runs the merge protocol as a leader */ public void run() { MergeData combined_merge_data; if(merging == true) { if(warn) log.warn("merge is already in progress, terminating"); return; } if(log.isDebugEnabled()) log.debug("merge task started, coordinators are " + this.coords); try { /* 1. Generate a merge_id that uniquely identifies the merge in progress */ setMergeId(generateMergeId()); /* 2. Fetch the current Views/Digests from all subgroup coordinators */ getMergeDataFromSubgroupCoordinators(coords, gms.merge_timeout); /* 3. Remove rejected MergeData elements from merge_rsp and coords (so we'll send the new view only to members who accepted the merge request) */ removeRejectedMergeRequests(coords); if(merge_rsps.size() <= 1) { if(warn) log.warn("merge responses from subgroup coordinators <= 1 (" + merge_rsps + "). Cancelling merge"); sendMergeCancelledMessage(coords, merge_id); return; } /* 4. Combine all views and digests into 1 View/1 Digest */ combined_merge_data=consolidateMergeData(merge_rsps); if(combined_merge_data == null) { if(log.isErrorEnabled()) log.error("combined_merge_data == null"); sendMergeCancelledMessage(coords, merge_id); return; } /* 5. Don't allow JOINs or LEAVEs until we are done with the merge. Suspend() will clear the view handler queue, so no requests beyond this current MERGE request will be processed */ gms.getViewHandler().suspend(merge_id); /* 6. Send the new View/Digest to all coordinators (including myself). On reception, they will install the digest and view in all of their subgroup members */ sendMergeView(coords, combined_merge_data); } catch(Throwable ex) { if(log.isErrorEnabled()) log.error("exception while merging", ex); } finally { sendMergeCancelledMessage(coords, merge_id); stopMergeCanceller(); // this is probably not necessary merging=false; merge_leader=null; if(log.isDebugEnabled()) log.debug("merge task terminated"); t=null; } } } private class MergeCanceller implements TimeScheduler.Task { private Object my_merge_id=null; private long timeout; private boolean cancelled=false; MergeCanceller(Object my_merge_id, long timeout) { this.my_merge_id=my_merge_id; this.timeout=timeout; } public boolean cancelled() { return cancelled; } public void cancel() { cancelled=true; } public long nextInterval() { return timeout; } public void run() { if(merge_id != null && my_merge_id.equals(merge_id)) { if(trace) log.trace("cancelling merge due to timer timeout (" + timeout + " ms)"); cancelMerge(); cancelled=true; } else { if(trace) log.trace("timer kicked in after " + timeout + " ms, but no (or different) merge was in progress: " + "merge_id=" + merge_id + ", my_merge_id=" + my_merge_id); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -