📄 fd.java
字号:
// $Id: FD.java,v 1.9 2005/08/11 12:43:46 belaban Exp $package org.jgroups.protocols.pbcast;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.stack.Protocol;import org.jgroups.util.Util;import java.util.Enumeration;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Passive failure detection protocol. It assumes a pbcast protocol, which uses rounds of gossiping for * reliable message delivery. Gossip protocols typically involve all the members sending gossips in regular * intervals. This protocol therefore works as follows: it allocates a timestamp for each member and updates * the timestamp whenever it receives a message from a sender P. Any type of message is accepted from P. For * example, PBCAST regularly sends the following messages: * <ul> * <li>regular mcast message from P * <li>regular ucast message from P * <li>gossip from P * <li>retransmit request from P * <li>retransmit response from P * </ul> * * @author Bela Ban */public class FD extends Protocol implements Runnable { Address local_addr=null; Thread checker=null; // checks timestamps for timeout, generates SUSPECT event final Object checker_lock=new Object(); long timeout=6000; // number of millisecs to wait for a member to be suspected // (should be higher than the gossip_interval value in PBCAST final Hashtable members=new Hashtable(11); // keys=Addresses (members), vals=Entries (timestamp) final Vector suspected_mbrs=new Vector(11); // currently suspected members (dynamically updated) static class Entry { long timestamp; Entry(long timestamp) { this.timestamp=timestamp; } public String toString() { return Long.toString(timestamp); } } public String getName() { return "FD"; } public boolean setProperties(Properties props) { String str; super.setProperties(props); str=props.getProperty("timeout"); if(str != null) { timeout=Long.parseLong(str); props.remove("timeout"); } if(props.size() > 0) { log.error("FD.setProperties(): the following properties are not recognized: " + props); return false; } return true; } public void stop() { stopChecker(); } public void up(Event evt) { Message msg; Address sender; switch(evt.getType()) { case Event.SET_LOCAL_ADDRESS: local_addr=(Address)evt.getArg(); break; case Event.MSG: msg=(Message)evt.getArg(); sender=msg.getSrc(); updateSender(sender); break; } passUp(evt); // pass up to the layer above us } public void down(Event evt) { View v; Vector mbrs; Address mbr; switch(evt.getType()) { case Event.VIEW_CHANGE: v=(View)evt.getArg(); mbrs=v.getMembers(); passDown(evt); for(Enumeration e=members.keys(); e.hasMoreElements();) { mbr=(Address)e.nextElement(); if(!mbrs.contains(mbr)) { members.remove(mbr); } } members.remove(local_addr); if(members.size() > 0 && checker == null) startChecker(); return; // generated by PBCAST, contains list of members a gossip has visited. we can safely reset their counter case Event.HEARD_FROM: updateSenders((Vector)evt.getArg()); return; // don't pass down } passDown(evt); } public void run() { Address mbr; long timestamp, diff; while(checker != null && Thread.currentThread().equals(checker) && members.size() > 0) { for(Enumeration e=members.keys(); e.hasMoreElements();) { mbr=(Address)e.nextElement(); timestamp=((Entry)members.get(mbr)).timestamp; diff=System.currentTimeMillis() - timestamp; if(diff >= timeout) { if(log.isInfoEnabled()) log.info("suspecting " + mbr); passUp(new Event(Event.SUSPECT, mbr)); if(!suspected_mbrs.contains(mbr)) suspected_mbrs.addElement(mbr); } } Util.sleep(timeout); } checker=null; } void startChecker() { synchronized(checker_lock) { if(checker == null) { checker=new Thread(this, "FD.CheckerThread"); checker.setDaemon(true); checker.start(); } } } void stopChecker() { Thread tmp; synchronized(checker_lock) { if(checker != null && checker.isAlive()) { tmp=checker; checker=null; tmp.interrupt(); try { tmp.join(timeout); } catch(Exception ex) { } if(tmp.isAlive()) if(warn) log.warn("interrupted checker thread is still alive !"); } checker=null; } } void updateSender(Address mbr) { Entry entry; long curr_time; if(mbr == null) { if(log.isDebugEnabled()) log.debug("member " + mbr + " not found"); return; } if(suspected_mbrs.size() > 0 && suspected_mbrs.contains(mbr)) { passUp(new Event(Event.UNSUSPECT, mbr)); suspected_mbrs.remove(mbr); } if(mbr.equals(local_addr)) return; entry=(Entry)members.get(mbr); curr_time=System.currentTimeMillis(); if(entry != null) entry.timestamp=curr_time; else members.put(mbr, new Entry(curr_time)); } void updateSenders(Vector v) { Address mbr; if(v == null) return; for(int i=0; i < v.size(); i++) { mbr=(Address)v.elementAt(i); updateSender(mbr); } } String printTimestamps() { StringBuffer sb=new StringBuffer(); Address mbr; synchronized(members) { for(Enumeration e=members.keys(); e.hasMoreElements();) { mbr=(Address)e.nextElement(); sb.append("\n" + mbr + ": " + (System.currentTimeMillis() - ((Entry)members.get(mbr)).timestamp)); } } return sb.toString(); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -