fd.java
来自「JGroups源码」· Java 代码 · 共 686 行 · 第 1/2 页
JAVA
686 行
// $Id: FD.java,v 1.40 2006/10/24 13:15:39 belaban Exp $package org.jgroups.protocols;import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.*;import java.io.*;import java.util.*;import java.util.List;/** * Failure detection based on simple heartbeat protocol. Regularly polls members for * liveness. Multicasts SUSPECT messages when a member is not reachable. The simple * algorithms works as follows: the membership is known and ordered. Each HB protocol * periodically sends an 'are-you-alive' message to its *neighbor*. A neighbor is the next in * rank in the membership list, which is recomputed upon a view change. When a response hasn't * been received for n milliseconds and m tries, the corresponding member is suspected (and * eventually excluded if faulty).<p> * FD starts when it detects (in a view change notification) that there are at least * 2 members in the group. It stops running when the membership drops below 2.<p> * When a message is received from the monitored neighbor member, it causes the pinger thread to * 'skip' sending the next are-you-alive message. Thus, traffic is reduced.<p> * When we receive a ping from a member that's not in the membership list, we shun it by sending it a * NOT_MEMBER message. That member will then leave the group (and possibly rejoin). This is only done if * <code>shun</code> is true. * @author Bela Ban * @version $Revision: 1.40 $ */public class FD extends Protocol { Address ping_dest=null; Address local_addr=null; long timeout=3000; // number of millisecs to wait for an are-you-alive msg long last_ack=System.currentTimeMillis(); int num_tries=0; int max_tries=2; // number of times to send a are-you-alive msg (tot time= max_tries*timeout) final List members=new CopyOnWriteArrayList(); final Hashtable invalid_pingers=new Hashtable(7); // keys=Address, val=Integer (number of pings from suspected mbrs) /** Members from which we select ping_dest. 
may be subset of {@link #members} */ final List pingable_mbrs=new CopyOnWriteArrayList(); boolean shun=true; TimeScheduler timer=null; private Monitor monitor=null; // task that performs the actual monitoring for failure detection private final Object monitor_mutex=new Object(); protected int num_heartbeats=0; protected int num_suspect_events=0; /** Transmits SUSPECT message until view change or UNSUSPECT is received */ protected final Broadcaster bcast_task=new Broadcaster(); final static String name="FD"; BoundedList suspect_history=new BoundedList(20); public String getName() {return name;} public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";} public String getMembers() {return members != null? members.toString() : "null";} public String getPingableMembers() {return pingable_mbrs != null? pingable_mbrs.toString() : "null";} public String getPingDest() {return ping_dest != null? ping_dest.toString() : "null";} public int getNumberOfHeartbeatsSent() {return num_heartbeats;} public int getNumSuspectEventsGenerated() {return num_suspect_events;} public long getTimeout() {return timeout;} public void setTimeout(long timeout) {this.timeout=timeout;} public int getMaxTries() {return max_tries;} public void setMaxTries(int max_tries) {this.max_tries=max_tries;} public int getCurrentNumTries() {return num_tries;} public boolean isShun() {return shun;} public void setShun(boolean flag) {this.shun=flag;} public String printSuspectHistory() { StringBuffer sb=new StringBuffer(); for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) { sb.append(new Date()).append(": ").append(en.nextElement()).append("\n"); } return sb.toString(); } public boolean setProperties(Properties props) { String str; super.setProperties(props); str=props.getProperty("timeout"); if(str != null) { timeout=Long.parseLong(str); props.remove("timeout"); } str=props.getProperty("max_tries"); // before suspecting a member if(str != null) { 
max_tries=Integer.parseInt(str); props.remove("max_tries"); } str=props.getProperty("shun"); if(str != null) { shun=Boolean.valueOf(str).booleanValue(); props.remove("shun"); } if(props.size() > 0) { log.error("the following properties are not recognized: " + props); return false; } return true; } public void resetStats() { num_heartbeats=num_suspect_events=0; suspect_history.removeAll(); } public void init() throws Exception { if(stack != null && stack.timer != null) timer=stack.timer; else throw new Exception("FD.init(): timer cannot be retrieved from protocol stack"); } public void stop() { stopMonitor(); } private Object getPingDest(List mbrs) { Object tmp, retval=null; if(mbrs == null || mbrs.size() < 2 || local_addr == null) return null; for(int i=0; i < mbrs.size(); i++) { tmp=mbrs.get(i); if(local_addr.equals(tmp)) { if(i + 1 >= mbrs.size()) retval=mbrs.get(0); else retval=mbrs.get(i + 1); break; } } return retval; } private void startMonitor() { synchronized(monitor_mutex) { if(monitor != null && monitor.started == false) { monitor=null; } if(monitor == null) { monitor=createMonitor(); last_ack=System.currentTimeMillis(); // start from scratch timer.add(monitor, true); // fixed-rate scheduling num_tries=0; } } } private void stopMonitor() { synchronized(monitor_mutex) { if(monitor != null) { monitor.stop(); monitor=null; } } } protected Monitor createMonitor() { return new Monitor(); } public void up(Event evt) { Message msg; FdHeader hdr; Object sender, tmphdr; switch(evt.getType()) { case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); break; case Event.MSG: msg=(Message)evt.getArg(); tmphdr=msg.getHeader(name); if(tmphdr == null || !(tmphdr instanceof FdHeader)) { if(ping_dest != null && (sender=msg.getSrc()) != null) { if(ping_dest.equals(sender)) { last_ack=System.currentTimeMillis(); if(trace) log.trace("received msg from " + sender + " (counts as ack)"); num_tries=0; } } break; // message did not originate from FD layer, just pass up } 
hdr=(FdHeader)msg.removeHeader(name); switch(hdr.type) { case FdHeader.HEARTBEAT: // heartbeat request; send heartbeat ack Address hb_sender=msg.getSrc(); if(trace) log.trace("received are-you-alive from " + hb_sender + ", sending response"); sendHeartbeatResponse(hb_sender); // 2. Shun the sender of a HEARTBEAT message if that sender is not a member. This will cause // the sender to leave the group (and possibly rejoin it later) if(shun) shunInvalidHeartbeatSender(hb_sender); break; // don't pass up ! case FdHeader.HEARTBEAT_ACK: // heartbeat ack if(ping_dest != null && ping_dest.equals(hdr.from)) { last_ack=System.currentTimeMillis(); num_tries=0; if(log.isDebugEnabled()) log.debug("received ack from " + hdr.from); } else { stop(); ping_dest=(Address)getPingDest(pingable_mbrs); if(ping_dest != null) { try { startMonitor(); } catch(Exception ex) { if(warn) log.warn("exception when calling startMonitor(): " + ex); } } } break; case FdHeader.SUSPECT: if(hdr.mbrs != null) { if(trace) log.trace("[SUSPECT] suspect hdr is " + hdr); for(int i=0; i < hdr.mbrs.size(); i++) { Address m=(Address)hdr.mbrs.elementAt(i); if(local_addr != null && m.equals(local_addr)) { if(warn) log.warn("I was suspected by " + msg.getSrc() + "; ignoring the SUSPECT " + "message and sending back a HEARTBEAT_ACK"); sendHeartbeatResponse(msg.getSrc()); continue; } else { pingable_mbrs.remove(m); ping_dest=(Address)getPingDest(pingable_mbrs); } passUp(new Event(Event.SUSPECT, m)); passDown(new Event(Event.SUSPECT, m)); } } break; case FdHeader.NOT_MEMBER: if(shun) { if(log.isDebugEnabled()) log.debug("[NOT_MEMBER] I'm being shunned; exiting"); passUp(new Event(Event.EXIT)); } break; } return; } passUp(evt); // pass up to the layer above us } public void down(Event evt) { View v; switch(evt.getType()) { case Event.VIEW_CHANGE: passDown(evt); stop(); synchronized(this) { v=(View)evt.getArg(); members.clear(); members.addAll(v.getMembers()); bcast_task.adjustSuspectedMembers(members); 
pingable_mbrs.clear(); pingable_mbrs.addAll(members); ping_dest=(Address)getPingDest(pingable_mbrs); if(ping_dest != null) { try { startMonitor(); } catch(Exception ex) { if(warn) log.warn("exception when calling startMonitor(): " + ex); } } } break; case Event.UNSUSPECT: unsuspect((Address)evt.getArg()); passDown(evt); break; default: passDown(evt); break; } } private void sendHeartbeatResponse(Address dest) { Message hb_ack=new Message(dest, null, null); FdHeader tmp_hdr=new FdHeader(FdHeader.HEARTBEAT_ACK); tmp_hdr.from=local_addr; hb_ack.putHeader(name, tmp_hdr); passDown(new Event(Event.MSG, hb_ack)); } private void unsuspect(Address mbr) { bcast_task.removeSuspectedMember(mbr); pingable_mbrs.clear(); pingable_mbrs.addAll(members); pingable_mbrs.removeAll(bcast_task.getSuspectedMembers()); ping_dest=(Address)getPingDest(pingable_mbrs); } /** * If sender is not a member, send a NOT_MEMBER to sender (after n pings received) */ private void shunInvalidHeartbeatSender(Address hb_sender) { int num_pings=0; Message shun_msg; if(hb_sender != null && members != null && !members.contains(hb_sender)) {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?