⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fd.java

📁 JGRoups源码
💻 JAVA
字号:
// $Id: FD.java,v 1.9 2005/08/11 12:43:46 belaban Exp $package org.jgroups.protocols.pbcast;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.stack.Protocol;import org.jgroups.util.Util;import java.util.Enumeration;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Passive failure detection protocol. It assumes a pbcast protocol, which uses rounds of gossiping for * reliable message delivery. Gossip protocols typically involve all the members sending gossips in regular * intervals. This protocol therefore works as follows: it allocates a timestamp for each member and updates * the timestamp whenever it receives a message from a sender P. Any type of message is accepted from P. For * example, PBCAST regularly sends the following messages: * <ul> * <li>regular mcast message from P * <li>regular ucast message from P * <li>gossip from P * <li>retransmit request from P * <li>retransmit response from P * </ul> * * @author Bela Ban */public class FD extends Protocol implements Runnable {    Address local_addr=null;    Thread checker=null;   // checks timestamps for timeout, generates SUSPECT event    final Object checker_lock=new Object();    long timeout=6000;   // number of millisecs to wait for a member to be suspected    // (should be higher than the gossip_interval value in PBCAST    final Hashtable members=new Hashtable(11); // keys=Addresses (members), vals=Entries (timestamp)    final Vector suspected_mbrs=new Vector(11); // currently suspected members (dynamically updated)    static class Entry {        long timestamp;        Entry(long timestamp) {            this.timestamp=timestamp;        }        public String toString() {            return Long.toString(timestamp);        }    }    public String getName() {        return "FD";    }    public boolean setProperties(Properties props) {        String str;        super.setProperties(props);        str=props.getProperty("timeout");        if(str != null) {            timeout=Long.parseLong(str);            props.remove("timeout");        }        if(props.size() > 0) {            log.error("FD.setProperties(): the following properties are not recognized: " + props);                        return false;        }        return true;    }    public void stop() {        stopChecker();    }    public void up(Event evt) {        Message msg;        Address sender;        switch(evt.getType()) {            case Event.SET_LOCAL_ADDRESS:                local_addr=(Address)evt.getArg();                break;            case Event.MSG:                msg=(Message)evt.getArg();                sender=msg.getSrc();                updateSender(sender);                break;        }        passUp(evt);   // pass up to the layer above us    }    public void down(Event evt) {        View v;        Vector mbrs;        Address mbr;        switch(evt.getType()) {            case Event.VIEW_CHANGE:                v=(View)evt.getArg();                mbrs=v.getMembers();                passDown(evt);                for(Enumeration e=members.keys(); e.hasMoreElements();) {                    mbr=(Address)e.nextElement();                    if(!mbrs.contains(mbr)) {                        members.remove(mbr);                    }                }                members.remove(local_addr);                if(members.size() > 0 && checker == null)                    startChecker();                return;                // generated by PBCAST, contains list of members a gossip has visited. we can safely reset their counter            case Event.HEARD_FROM:                updateSenders((Vector)evt.getArg());                return;  // don't pass down        }        passDown(evt);    }    public void run() {        Address mbr;        long timestamp, diff;        while(checker != null && Thread.currentThread().equals(checker) && members.size() > 0) {            for(Enumeration e=members.keys(); e.hasMoreElements();) {                mbr=(Address)e.nextElement();                timestamp=((Entry)members.get(mbr)).timestamp;                diff=System.currentTimeMillis() - timestamp;                if(diff >= timeout) {                    if(log.isInfoEnabled()) log.info("suspecting " + mbr);                    passUp(new Event(Event.SUSPECT, mbr));                    if(!suspected_mbrs.contains(mbr))                        suspected_mbrs.addElement(mbr);                }            }            Util.sleep(timeout);        }        checker=null;    }    void startChecker() {        synchronized(checker_lock) {            if(checker == null) {                checker=new Thread(this, "FD.CheckerThread");                checker.setDaemon(true);                checker.start();            }        }    }    void stopChecker() {        Thread tmp;        synchronized(checker_lock) {            if(checker != null && checker.isAlive()) {                tmp=checker;                checker=null;                tmp.interrupt();                try {                    tmp.join(timeout);                }                catch(Exception ex) {                }                if(tmp.isAlive())                    if(warn) log.warn("interrupted checker thread is still alive !");            }            checker=null;        }    }    void updateSender(Address mbr) {        Entry entry;        long curr_time;        if(mbr == null) {            if(log.isDebugEnabled()) log.debug("member " + mbr + " not found");            return;        }        if(suspected_mbrs.size() > 0 && suspected_mbrs.contains(mbr)) {            passUp(new Event(Event.UNSUSPECT, mbr));            suspected_mbrs.remove(mbr);        }        if(mbr.equals(local_addr))            return;        entry=(Entry)members.get(mbr);        curr_time=System.currentTimeMillis();        if(entry != null)            entry.timestamp=curr_time;        else            members.put(mbr, new Entry(curr_time));    }    void updateSenders(Vector v) {        Address mbr;        if(v == null) return;        for(int i=0; i < v.size(); i++) {            mbr=(Address)v.elementAt(i);            updateSender(mbr);        }    }    String printTimestamps() {        StringBuffer sb=new StringBuffer();        Address mbr;        synchronized(members) {            for(Enumeration e=members.keys(); e.hasMoreElements();) {                mbr=(Address)e.nextElement();                sb.append("\n" + mbr + ": " + (System.currentTimeMillis() - ((Entry)members.get(mbr)).timestamp));            }        }        return sb.toString();    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -