📄 gms.java
字号:
if(mbrs != null && mbrs.size() > 0) this.mbrs.set(mbrs); // Send VIEW_CHANGE event up and down the stack: Event view_event=new Event(Event.VIEW_CHANGE, makeView(this.mbrs.getMembers())); passDown(view_event); // needed e.g. by failure detector or UDP passUp(view_event); coord=determineCoordinator(); if(coord != null && coord.equals(local_addr)) { if (! haveCoordinatorRole()) // this avoids deadlock on coordinator - when suspect/join occurs simultaneously becomeCoordinator(); } else { if(haveCoordinatorRole() && !local_addr.equals(coord)) becomeParticipant(); } } } protected Address determineCoordinator() { synchronized(mbrs) { return mbrs != null && mbrs.size() > 0? (Address)mbrs.elementAt(0) : null; } } /** * Returns true if local_addr is member of mbrs, else false */ protected boolean checkSelfInclusion(Vector mbrs) { Object mbr; if(mbrs == null) return false; for(int i=0; i < mbrs.size(); i++) { mbr=mbrs.elementAt(i); if(mbr != null && local_addr.equals(mbr)) return true; } return false; } public View makeView(Vector mbrs) { Address coord=null; long id=0; if(view_id != null) { coord=view_id.getCoordAddress(); id=view_id.getId(); } return new View(coord, id, mbrs); } public static View makeView(Vector mbrs, ViewId vid) { Address coord=null; long id=0; if(vid != null) { coord=vid.getCoordAddress(); id=vid.getId(); } return new View(coord, id, mbrs); } /* ------------------------- Request handler methods ----------------------------- */ public void join(Address mbr) { synchronized(impl_mutex) { impl.join(mbr); } } public void leave(Address mbr) { synchronized(impl_mutex) { impl.leave(mbr); } } public void suspect(Address mbr) { synchronized(impl_mutex) { impl.suspect(mbr); } } public void merge(Vector other_coords) { synchronized(impl_mutex) { impl.merge(other_coords); } } public boolean handleJoin(Address mbr) { synchronized(impl_mutex) { return impl.handleJoin(mbr); } } public void handleLeave(Address mbr, boolean suspected) { synchronized(impl_mutex) { impl.handleLeave(mbr, suspected); } } public void handleViewChange(ViewId new_view, Vector mbrs) {// synchronized (impl_mutex ) { impl.handleViewChange(new_view, mbrs);// } } public View handleMerge(ViewId other_vid, Vector other_members) { synchronized(impl_mutex) { if(trace) { View v=impl.handleMerge(other_vid, other_members); if(log.isInfoEnabled()) log.info("returning view: " + v); return v; } return impl.handleMerge(other_vid, other_members); } } public void handleSuspect(Address mbr) { synchronized(impl_mutex) { impl.handleSuspect(mbr); } } /* --------------------- End of Request handler methods -------------------------- */ boolean checkForViewEnforcer(Protocol up_protocol) { String prot_name; if(up_protocol == null) return false; prot_name=up_protocol.getName(); if(prot_name != null && "VIEW_ENFORCER".equals(prot_name)) return true; return checkForViewEnforcer(up_protocol.getUpProtocol()); } /** * <b>Callback</b>. Called by superclass when event may be handled.<p> * <b>Do not use <code>PassUp</code> in this method as the event is passed up * by default by the superclass after this method returns !</b> * * @return boolean Defaults to true. If false, event will not be passed up the stack. */ public boolean handleUpEvent(Event evt) { switch(evt.getType()) { case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this ! case Event.DISCONNECT_OK: // dito (e.g. sent by UDP layer) return false; case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); if(print_local_addr) { System.out.println("\n-------------------------------------------------------\n" + "GMS: address is " + local_addr + "\n-------------------------------------------------------"); } return true; // pass up case Event.SUSPECT: try { event_queue.add(evt); } catch(Exception e) { } return true; // pass up case Event.MERGE: try { event_queue.add(evt); } catch(Exception e) { } return false; // don't pass up case Event.FLUSH_OK: synchronized(flush_mutex) { flush_rsp=(FlushRsp)evt.getArg(); flush_mutex.notifyAll(); } return false; // don't pass up case Event.REBROADCAST_MSGS_OK: synchronized(rebroadcast_mutex) { rebroadcast_mutex.notifyAll(); } return false; // don't pass up } return impl.handleUpEvent(evt); } /** * <b>Callback</b>. Called by superclass when event may be handled.<p> * <b>Do not use <code>PassDown</code> in this method as the event is passed down * by default by the superclass after this method returns !</b> * * @return boolean Defaults to true. If false, event will not be passed down the stack. */ public boolean handleDownEvent(Event evt) { switch(evt.getType()) { case Event.CONNECT: passDown(evt); try { group_addr=(String)evt.getArg(); } catch(ClassCastException cce) { if(log.isErrorEnabled()) log.error("group address must " + "be a string (group name) to make sense"); } impl.join(local_addr); passUp(new Event(Event.CONNECT_OK)); startEventHandlerThread(); return false; // don't pass down: was already passed down case Event.DISCONNECT: impl.leave((Address)evt.getArg()); passUp(new Event(Event.DISCONNECT_OK)); stopEventHandlerThread(); initState(); return true; // pass down } return impl.handleDownEvent(evt); } // Priority handling, otherwise GMS.down(DISCONNECT) would block ! // Similar to FLUSH protocol public void receiveDownEvent(Event evt) { if(evt.getType() == Event.BLOCK_OK) { passDown(evt); return; } super.receiveDownEvent(evt); } /** * Setup the Protocol instance acording to the configuration string */ public boolean setProperties(Properties props) { String str; super.setProperties(props); str=props.getProperty("join_timeout"); // time to wait for JOIN if(str != null) { join_timeout=Long.parseLong(str); props.remove("join_timeout"); } str=props.getProperty("print_local_addr"); if(str != null) { print_local_addr=Boolean.valueOf(str).booleanValue(); props.remove("print_local_addr"); } str=props.getProperty("view_change_timeout"); // time to wait for VIEW_CHANGE if(str != null) { view_change_timeout=Long.parseLong(str); props.remove("view_change_timeout"); } str=props.getProperty("join_retry_timeout"); // time to wait between JOINs if(str != null) { join_retry_timeout=Long.parseLong(str); props.remove("join_retry_timeout"); } str=props.getProperty("leave_timeout"); // time to wait until coord responds to LEAVE req. if(str != null) { leave_timeout=Long.parseLong(str); props.remove("leave_timeout"); } str=props.getProperty("flush_timeout"); // time to wait until FLUSH completes (0=forever) if(str != null) { flush_timeout=Long.parseLong(str); props.remove("flush_timeout"); } str=props.getProperty("rebroadcast_unstable_msgs"); // bcast unstable msgs (recvd from FLUSH) if(str != null) { rebroadcast_unstable_msgs=Boolean.valueOf(str).booleanValue(); props.remove("rebroadcast_unstable_msgs"); } str=props.getProperty("rebroadcast_timeout"); // time to wait until REBROADCAST_MSGS completes if(str != null) { rebroadcast_timeout=Long.parseLong(str); props.remove("rebroadcast_timeout"); } str=props.getProperty("disable_initial_coord"); // allow initial mbr to become coord or not if(str != null) { disable_initial_coord=Boolean.valueOf(str).booleanValue(); props.remove("disable_initial_coord"); } if(props.size() > 0) { log.error("GMS.setProperties(): the following properties are not recognized: " + props); return false; } return true; } public void run() { Event evt; while(evt_thread != null && event_queue != null) { try { evt=(Event)event_queue.remove(); switch(evt.getType()) { case Event.SUSPECT: impl.suspect((Address)evt.getArg()); break; case Event.MERGE: impl.merge((Vector)evt.getArg()); break; default: if(log.isErrorEnabled()) log.error("event handler thread encountered event of type " + Event.type2String(evt.getType()) + ": not handled by me !"); break; } } catch(QueueClosedException closed) { break; } catch(Exception ex) { if(warn) log.warn("exception=" + ex); } } } /* ------------------------------- Private Methods --------------------------------- */ private void initState() { becomeClient(); impl.init(); view_id=null; if(mbrs != null) mbrs.clear(); } private void startEventHandlerThread() { if(event_queue == null) event_queue=new Queue(); if(evt_thread == null) { evt_thread=new Thread(this, "GMS.EventHandlerThread"); evt_thread.setDaemon(true); evt_thread.start(); } } private void stopEventHandlerThread() { if(evt_thread != null) { event_queue.close(false); event_queue=null; evt_thread=null; return; } if(event_queue != null) { event_queue.close(false); event_queue=null; } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -