📄 fd_prob.java
字号:
// $Id: FD_PROB.java,v 1.10 2006/02/07 07:57:50 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.Util;import org.jgroups.util.Streamable;import java.io.*;import java.util.Enumeration;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Probabilistic failure detection protocol based on "A Gossip-Style Failure Detection Service" * by Renesse, Minsky and Hayden.<p> * Each member maintains a list of all other members: for each member P, 2 data are maintained, a heartbeat * counter and the time of the last increment of the counter. Each member periodically sends its own heartbeat * counter list to a randomly chosen member Q. Q updates its own heartbeat counter list and the associated * time (if counter was incremented). Each member periodically increments its own counter. If, when sending * its heartbeat counter list, a member P detects that another member Q's heartbeat counter was not incremented * for timeout seconds, Q will be suspected.<p> * This protocol can be used both with a PBCAST *and* regular stacks. * @author Bela Ban 1999 * @version $Revision: 1.10 $ */public class FD_PROB extends Protocol implements Runnable { Address local_addr=null; Thread hb=null; long timeout=3000; // before a member with a non updated timestamp is suspected long gossip_interval=1000; Vector members=null; final Hashtable counters=new Hashtable(); // keys=Addresses, vals=FdEntries final Hashtable invalid_pingers=new Hashtable(); // keys=Address, vals=Integer (number of pings from suspected mbrs) int max_tries=2; // number of times to send a are-you-alive msg (tot time= max_tries*timeout) public String getName() { return "FD_PROB"; } public boolean setProperties(Properties props) { String str; super.setProperties(props); str=props.getProperty("timeout"); if(str != null) { timeout=Long.parseLong(str); props.remove("timeout"); } str=props.getProperty("gossip_interval"); if(str != null) { gossip_interval=Long.parseLong(str); props.remove("gossip_interval"); } str=props.getProperty("max_tries"); if(str != null) { max_tries=Integer.parseInt(str); props.remove("max_tries"); } if(props.size() > 0) { log.error("FD_PROB.setProperties(): the following properties are not recognized: " + props); return false; } return true; } public void start() throws Exception { if(hb == null) { hb=new Thread(this, "FD_PROB.HeartbeatThread"); hb.setDaemon(true); hb.start(); } } public void stop() { Thread tmp=null; if(hb != null && hb.isAlive()) { tmp=hb; hb=null; tmp.interrupt(); try { tmp.join(timeout); } catch(Exception ex) { } } hb=null; } public void up(Event evt) { Message msg; FdHeader hdr=null; Object obj; switch(evt.getType()) { case Event.SET_LOCAL_ADDRESS: local_addr=(Address) evt.getArg(); break; case Event.MSG: msg=(Message) evt.getArg(); obj=msg.getHeader(getName()); if(obj == null || !(obj instanceof FdHeader)) { updateCounter(msg.getSrc()); // got a msg from this guy, reset its time (we heard from it now) break; } hdr=(FdHeader) msg.removeHeader(getName()); switch(hdr.type) { case FdHeader.HEARTBEAT: // heartbeat request; send heartbeat ack if(checkPingerValidity(msg.getSrc()) == false) // false == sender of heartbeat is not a member return; // 2. Update my own array of counters if(log.isInfoEnabled()) log.info("<-- HEARTBEAT from " + msg.getSrc()); updateCounters(hdr); return; // don't pass up ! case FdHeader.NOT_MEMBER: if(warn) log.warn("NOT_MEMBER: I'm being shunned; exiting"); passUp(new Event(Event.EXIT)); return; default: if(warn) log.warn("FdHeader type " + hdr.type + " not known"); return; } } passUp(evt); // pass up to the layer above us } public void down(Event evt) { int num_mbrs; Vector excluded_mbrs; FdEntry entry; Address mbr; switch(evt.getType()) { // Start heartbeat thread when we have more than 1 member; stop it when membership drops below 2 case Event.VIEW_CHANGE: passDown(evt); synchronized(this) { View v=(View) evt.getArg(); // mark excluded members excluded_mbrs=computeExcludedMembers(members, v.getMembers()); if(excluded_mbrs != null && excluded_mbrs.size() > 0) { for(int i=0; i < excluded_mbrs.size(); i++) { mbr=(Address) excluded_mbrs.elementAt(i); entry=(FdEntry) counters.get(mbr); if(entry != null) entry.setExcluded(true); } } members=v != null ? v.getMembers() : null; if(members != null) { num_mbrs=members.size(); if(num_mbrs >= 2) { if(hb == null) { try { start(); } catch(Exception ex) { if(warn) log.warn("exception when calling start(): " + ex); } } } else stop(); } } break; default: passDown(evt); break; } } /** Loop while more than 1 member available. Choose a member randomly (not myself !) and send a heartbeat. Wait for ack. If ack not received withing timeout, mcast SUSPECT message. */ public void run() { Message hb_msg; FdHeader hdr; Address hb_dest, key; FdEntry entry; long curr_time, diff; if(log.isInfoEnabled()) log.info("heartbeat thread was started"); while(hb != null && members.size() > 1) { // 1. Get a random member P (excluding ourself) hb_dest=getHeartbeatDest(); if(hb_dest == null) { if(warn) log.warn("hb_dest is null"); Util.sleep(gossip_interval); continue; } // 2. Increment own counter entry=(FdEntry) counters.get(local_addr); if(entry == null) { entry=new FdEntry(); counters.put(local_addr, entry); } entry.incrementCounter(); // 3. Send heartbeat to P hdr=createHeader(); if(hdr == null) if(warn) log.warn("header could not be created. Heartbeat will not be sent"); else { hb_msg=new Message(hb_dest, null, null); hb_msg.putHeader(getName(), hdr); if(log.isInfoEnabled()) log.info("--> HEARTBEAT to " + hb_dest); passDown(new Event(Event.MSG, hb_msg)); } if(log.isInfoEnabled()) log.info("own counters are " + printCounters()); // 4. Suspect members from which we haven't heard for timeout msecs for(Enumeration e=counters.keys(); e.hasMoreElements();) { curr_time=System.currentTimeMillis(); key=(Address) e.nextElement(); entry=(FdEntry) counters.get(key); if(entry.getTimestamp() > 0 && (diff=curr_time - entry.getTimestamp()) >= timeout) { if(entry.excluded()) { if(diff >= 2 * timeout) { // remove members marked as 'excluded' after 2*timeout msecs counters.remove(key); if(log.isInfoEnabled()) log.info("removed " + key); } } else { if(log.isInfoEnabled()) log.info("suspecting " + key); passUp(new Event(Event.SUSPECT, key)); } } } Util.sleep(gossip_interval); } // end while if(log.isInfoEnabled()) log.info("heartbeat thread was stopped"); } /* -------------------------------- Private Methods ------------------------------- */ Address getHeartbeatDest() { Address retval=null; int r, size; Vector members_copy; if(members == null || members.size() < 2 || local_addr == null) return null; members_copy=(Vector) members.clone(); members_copy.removeElement(local_addr); // don't select myself as heartbeat destination size=members_copy.size(); r=((int) (Math.random() * (size + 1))) % size; retval=(Address) members_copy.elementAt(r); return retval; } /** Create a header containing the counters for all members */ FdHeader createHeader() { int num_mbrs=counters.size(), index=0; FdHeader ret=null; Address key; FdEntry entry; if(num_mbrs <= 0) return null; ret=new FdHeader(FdHeader.HEARTBEAT, num_mbrs); for(Enumeration e=counters.keys(); e.hasMoreElements();) { key=(Address) e.nextElement(); entry=(FdEntry) counters.get(key); if(entry.excluded()) continue; if(index >= ret.members.length) { if(warn) log.warn("index " + index + " is out of bounds (" + ret.members.length + ')'); break; } ret.members[index]=key; ret.counters[index]=entry.getCounter(); index++; } return ret; } /** Set my own counters values to max(own-counter, counter) */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -