📄 gms.java
字号:
* @param members */ public void castViewChangeWithDest(View new_view, Digest digest, java.util.List members) { Message view_change_msg; GmsHeader hdr; long start, stop; ViewId vid=new_view.getVid(); int size=-1; if(members == null || members.size() == 0) members=new_view.getMembers(); if(log.isTraceEnabled()) log.trace("mcasting view {" + new_view + "} (" + new_view.size() + " mbrs)\n"); start=System.currentTimeMillis(); view_change_msg=new Message(); // bcast to all members hdr=new GmsHeader(GmsHeader.VIEW, new_view); hdr.my_digest=digest; view_change_msg.putHeader(name, hdr); ack_collector.reset(vid, members); size=ack_collector.size(); passDown(new Event(Event.MSG, view_change_msg)); try { ack_collector.waitForAllAcks(view_ack_collection_timeout); stop=System.currentTimeMillis(); if(trace) log.trace("received all ACKs (" + size + ") for " + vid + " in " + (stop-start) + "ms"); } catch(TimeoutException e) { log.warn("failed to collect all ACKs (" + size + ") for view " + new_view + " after " + view_ack_collection_timeout + "ms, missing ACKs from " + ack_collector.printMissing() + " (received=" + ack_collector.printReceived() + "), local_addr=" + local_addr); } } /** * Sets the new view and sends a VIEW_CHANGE event up and down the stack. If the view is a MergeView (subclass * of View), then digest will be non-null and has to be set before installing the view. */ public void installView(View new_view, Digest digest) { if(digest != null) mergeDigest(digest); installView(new_view); } /** * Sets the new view and sends a VIEW_CHANGE event up and down the stack. */ public void installView(View new_view) { Address coord; int rc; ViewId vid=new_view.getVid(); Vector mbrs=new_view.getMembers(); if(log.isDebugEnabled()) log.debug("[local_addr=" + local_addr + "] view is " + new_view); if(stats) { num_views++; prev_views.add(new_view); } ack_collector.handleView(new_view); // Discards view with id lower than our own. Will be installed without check if first view if(view_id != null) { rc=vid.compareTo(view_id); if(rc <= 0) { if(log.isTraceEnabled() && rc < 0) // only scream if view is smaller, silently discard same views log.trace("[" + local_addr + "] received view < current view;" + " discarding it (current vid: " + view_id + ", new vid: " + vid + ')'); return; } } ltime=Math.max(vid.getId(), ltime); // compute Lamport logical time /* Check for self-inclusion: if I'm not part of the new membership, I just discard it. This ensures that messages sent in view V1 are only received by members of V1 */ if(checkSelfInclusion(mbrs) == false) { // only shun if this member was previously part of the group. avoids problem where multiple // members (e.g. X,Y,Z) join {A,B} concurrently, X is joined first, and Y and Z get view // {A,B,X}, which would cause Y and Z to be shunned as they are not part of the membership // bela Nov 20 2003 if(shun && local_addr != null && prev_members.contains(local_addr)) { if(warn) log.warn("I (" + local_addr + ") am not a member of view " + new_view + ", shunning myself and leaving the group (prev_members are " + prev_members + ", current view is " + view + ")"); if(impl != null) impl.handleExit(); passUp(new Event(Event.EXIT)); } else { if(warn) log.warn("I (" + local_addr + ") am not a member of view " + new_view + "; discarding view"); } return; } synchronized(members) { // serialize access to views // assign new_view to view_id if(new_view instanceof MergeView) view=new View(new_view.getVid(), new_view.getMembers()); else view=new_view; view_id=vid.copy(); // Set the membership. Take into account joining members if(mbrs != null && mbrs.size() > 0) { members.set(mbrs); tmp_members.set(members); joining.removeAll(mbrs); // remove all members in mbrs from joining // remove all elements from 'leaving' that are not in 'mbrs' leaving.retainAll(mbrs); tmp_members.add(joining); // add members that haven't yet shown up in the membership tmp_members.remove(leaving); // remove members that haven't yet been removed from the membership // add to prev_members for(Iterator it=mbrs.iterator(); it.hasNext();) { Address addr=(Address)it.next(); if(!prev_members.contains(addr)) prev_members.add(addr); } } // Send VIEW_CHANGE event up and down the stack: Event view_event=new Event(Event.VIEW_CHANGE, new_view.clone()); // changed order of passing view up and down (http://jira.jboss.com/jira/browse/JGRP-347) passUp(view_event); passDown(view_event); // needed e.g. by failure detector or UDP coord=determineCoordinator(); // if(coord != null && coord.equals(local_addr) && !(coord.equals(vid.getCoordAddress()))) { // changed on suggestion by yaronr and Nicolas Piedeloupe if(coord != null && coord.equals(local_addr) && !haveCoordinatorRole()) { becomeCoordinator(); } else { if(haveCoordinatorRole() && !local_addr.equals(coord)) becomeParticipant(); } } } protected Address determineCoordinator() { synchronized(members) { return members != null && members.size() > 0? (Address)members.elementAt(0) : null; } } /** Checks whether the potential_new_coord would be the new coordinator (2nd in line) */ protected boolean wouldBeNewCoordinator(Address potential_new_coord) { Address new_coord; if(potential_new_coord == null) return false; synchronized(members) { if(members.size() < 2) return false; new_coord=(Address)members.elementAt(1); // member at 2nd place return new_coord != null && new_coord.equals(potential_new_coord); } } /** Returns true if local_addr is member of mbrs, else false */ protected boolean checkSelfInclusion(Vector mbrs) { Object mbr; if(mbrs == null) return false; for(int i=0; i < mbrs.size(); i++) { mbr=mbrs.elementAt(i); if(mbr != null && local_addr.equals(mbr)) return true; } return false; } public View makeView(Vector mbrs) { Address coord=null; long id=0; if(view_id != null) { coord=view_id.getCoordAddress(); id=view_id.getId(); } return new View(coord, id, mbrs); } public View makeView(Vector mbrs, ViewId vid) { Address coord=null; long id=0; if(vid != null) { coord=vid.getCoordAddress(); id=vid.getId(); } return new View(coord, id, mbrs); } /** Send down a SET_DIGEST event */ public void setDigest(Digest d) { passDown(new Event(Event.SET_DIGEST, d)); } /** Send down a MERGE_DIGEST event */ public void mergeDigest(Digest d) { passDown(new Event(Event.MERGE_DIGEST, d)); } /** Sends down a GET_DIGEST event and waits for the GET_DIGEST_OK response, or timeout, whichever occurs first */ public Digest getDigest() { Digest ret=null; synchronized(digest_mutex) { digest_promise.reset(); passDown(Event.GET_DIGEST_EVT); try { ret=(Digest)digest_promise.getResultWithTimeout(digest_timeout); } catch(TimeoutException e) { if(log.isErrorEnabled()) log.error("digest could not be fetched from below"); } return ret; } } void startFlush(View new_view) { if (log.isDebugEnabled()) { log.debug("starting FLUSH, sending SUSPEND event"); } passUp(new Event(Event.SUSPEND,new_view)); try { flush_promise.getResultWithTimeout(flush_timeout); } catch(TimeoutException e) { log.warn("Initiator of flush and group coordinator " + local_addr + " timed out waiting for flush responses after " + flush_timeout + " msec"); } } void stopFlush() { if (log.isDebugEnabled()) { log.debug("sending RESUME event"); } passUp(new Event(Event.RESUME)); } public void up(Event evt) { Object obj; Message msg; GmsHeader hdr; MergeData merge_data; switch(evt.getType()) { case Event.MSG: msg=(Message)evt.getArg(); obj=msg.getHeader(name); if(obj == null || !(obj instanceof GmsHeader)) break; hdr=(GmsHeader)msg.removeHeader(name); switch(hdr.type) { case GmsHeader.JOIN_REQ: view_handler.add(new Request(Request.JOIN, hdr.mbr, false, null)); break; case GmsHeader.JOIN_RSP: impl.handleJoinResponse(hdr.join_rsp); break; case GmsHeader.LEAVE_REQ: if(log.isDebugEnabled()) log.debug("received LEAVE_REQ for " + hdr.mbr + " from " + msg.getSrc()); if(hdr.mbr == null) { if(log.isErrorEnabled()) log.error("LEAVE_REQ's mbr field is null"); return; } view_handler.add(new Request(Request.LEAVE, hdr.mbr, false, null)); break; case GmsHeader.LEAVE_RSP: impl.handleLeaveResponse(); break; case GmsHeader.VIEW: if(hdr.view == null) { if(log.isErrorEnabled()) log.error("[VIEW]: view == null"); return; } // send VIEW_ACK to sender of view Address coord=msg.getSrc(); Message view_ack=new Message(coord, null, null); GmsHeader tmphdr=new GmsHeader(GmsHeader.VIEW_ACK, hdr.view); view_ack.putHeader(name, tmphdr); if(trace) log.trace("sending VIEW_ACK to " + coord); passDown(new Event(Event.MSG, view_ack)); impl.handleViewChange(hdr.view, hdr.my_digest); break; case GmsHeader.VIEW_ACK: Object sender=msg.getSrc(); ack_collector.ack(sender); return; // don't pass further up case GmsHeader.MERGE_REQ: impl.handleMergeRequest(msg.getSrc(), hdr.merge_id); break; case GmsHeader.MERGE_RSP: merge_data=new MergeData(msg.getSrc(), hdr.view, hdr.my_digest); merge_data.merge_rejected=hdr.merge_rejected; impl.handleMergeResponse(merge_data, hdr.merge_id); break; case GmsHeader.INSTALL_MERGE_VIEW: impl.handleMergeView(new MergeData(msg.getSrc(), hdr.view, hdr.my_digest), hdr.merge_id); break; case GmsHeader.CANCEL_MERGE: impl.handleMergeCancelled(hdr.merge_id); break; default: if(log.isErrorEnabled()) log.error("GmsHeader with type=" + hdr.type + " not known"); } return; // don't pass up case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this ! case Event.DISCONNECT_OK: // dito (e.g. sent by TP layer). Don't send up the stack return; case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); if(print_local_addr) { System.out.println("\n-------------------------------------------------------\n" + "GMS: address is " + local_addr + "\n-------------------------------------------------------"); } break; // pass up case Event.SUSPECT: Address suspected=(Address)evt.getArg(); view_handler.add(new Request(Request.SUSPECT, suspected, true, null)); ack_collector.suspect(suspected); break; // pass up case Event.UNSUSPECT: impl.unsuspect((Address)evt.getArg()); return; // discard case Event.MERGE: view_handler.add(new Request(Request.MERGE, null, false, (Vector)evt.getArg())); return; // don't pass up } if(impl.handleUpEvent(evt)) passUp(evt); } /** This method is overridden to avoid hanging on getDigest(): when a JOIN is received, the coordinator needs to retrieve the digest from the NAKACK layer. It therefore sends down a GET_DIGEST event, to which the NAKACK layer responds with a GET_DIGEST_OK event.<p> However, the GET_DIGEST_OK event will not be processed because the thread handling the JOIN request won't process the GET_DIGEST_OK event until the JOIN event returns. The receiveUpEvent() method is executed by the up-handler
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -