⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fd_prob.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
// $Id: FD_PROB.java,v 1.10 2006/02/07 07:57:50 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.Util;import org.jgroups.util.Streamable;import java.io.*;import java.util.Enumeration;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Probabilistic failure detection protocol based on "A Gossip-Style Failure Detection Service" * by Renesse, Minsky and Hayden.<p> * Each member maintains a list of all other members: for each member P, 2 data are maintained, a heartbeat * counter and the time of the last increment of the counter. Each member periodically sends its own heartbeat * counter list to a randomly chosen member Q. Q updates its own heartbeat counter list and the associated * time (if counter was incremented). Each member periodically increments its own counter. If, when sending * its heartbeat counter list, a member P detects that another member Q's heartbeat counter was not incremented * for timeout seconds, Q will be suspected.<p> * This protocol can be used both with a PBCAST *and* regular stacks. * @author Bela Ban 1999 * @version $Revision: 1.10 $ */public class FD_PROB extends Protocol implements Runnable {    Address local_addr=null;    Thread hb=null;    long timeout=3000;  // before a member with a non updated timestamp is suspected    long gossip_interval=1000;    Vector members=null;    final Hashtable counters=new Hashtable();        // keys=Addresses, vals=FdEntries    final Hashtable invalid_pingers=new Hashtable(); // keys=Address, vals=Integer (number of pings from suspected mbrs)    int max_tries=2;   // number of times to send a are-you-alive msg (tot time= max_tries*timeout)    public String getName() {        return "FD_PROB";    }    public boolean setProperties(Properties props) {        String str;        super.setProperties(props);        str=props.getProperty("timeout");        if(str != null) {            timeout=Long.parseLong(str);            props.remove("timeout");        }        str=props.getProperty("gossip_interval");        if(str != null) {            gossip_interval=Long.parseLong(str);            props.remove("gossip_interval");        }        str=props.getProperty("max_tries");        if(str != null) {            max_tries=Integer.parseInt(str);            props.remove("max_tries");        }        if(props.size() > 0) {            log.error("FD_PROB.setProperties(): the following properties are not recognized: " + props);            return false;        }        return true;    }    public void start() throws Exception {        if(hb == null) {            hb=new Thread(this, "FD_PROB.HeartbeatThread");            hb.setDaemon(true);            hb.start();        }    }    public void stop() {        Thread tmp=null;        if(hb != null && hb.isAlive()) {            tmp=hb;            hb=null;            tmp.interrupt();            try {                tmp.join(timeout);            }            catch(Exception ex) {            }        }        hb=null;    }    public void up(Event evt) {        Message msg;        FdHeader hdr=null;        Object obj;        switch(evt.getType()) {            case Event.SET_LOCAL_ADDRESS:                local_addr=(Address) evt.getArg();                break;            case Event.MSG:                msg=(Message) evt.getArg();                obj=msg.getHeader(getName());                if(obj == null || !(obj instanceof FdHeader)) {                    updateCounter(msg.getSrc());  // got a msg from this guy, reset its time (we heard from it now)                    break;                }                hdr=(FdHeader) msg.removeHeader(getName());                switch(hdr.type) {                    case FdHeader.HEARTBEAT:                           // heartbeat request; send heartbeat ack                        if(checkPingerValidity(msg.getSrc()) == false) // false == sender of heartbeat is not a member                            return;                        // 2. Update my own array of counters                            if(log.isInfoEnabled()) log.info("<-- HEARTBEAT from " + msg.getSrc());                        updateCounters(hdr);                        return;                                     // don't pass up !                    case FdHeader.NOT_MEMBER:                        if(warn) log.warn("NOT_MEMBER: I'm being shunned; exiting");                        passUp(new Event(Event.EXIT));                        return;                    default:                        if(warn) log.warn("FdHeader type " + hdr.type + " not known");                        return;                }        }        passUp(evt);                                        // pass up to the layer above us    }    public void down(Event evt) {        int num_mbrs;        Vector excluded_mbrs;        FdEntry entry;        Address mbr;        switch(evt.getType()) {            // Start heartbeat thread when we have more than 1 member; stop it when membership drops below 2            case Event.VIEW_CHANGE:                passDown(evt);                synchronized(this) {                    View v=(View) evt.getArg();                    // mark excluded members                    excluded_mbrs=computeExcludedMembers(members, v.getMembers());                    if(excluded_mbrs != null && excluded_mbrs.size() > 0) {                        for(int i=0; i < excluded_mbrs.size(); i++) {                            mbr=(Address) excluded_mbrs.elementAt(i);                            entry=(FdEntry) counters.get(mbr);                            if(entry != null)                                entry.setExcluded(true);                        }                    }                    members=v != null ? v.getMembers() : null;                    if(members != null) {                        num_mbrs=members.size();                        if(num_mbrs >= 2) {                            if(hb == null) {                                try {                                    start();                                }                                catch(Exception ex) {                                    if(warn) log.warn("exception when calling start(): " + ex);                                }                            }                        }                        else                            stop();                    }                }                break;            default:                passDown(evt);                break;        }    }    /**     Loop while more than 1 member available. Choose a member randomly (not myself !) and send a     heartbeat. Wait for ack. If ack not received withing timeout, mcast SUSPECT message.     */    public void run() {        Message hb_msg;        FdHeader hdr;        Address hb_dest, key;        FdEntry entry;        long curr_time, diff;            if(log.isInfoEnabled()) log.info("heartbeat thread was started");        while(hb != null && members.size() > 1) {            // 1. Get a random member P (excluding ourself)            hb_dest=getHeartbeatDest();            if(hb_dest == null) {                if(warn) log.warn("hb_dest is null");                Util.sleep(gossip_interval);                continue;            }            // 2. Increment own counter            entry=(FdEntry) counters.get(local_addr);            if(entry == null) {                entry=new FdEntry();                counters.put(local_addr, entry);            }            entry.incrementCounter();            // 3. Send heartbeat to P            hdr=createHeader();            if(hdr == null)                if(warn) log.warn("header could not be created. Heartbeat will not be sent");            else {                hb_msg=new Message(hb_dest, null, null);                hb_msg.putHeader(getName(), hdr);                    if(log.isInfoEnabled()) log.info("--> HEARTBEAT to " + hb_dest);                passDown(new Event(Event.MSG, hb_msg));            }                if(log.isInfoEnabled()) log.info("own counters are " + printCounters());            // 4. Suspect members from which we haven't heard for timeout msecs            for(Enumeration e=counters.keys(); e.hasMoreElements();) {                curr_time=System.currentTimeMillis();                key=(Address) e.nextElement();                entry=(FdEntry) counters.get(key);                if(entry.getTimestamp() > 0 && (diff=curr_time - entry.getTimestamp()) >= timeout) {                    if(entry.excluded()) {                        if(diff >= 2 * timeout) {  // remove members marked as 'excluded' after 2*timeout msecs                            counters.remove(key);                            if(log.isInfoEnabled()) log.info("removed " + key);                        }                    }                    else {                        if(log.isInfoEnabled()) log.info("suspecting " + key);                        passUp(new Event(Event.SUSPECT, key));                    }                }            }            Util.sleep(gossip_interval);        } // end while            if(log.isInfoEnabled()) log.info("heartbeat thread was stopped");    }    /* -------------------------------- Private Methods ------------------------------- */    Address getHeartbeatDest() {        Address retval=null;        int r, size;        Vector members_copy;        if(members == null || members.size() < 2 || local_addr == null)            return null;        members_copy=(Vector) members.clone();        members_copy.removeElement(local_addr); // don't select myself as heartbeat destination        size=members_copy.size();        r=((int) (Math.random() * (size + 1))) % size;        retval=(Address) members_copy.elementAt(r);        return retval;    }    /** Create a header containing the counters for all members */    FdHeader createHeader() {        int num_mbrs=counters.size(), index=0;        FdHeader ret=null;        Address key;        FdEntry entry;        if(num_mbrs <= 0)            return null;        ret=new FdHeader(FdHeader.HEARTBEAT, num_mbrs);        for(Enumeration e=counters.keys(); e.hasMoreElements();) {            key=(Address) e.nextElement();            entry=(FdEntry) counters.get(key);            if(entry.excluded())                continue;            if(index >= ret.members.length) {                if(warn) log.warn("index " + index + " is out of bounds (" +                                                     ret.members.length + ')');                break;            }            ret.members[index]=key;            ret.counters[index]=entry.getCounter();            index++;        }        return ret;    }    /** Set my own counters values to max(own-counter, counter) */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -