📄 coordgmsimpl.java
字号:
d.add(mbr, 0, 0); // ... and add the new member. it's first seqno will be 1 v=gms.getNextView(new_mbrs, null, null); if(log.isDebugEnabled()) log.debug("joined member " + mbr + ", view is " + v); join_rsp=new JoinRsp(v, d); // 2. Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest // in case client's next request (e.g. getState()) reaches us *before* our own view change multicast. // Check NAKACK's TMP_VIEW handling for details if(join_rsp.getView() != null) gms.passDown(new Event(Event.TMP_VIEW, join_rsp.getView())); Vector tmp_mbrs=join_rsp.getView() != null? new Vector(join_rsp.getView().getMembers()) : null; if(gms.use_flush) { //3a. FLUSH protocol is in use. First we FLUSH current members. Then we send a // view to a joining member and we will wait for his ACK together with view // ACKs from current members (castViewChangeWithDest). After all ACKS have been // collected, FLUSH is stopped (below in finally clause) and thus members are // allowed to send messages again. gms.startFlush(join_rsp.getView()); sendJoinResponse(join_rsp, mbr); gms.castViewChangeWithDest(join_rsp.getView(), null, tmp_mbrs); } else { //3b. Broadcast the new view // we'll multicast the new view first and only, when everyone has replied with a VIEW_ACK (or timeout), // send the JOIN_RSP back to the client. This prevents the client from sending multicast messages in // view V2 which may get dropped by existing members because they're still in view V1. // (http://jira.jboss.com/jira/browse/JGRP-235) if(tmp_mbrs != null) tmp_mbrs.remove(mbr); // exclude the newly joined member from VIEW_ACKs gms.castViewChangeWithDest(join_rsp.getView(), null, tmp_mbrs); // 4. Return result to client sendJoinResponse(join_rsp, mbr); } } finally { if(gms.use_flush) gms.stopFlush(); gms.passDown(new Event(Event.RESUME_STABLE)); } }*/ /** Exclude <code>mbr</code> from the membership. If <code>suspected</code> is true, then this member crashed and therefore is forced to leave, otherwise it is leaving voluntarily. */ /*private void handleLeave(Address mbr, boolean suspected) { View new_view=null; Vector v=new Vector(1); v.addElement(mbr); // contains either leaving mbrs or suspected mbrs if(log.isDebugEnabled()) log.debug("mbr=" + mbr); if(!gms.members.contains(mbr)) { if(trace) log.trace("mbr " + mbr + " is not a member !"); return; } if(gms.view_id == null) { // we're probably not the coord anymore (we just left ourselves), let someone else do it // (client will retry when it doesn't get a response if(log.isDebugEnabled()) log.debug("gms.view_id is null, I'm not the coordinator anymore (leaving=" + leaving + "); the new coordinator will handle the leave request"); return; } try { sendLeaveResponse(mbr); // send an ack to the leaving member if(suspected) new_view=gms.getNextView(null, null, v); else new_view=gms.getNextView(null, v, null); if(gms.use_flush) { gms.startFlush(new_view); } gms.castViewChange(new_view, null); } finally { if(gms.use_flush) { gms.stopFlush(); } } if(leaving) { gms.passUp(new Event(Event.DISCONNECT_OK)); gms.initState(); // in case connect() is called again } }*/ public void handleMembershipChange(Collection new_mbrs, Collection leaving_mbrs, Collection suspected_mbrs) { if(new_mbrs == null) new_mbrs=new LinkedHashSet(0); if(suspected_mbrs == null) suspected_mbrs=new LinkedHashSet(0); if(leaving_mbrs == null) leaving_mbrs=new LinkedHashSet(0); boolean joining_mbrs=!new_mbrs.isEmpty(); new_mbrs.remove(gms.local_addr); // remove myself - cannot join myself (already joined) if(gms.view_id == null) { // we're probably not the coord anymore (we just left ourselves), let someone else do it // (client will retry when it doesn't get a response) if(log.isDebugEnabled()) log.debug("gms.view_id is null, I'm not the coordinator anymore (leaving=" + leaving + "); the new coordinator will handle the leave request"); return; } Vector current_members=gms.members.getMembers(); leaving_mbrs.retainAll(current_members); // remove all elements of leaving_mbrs which are not current members if(suspected_mbrs.remove(gms.local_addr)) { if(warn) log.warn("I am the coord and I'm being suspected -- will probably leave shortly"); } suspected_mbrs.retainAll(current_members); // remove all elements of suspected_mbrs which are not current members // for the members that have already joined, return the current digest and membership for(Iterator it=new_mbrs.iterator(); it.hasNext();) { Address mbr=(Address)it.next(); if(gms.members.contains(mbr)) { // already joined: return current digest and membership if(warn) log.warn(mbr + " already present; returning existing view " + gms.view); JoinRsp join_rsp=new JoinRsp(new View(gms.view_id, gms.members.getMembers()), gms.getDigest()); sendJoinResponse(join_rsp, mbr); it.remove(); } } if(new_mbrs.isEmpty() && leaving_mbrs.isEmpty() && suspected_mbrs.isEmpty()) { if(trace) log.trace("found no members to add or remove, will not create new view"); return; } JoinRsp join_rsp=null; try { View new_view=gms.getNextView(new_mbrs, leaving_mbrs, suspected_mbrs); if(log.isDebugEnabled()) log.debug("new=" + new_mbrs + ", suspected=" + suspected_mbrs + ", leaving=" + leaving_mbrs + ", new view: " + new_view); // we cannot garbage collect during joining a new member *if* we're the only member // Example: {A}, B joins, after returning JoinRsp to B, A garbage collects messages higher than those // in the digest returned to the client, so the client will *not* be able to ask for retransmission // of those messages if he misses them if(joining_mbrs) { gms.passDown(new Event(Event.SUSPEND_STABLE, MAX_SUSPEND_TIMEOUT)); Digest d=null, tmp=gms.getDigest(); // get existing digest if(tmp == null) log.error("received null digest from GET_DIGEST: will cause JOIN to fail"); else { // create a new digest, which contains the new member d=new Digest(tmp.size() + new_mbrs.size()); d.add(tmp); // add the existing digest to the new one for(Iterator i=new_mbrs.iterator(); i.hasNext();) d.add((Address)i.next(), 0, 0); // ... and add the new members. their first seqno will be 1 } join_rsp=new JoinRsp(new_view, d); } sendLeaveResponses(leaving_mbrs); // no-op if no leaving members // Send down a local TMP_VIEW event. This is needed by certain layers (e.g. NAKACK) to compute correct digest // in case client's next request (e.g. getState()) reaches us *before* our own view change multicast. // Check NAKACK's TMP_VIEW handling for details if(new_view != null) gms.passDown(new Event(Event.TMP_VIEW, new_view)); Vector tmp_mbrs=new_view != null? new Vector(new_view.getMembers()) : null; if(gms.use_flush) { // First we flush current members. Then we send a view to all joining member and we wait for their ACKs // together with ACKs from current members. After all ACKS have been collected, FLUSH is stopped // (below in finally clause) and members are allowed to send messages again gms.startFlush(new_view); sendJoinResponses(join_rsp, new_mbrs); // might be a no-op if no joining members gms.castViewChangeWithDest(new_view, null, tmp_mbrs); } else { if(tmp_mbrs != null) // exclude the newly joined member from VIEW_ACKs tmp_mbrs.removeAll(new_mbrs); // Broadcast the new view // we'll multicast the new view first and only, when everyone has replied with a VIEW_ACK (or timeout), // send the JOIN_RSP back to the client. This prevents the client from sending multicast messages in // view V2 which may get dropped by existing members because they're still in view V1. // (http://jira.jboss.com/jira/browse/JGRP-235) gms.castViewChangeWithDest(new_view, null, tmp_mbrs); sendJoinResponses(join_rsp, new_mbrs); // Return result to newly joined clients (if there are any) } } finally { if(joining_mbrs) gms.passDown(new Event(Event.RESUME_STABLE)); if(gms.use_flush) gms.stopFlush(); if(leaving) { gms.passUp(new Event(Event.DISCONNECT_OK)); gms.initState(); // in case connect() is called again } } } /** * Called by the GMS when a VIEW is received. * @param new_view The view to be installed * @param digest If view is a MergeView, digest contains the seqno digest of all members and has to * be set by GMS */ public void handleViewChange(View new_view, Digest digest) { Vector mbrs=new_view.getMembers(); if(log.isDebugEnabled()) { if(digest != null) log.debug("view=" + new_view + ", digest=" + digest); else log.debug("view=" + new_view); } if(leaving && !mbrs.contains(gms.local_addr)) return; gms.installView(new_view, digest); } public void handleExit() { cancelMerge(); } public void stop() { super.stop(); // sets leaving=false stopMergeTask(); } /* ------------------------------------------ Private methods ----------------------------------------- */ void startMergeTask(Vector coords) { synchronized(merge_task) { merge_task.start(coords); } } void stopMergeTask() { synchronized(merge_task) { merge_task.stop(); } } private void sendJoinResponses(JoinRsp rsp, Collection c) { if(c != null && rsp != null) { for(Iterator it=c.iterator(); it.hasNext();) { sendJoinResponse(rsp, (Address)it.next()); } } } private void sendJoinResponse(JoinRsp rsp, Address dest) { Message m=new Message(dest, null, null); GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.JOIN_RSP, rsp); m.putHeader(gms.getName(), hdr); gms.passDown(new Event(Event.MSG, m)); } private void sendLeaveResponses(Collection c) { for(Iterator i=c.iterator(); i.hasNext();) { Message msg=new Message((Address)i.next(), null, null); // send an ack to the leaving member GMS.GmsHeader hdr=new GMS.GmsHeader(GMS.GmsHeader.LEAVE_RSP); msg.putHeader(gms.getName(), hdr); gms.passDown(new Event(Event.MSG, msg)); } } /** * Sends a MERGE_REQ to all coords and populates a list of MergeData (in merge_rsps). Returns after coords.size() * response have been received, or timeout msecs have elapsed (whichever is first).<p> * If a subgroup coordinator rejects the MERGE_REQ (e.g. because of participation in a different merge), * <em>that member will be removed from coords !</em> * @param coords A list of Addresses of subgroup coordinators (inluding myself) * @param timeout Max number of msecs to wait for the merge responses from the subgroup coords */ private void getMergeDataFromSubgroupCoordinators(Vector coords, long timeout) { Message msg; GMS.GmsHeader hdr; long curr_time, time_to_wait, end_time, start, stop; int num_rsps_expected; if(coords == null || coords.size() <= 1) { if(log.isErrorEnabled()) log.error("coords == null or size <= 1"); return; } start=System.currentTimeMillis(); MergeData tmp; synchronized(merge_rsps) { merge_rsps.removeAllElements(); if(log.isDebugEnabled()) log.debug("sending MERGE_REQ to " + coords); Address coord; for(int i=0; i < coords.size(); i++) { coord=(Address)coords.elementAt(i); if(gms.local_addr != null && gms.local_addr.equals(coord)) { tmp=getMergeResponse(gms.local_addr, merge_id); if(tmp != null) merge_rsps.add(tmp); continue; } // this allows UNICAST to remove coord from previous_members in case of a merge gms.passDown(new Event(Event.ENABLE_UNICASTS_TO, coord)); msg=new Message(coord, null, null); hdr=new GMS.GmsHeader(GMS.GmsHeader.MERGE_REQ); hdr.mbr=gms.local_addr; hdr.merge_id=merge_id; msg.putHeader(gms.getName(), hdr); gms.passDown(new Event(Event.MSG, msg)); } // wait until num_rsps_expected >= num_rsps or timeout elapsed num_rsps_expected=coords.size(); curr_time=System.currentTimeMillis(); end_time=curr_time + timeout; while(end_time > curr_time) { time_to_wait=end_time - curr_time; if(log.isDebugEnabled()) log.debug("waiting " + time_to_wait + " msecs for merge responses"); if(merge_rsps.size() < num_rsps_expected) { try { merge_rsps.wait(time_to_wait);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -