fd.java

来自「JGroups源码」· Java 代码 · 共 686 行 · 第 1/2 页

JAVA
686
字号
// $Id: FD.java,v 1.40 2006/10/24 13:15:39 belaban Exp $package org.jgroups.protocols;import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;import org.jgroups.*;import org.jgroups.stack.Protocol;import org.jgroups.util.*;import java.io.*;import java.util.*;import java.util.List;/** * Failure detection based on simple heartbeat protocol. Regularly polls members for * liveness. Multicasts SUSPECT messages when a member is not reachable. The simple * algorithms works as follows: the membership is known and ordered. Each HB protocol * periodically sends an 'are-you-alive' message to its *neighbor*. A neighbor is the next in * rank in the membership list, which is recomputed upon a view change. When a response hasn't * been received for n milliseconds and m tries, the corresponding member is suspected (and * eventually excluded if faulty).<p> * FD starts when it detects (in a view change notification) that there are at least * 2 members in the group. It stops running when the membership drops below 2.<p> * When a message is received from the monitored neighbor member, it causes the pinger thread to * 'skip' sending the next are-you-alive message. Thus, traffic is reduced.<p> * When we receive a ping from a member that's not in the membership list, we shun it by sending it a * NOT_MEMBER message. That member will then leave the group (and possibly rejoin). This is only done if * <code>shun</code> is true. 
* @author Bela Ban * @version $Revision: 1.40 $ */public class FD extends Protocol {    Address               ping_dest=null;    Address               local_addr=null;    long                  timeout=3000;  // number of millisecs to wait for an are-you-alive msg    long                  last_ack=System.currentTimeMillis();    int                   num_tries=0;    int                   max_tries=2;   // number of times to send a are-you-alive msg (tot time= max_tries*timeout)    final List            members=new CopyOnWriteArrayList();    final Hashtable       invalid_pingers=new Hashtable(7);  // keys=Address, val=Integer (number of pings from suspected mbrs)    /** Members from which we select ping_dest. may be subset of {@link #members} */    final List            pingable_mbrs=new CopyOnWriteArrayList();    boolean               shun=true;    TimeScheduler         timer=null;    private Monitor       monitor=null;  // task that performs the actual monitoring for failure detection    private final Object  monitor_mutex=new Object();    protected int         num_heartbeats=0;    protected int         num_suspect_events=0;    /** Transmits SUSPECT message until view change or UNSUSPECT is received */    protected final Broadcaster  bcast_task=new Broadcaster();    final static String   name="FD";    BoundedList           suspect_history=new BoundedList(20);    public String getName() {return name;}    public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}    public String getMembers() {return members != null? members.toString() : "null";}    public String getPingableMembers() {return pingable_mbrs != null? pingable_mbrs.toString() : "null";}    public String getPingDest() {return ping_dest != null? 
ping_dest.toString() : "null";}    public int getNumberOfHeartbeatsSent() {return num_heartbeats;}    public int getNumSuspectEventsGenerated() {return num_suspect_events;}    public long getTimeout() {return timeout;}    public void setTimeout(long timeout) {this.timeout=timeout;}    public int getMaxTries() {return max_tries;}    public void setMaxTries(int max_tries) {this.max_tries=max_tries;}    public int getCurrentNumTries() {return num_tries;}    public boolean isShun() {return shun;}    public void setShun(boolean flag) {this.shun=flag;}    public String printSuspectHistory() {        StringBuffer sb=new StringBuffer();        for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) {            sb.append(new Date()).append(": ").append(en.nextElement()).append("\n");        }        return sb.toString();    }    public boolean setProperties(Properties props) {        String str;        super.setProperties(props);        str=props.getProperty("timeout");        if(str != null) {            timeout=Long.parseLong(str);            props.remove("timeout");        }        str=props.getProperty("max_tries");  // before suspecting a member        if(str != null) {            max_tries=Integer.parseInt(str);            props.remove("max_tries");        }        str=props.getProperty("shun");        if(str != null) {            shun=Boolean.valueOf(str).booleanValue();            props.remove("shun");        }        if(props.size() > 0) {            log.error("the following properties are not recognized: " + props);            return false;        }        return true;    }    public void resetStats() {        num_heartbeats=num_suspect_events=0;        suspect_history.removeAll();    }    public void init() throws Exception {        if(stack != null && stack.timer != null)            timer=stack.timer;        else            throw new Exception("FD.init(): timer cannot be retrieved from protocol stack");    }    public void stop() {        
stopMonitor();    }    private Object getPingDest(List mbrs) {        Object tmp, retval=null;        if(mbrs == null || mbrs.size() < 2 || local_addr == null)            return null;        for(int i=0; i < mbrs.size(); i++) {            tmp=mbrs.get(i);            if(local_addr.equals(tmp)) {                if(i + 1 >= mbrs.size())                    retval=mbrs.get(0);                else                    retval=mbrs.get(i + 1);                break;            }        }        return retval;    }    private void startMonitor() {        synchronized(monitor_mutex) {            if(monitor != null && monitor.started == false) {                monitor=null;            }            if(monitor == null) {                monitor=createMonitor();                last_ack=System.currentTimeMillis();  // start from scratch                timer.add(monitor, true);  // fixed-rate scheduling                num_tries=0;            }        }    }    private void stopMonitor() {        synchronized(monitor_mutex) {            if(monitor != null) {                monitor.stop();                monitor=null;            }        }    }    protected Monitor createMonitor() {        return new Monitor();    }    public void up(Event evt) {        Message msg;        FdHeader hdr;        Object sender, tmphdr;        switch(evt.getType()) {            case Event.SET_LOCAL_ADDRESS:                local_addr=(Address)evt.getArg();                break;            case Event.MSG:                msg=(Message)evt.getArg();                tmphdr=msg.getHeader(name);                if(tmphdr == null || !(tmphdr instanceof FdHeader)) {                    if(ping_dest != null && (sender=msg.getSrc()) != null) {                        if(ping_dest.equals(sender)) {                            last_ack=System.currentTimeMillis();                            if(trace)                                log.trace("received msg from " + sender + " (counts as ack)");                            
num_tries=0;                        }                    }                    break;  // message did not originate from FD layer, just pass up                }                hdr=(FdHeader)msg.removeHeader(name);                switch(hdr.type) {                    case FdHeader.HEARTBEAT:                       // heartbeat request; send heartbeat ack                        Address hb_sender=msg.getSrc();                        if(trace)                            log.trace("received are-you-alive from " + hb_sender + ", sending response");                        sendHeartbeatResponse(hb_sender);                        // 2. Shun the sender of a HEARTBEAT message if that sender is not a member. This will cause                        //    the sender to leave the group (and possibly rejoin it later)                        if(shun)                            shunInvalidHeartbeatSender(hb_sender);                        break;                                     // don't pass up !                    case FdHeader.HEARTBEAT_ACK:                   // heartbeat ack                        if(ping_dest != null && ping_dest.equals(hdr.from)) {                            last_ack=System.currentTimeMillis();                            num_tries=0;                            if(log.isDebugEnabled()) log.debug("received ack from " + hdr.from);                        }                        else {                            stop();                            ping_dest=(Address)getPingDest(pingable_mbrs);                            if(ping_dest != null) {                                try {                                    startMonitor();                                }                                catch(Exception ex) {                                    if(warn) log.warn("exception when calling startMonitor(): " + ex);                                }                            }                        }                        break;                    case 
FdHeader.SUSPECT:                        if(hdr.mbrs != null) {                            if(trace) log.trace("[SUSPECT] suspect hdr is " + hdr);                            for(int i=0; i < hdr.mbrs.size(); i++) {                                Address m=(Address)hdr.mbrs.elementAt(i);                                if(local_addr != null && m.equals(local_addr)) {                                    if(warn)                                        log.warn("I was suspected by " + msg.getSrc() + "; ignoring the SUSPECT " +                                                "message and sending back a HEARTBEAT_ACK");                                    sendHeartbeatResponse(msg.getSrc());                                    continue;                                }                                else {                                    pingable_mbrs.remove(m);                                    ping_dest=(Address)getPingDest(pingable_mbrs);                                }                                passUp(new Event(Event.SUSPECT, m));                                passDown(new Event(Event.SUSPECT, m));                            }                        }                        break;                    case FdHeader.NOT_MEMBER:                        if(shun) {                            if(log.isDebugEnabled()) log.debug("[NOT_MEMBER] I'm being shunned; exiting");                            passUp(new Event(Event.EXIT));                        }                        break;                }                return;        }        passUp(evt); // pass up to the layer above us    }    public void down(Event evt) {        View v;        switch(evt.getType()) {            case Event.VIEW_CHANGE:                passDown(evt);                stop();                synchronized(this) {                    v=(View)evt.getArg();                    members.clear();                    members.addAll(v.getMembers());                    
bcast_task.adjustSuspectedMembers(members);                    pingable_mbrs.clear();                    pingable_mbrs.addAll(members);                    ping_dest=(Address)getPingDest(pingable_mbrs);                    if(ping_dest != null) {                        try {                            startMonitor();                        }                        catch(Exception ex) {                            if(warn) log.warn("exception when calling startMonitor(): " + ex);                        }                    }                }                break;            case Event.UNSUSPECT:                unsuspect((Address)evt.getArg());                passDown(evt);                break;            default:                passDown(evt);                break;        }    }    private void sendHeartbeatResponse(Address dest) {        Message hb_ack=new Message(dest, null, null);        FdHeader tmp_hdr=new FdHeader(FdHeader.HEARTBEAT_ACK);        tmp_hdr.from=local_addr;        hb_ack.putHeader(name, tmp_hdr);        passDown(new Event(Event.MSG, hb_ack));    }    private void unsuspect(Address mbr) {        bcast_task.removeSuspectedMember(mbr);        pingable_mbrs.clear();        pingable_mbrs.addAll(members);        pingable_mbrs.removeAll(bcast_task.getSuspectedMembers());        ping_dest=(Address)getPingDest(pingable_mbrs);    }    /**     * If sender is not a member, send a NOT_MEMBER to sender (after n pings received)     */    private void shunInvalidHeartbeatSender(Address hb_sender) {        int num_pings=0;        Message shun_msg;        if(hb_sender != null && members != null && !members.contains(hb_sender)) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
增大字号 Ctrl + =
减小字号 Ctrl + -
显示快捷键 ?