fd_rand.java.txt

来自「JGroups源码」· 文本 代码 · 共 288 行

TXT
288
字号
// $Id: FD_RAND.java.txt,v 1.3 2005/05/30 14:31:05 belaban Exp $

package org.jgroups.protocols;

import java.util.Properties;
import java.util.Vector;

// NOTE(review): Header, Protocol, Address, Message, Event, View and Util are used below but are
// not imported in this file; presumably they must be imported from the JGroups core packages
// before this (currently .txt) source can be compiled - confirm against the target JGroups version.


/**
 * Header attached by FD_RAND to the messages it creates or passes down.
 * <code>type</code> is one of the four constants below; <code>suspected_mbr</code> carries the
 * suspected member for SUSPECT messages and the responder's own address for HEARTBEAT_ACK
 * messages.
 */
class FdRandHeader extends Header {
    static final int HEARTBEAT     = 0;
    static final int HEARTBEAT_ACK = 1;
    static final int SUSPECT       = 2;
    static final int REGULAR       = 3;

    int     type=HEARTBEAT;
    Object  suspected_mbr=null;

    FdRandHeader(int type) {
        this.type=type;
    }

    /** Returns a short human-readable description of the header type. */
    public String toString() {
        switch(type) {
        case HEARTBEAT:
            return "[FD_RAND: heartbeat]";
        case HEARTBEAT_ACK:
            return "[FD_RAND: heartbeat ack]";
        case SUSPECT:
            return "[FD_RAND: suspect]";
        case REGULAR:
            return "[FD_RAND: regular message]";
        default:
            return "[FD_RAND: unknown type (" + type + ")]";
        }
    }
}


/**
   Failure detection based on simple heartbeat protocol. Regularly polls randomly
   selected members for liveness. Passes SUSPECT message up the stack when a member is
   not reachable. The simple algorithm works as follows: the membership is known. Each
   HB protocol periodically sends a 'are-you-alive' message to a randomly selected
   member, except itself. When a response hasn't been received for n milliseconds and m
   tries, the corresponding member is suspected (and eventually excluded if faulty).<p>
   FD_RAND starts when it detects (in a view change notification) that there are at least
   2 members in the group. It stops running when the membership drops below 2.
*/
public class FD_RAND extends Protocol implements Runnable {
    boolean           trace=false;
    volatile Address  ping_dest=null;     // member currently being pinged (read by up(), written by run())
    Address           local_addr=null;
    volatile Thread   pinger=null;        // the active pinger thread; null when stopped
    long              timeout=1000;       // 1 second between heartbeats
    volatile boolean  ack_received=false; // set by up() under ack_mutex, polled by run()
    volatile Vector   members=null;       // replaced on every view change
    int               num_tries=0;        // failed attempts for the current ping_dest
    int               max_tries=2;        // 3 tries before suspecting (overridable via 'num_tries')
    final Object      ack_mutex=new Object();


    public String getName() {
        return "FD_RAND";
    }


    /**
     * Reads the properties <code>trace</code>, <code>timeout</code> (ms between heartbeats) and
     * <code>num_tries</code> (number of unanswered heartbeats before a member is suspected,
     * at least 1) and removes them from <code>props</code>.
     *
     * @return false if unrecognized properties remain, true otherwise
     * @throws NumberFormatException if <code>timeout</code> or <code>num_tries</code> is not numeric
     */
    public boolean setProperties(Properties props) {
        super.setProperties(props);
        String str;

        this.props=props;

        str=props.getProperty("trace");
        if(str != null) {
            trace=Boolean.valueOf(str).booleanValue();
            props.remove("trace");
        }

        str=props.getProperty("timeout");
        if(str != null) {
            timeout=Long.parseLong(str);
            props.remove("timeout");
        }

        str=props.getProperty("num_tries");
        if(str != null) {
            int tries=Integer.parseInt(str);
            props.remove("num_tries");
            if(tries < 1) {
                log.error("FD_RAND.setProperties(): propertiy 'num_tries' must be at least 1 ! " +
                          "Setting it to 1");
                tries=1;
            }
            // run() counts failed attempts from 0 and suspects once the counter reaches
            // max_tries, so N tries correspond to max_tries == N-1.
            max_tries=tries - 1;
        }

        if(props.size() > 0) {
            log.error("FD_RAND.setProperties(): the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }


    /**
     * Picks a member other than ourselves, uniformly at random.
     *
     * @return the chosen member, or null if the membership or local address is unknown,
     *         fewer than 2 members are present, or no member differs from the local address
     */
    Address getPingDest() {
        Vector  current=members;
        Address self=local_addr;

        if(current == null || self == null || current.size() < 2)
            return null;

        // Work on a private copy: the view's vector may be modified by another thread.
        Vector snapshot=(Vector)current.clone();
        Vector candidates=new Vector(snapshot.size());
        for(int i=0; i < snapshot.size(); i++) {
            Object mbr=snapshot.elementAt(i);
            if(mbr != null && !self.equals(mbr))
                candidates.addElement(mbr);
        }
        if(candidates.isEmpty())
            return null;

        // Math.random() is in [0,1), so the index is always within bounds and unbiased.
        int index=(int)(Math.random() * candidates.size());
        return (Address)candidates.elementAt(index);
    }


    /**
     * Handles events travelling up the stack. Heartbeat requests are answered, heartbeat acks
     * and SUSPECT messages are consumed (the latter is turned into a SUSPECT event); everything
     * else is passed up unchanged.
     */
    public void up(Event evt) {
        Message       msg;
        FdRandHeader  hdr=null;

        switch(evt.getType()) {

        case Event.SET_LOCAL_ADDRESS:
            local_addr=(Address)evt.getArg();
            break;

        case Event.MSG:
            msg=(Message)evt.getArg();
            try {
                hdr=(FdRandHeader)msg.removeHeader();
            }
            catch(Exception e) {
                log.error("FD_RAND.up(): " + e);
            }
            if(hdr == null)
                break;                                       // no usable header: just pass the message up

            switch(hdr.type) {
            case FdRandHeader.HEARTBEAT:                     // heartbeat request; send heartbeat ack
                Message      hb_ack=new Message(msg.getSrc(), null, null);
                FdRandHeader tmp_hdr=new FdRandHeader(FdRandHeader.HEARTBEAT_ACK);
                tmp_hdr.suspected_mbr=local_addr;            // field doubles as "who is answering"
                hb_ack.addHeader(tmp_hdr);
                passDown(new Event(Event.MSG, hb_ack));
                return;                                      // don't pass up !

            case FdRandHeader.HEARTBEAT_ACK:                 // heartbeat ack
                Object  responder=hdr.suspected_mbr;
                Address awaited=ping_dest;
                if(awaited != null && awaited.equals(responder)) {
                    synchronized(ack_mutex) {
                        ack_received=true;
                        ack_mutex.notify();
                    }
                }
                return;

            case FdRandHeader.SUSPECT:
                if(hdr.suspected_mbr != null) {
                    System.out.println("FD_RAND: SUSPECT(" + hdr.suspected_mbr + ")");
                    passUp(new Event(Event.SUSPECT, hdr.suspected_mbr));
                }
                return;

            default:
                break;
            }
            break;
        }

        passUp(evt);                                         // pass up to the layer above us
    }


    /**
     * Handles events travelling down the stack. STOP stops the pinger, VIEW_CHANGE installs the
     * new membership and restarts the pinger, and every outgoing message gets a REGULAR header
     * so that up() on the receiving side always finds a header to remove.
     */
    public void down(Event evt) {
        Message msg;

        switch(evt.getType()) {

        case Event.STOP:
            stop();
            passDown(evt);
            break;

        case Event.VIEW_CHANGE:
            synchronized(this) {
                stop();
                View v=(View)evt.getArg();
                members=v != null ? v.getMembers() : null;
                passDown(evt);
                start();
            }
            break;

        case Event.MSG:
            msg=(Message)evt.getArg();
            msg.addHeader(new FdRandHeader(FdRandHeader.REGULAR));  // regular message
            passDown(evt);
            break;

        default:
            passDown(evt);
            break;
        }
    }


    /**
       Loop while more than 1 member available. Choose a member randomly (not myself !) and send a
       heartbeat. Wait for ack. If ack not received within timeout (after max_tries+1 attempts),
       mcast SUSPECT message. The loop ends as soon as the executing thread is no longer the
       current pinger.
    */
    public void run() {
        Message       suspect_msg, hb_req;
        FdRandHeader  hdr;
        Address       dest;
        Vector        current;

        while(isPinger()) {
            current=members;
            if(current == null || current.size() < 2)
                break;

            dest=getPingDest();
            ping_dest=dest;
            ack_received=false;
            num_tries=0;

            if(dest == null) {
                // No target available right now (e.g. local address not yet known). A null
                // destination would turn the heartbeat into a multicast, so wait and retry.
                Util.sleep(timeout);
                continue;
            }

            while(!ack_received && num_tries <= max_tries && isPinger()) {
                hb_req=new Message(dest, null, null);
                hb_req.addHeader(new FdRandHeader(FdRandHeader.HEARTBEAT));  // send heartbeat request
                passDown(new Event(Event.MSG, hb_req));

                synchronized(ack_mutex) {                    // wait for heartbeat ack
                    if(!ack_received) {                      // ack may already have arrived
                        try {
                            ack_mutex.wait(timeout);
                        }
                        catch(InterruptedException interrupted) {
                            // stop() interrupts us; the isPinger() check below ends the thread
                        }
                    }
                }

                if(!isPinger())
                    return;

                if(ack_received) {
                    Util.sleep(timeout);
                    break;
                }

                if(num_tries >= max_tries) {
                    System.out.println("FD_RAND(" + local_addr + "): received no heartbeat ack from " +
                                       dest + ", suspecting it");
                    hdr=new FdRandHeader(FdRandHeader.SUSPECT);
                    hdr.suspected_mbr=dest;
                    suspect_msg=new Message(null, null, null);  // mcast SUSPECT to all members
                    suspect_msg.addHeader(hdr);
                    passDown(new Event(Event.MSG, suspect_msg));
                    break;
                }

                num_tries++;
                Util.sleep(timeout);
            }
        }
    }


    /** True while the calling thread is the currently installed pinger thread. */
    private boolean isPinger() {
        return Thread.currentThread() == pinger;
    }


    /** Starts the pinger thread unless one is already installed. */
    void start() {
        if(pinger == null) {
            pinger=new Thread(this, "FD_RAND.PingerThread");
            pinger.start();
        }
    }


    /**
     * Stops the pinger thread: clears the pinger reference, interrupts the thread and waits up to
     * <code>timeout</code> ms for it to terminate. Also resets the retry counter and ack flag.
     */
    void stop() {
        Thread tmp=pinger;

        num_tries=0;
        ack_received=false;
        pinger=null;                                         // makes isPinger() false for the old thread

        if(tmp != null && tmp.isAlive()) {
            tmp.interrupt();
            try {
                tmp.join(timeout);
            }
            catch(InterruptedException interrupted) {
                Thread.currentThread().interrupt();          // preserve the caller's interrupt status
            }
        }
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?