fd_shun.java.txt

来自「JGRoups源码」· 文本 代码 · 共 324 行

TXT
324
字号
// $Id: FD_SHUN.java.txt,v 1.3 2005/05/30 14:31:05 belaban Exp $package org.jgroups.protocols;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;class FdHeaderShun extends Header {    static final int HEARTBEAT     = 0;    static final int HEARTBEAT_ACK = 1;    static final int SUSPECT       = 2;    static final int NOT_MEMBER    = 3;  // received as response by pinged mbr when we are not a member    int      type=HEARTBEAT;    Address  suspected_mbr=null;    Address  from=null;  // member who detected that suspected_mbr has failed    FdHeaderShun(int type) {this.type=type;}    public String toString() {	switch(type) {	case HEARTBEAT:	    return "[FD_SHUN: heartbeat]";	case HEARTBEAT_ACK:	    return "[FD_SHUN: heartbeat ack]";	case SUSPECT:	    return "[FD_SHUN: SUSPECT (suspected_mbr=" + suspected_mbr + ", from=" + from + ")]";	case NOT_MEMBER: return "[FD_SHUN: NOT_MEMBER]";	default:	    return "[FD_SHUN: unknown type (" + type + ")]";	}    }}/**   Failure detection based on simple heartbeat protocol. Regularly polls members for   liveness. Passes SUSPECT message up the stack when a member is not reachable. The simple   algorithms works as follows: the membership is known and ordered. Each HB protocol   periodically sends a 'are-you-alive' message to its *neighbor*. A neighbor is the next in   rank in the membership list. It is recomputed upon a view change. When a response hasn't   been received for n milliseconds and m tries, the corresponding member is suspected (and   eventually excluded if faulty).<p>   FD_SHUN starts when it detects (in a view change notification) that there are at least   2 members in the group. It stops running when the membership drops below 2.<p>   When a message is received from the monitored neighbor member, it causes the pinger thread to   'skip' sending the next are-you-alive message. Thus, traffic is reduced.<p>   When we receive a ping from a member that's not in the membership list, we shun it by sending it a    NOT_MEMBER message. That member will then leave the group (and possibly rejoin).*/public class FD_SHUN extends Protocol implements Runnable {    boolean      trace=false;    Address      ping_dest=null;    Address      local_addr=null;    Thread       pinger=null;   // pinger thread    long         timeout=3000;  // number of millisecs to wait for an are-you-alive msg    boolean      ack_received=false, skip_heartbeat=false;    int          num_tries=0;    int          max_tries=2;   // number of times to send a are-you-alive msg (tot time= max_tries*timeout)    Vector       members=null;    Hashtable    invalid_pingers=new Hashtable();  // keys=Address, val=Integer (number of pings from suspected mbrs)    public String  getName() {return "FD_SHUN";}    public boolean setProperties(Properties props) {super.setProperties(props);	String     str;	this.props=props;	str=props.getProperty("trace");	if(str != null) {	    trace=new Boolean(str).booleanValue();	    props.remove("trace");	}	str=props.getProperty("timeout");	if(str != null) {	    timeout=new Long(str).longValue();	    props.remove("timeout");	}	str=props.getProperty("max_tries");  // before suspecting a member	if(str != null) {	    max_tries=new Integer(str).intValue();	    props.remove("max_tries");	}	if(props.size() > 0) {	    log.error("FD_SHUN.setProperties(): the following properties are not recognized: " + props);	    	    return false;	}	return true;    }    Address getPingDest(Vector members) {	Address tmp, retval=null;	if(members == null || members.size() < 2 || local_addr == null)	    return null;	for(int i=0; i < members.size(); i++) {	    tmp=(Address)members.elementAt(i);	    if(local_addr.equals(tmp)) {		if(i + 1 >= members.size())		    retval=(Address)members.elementAt(0);		else		    retval=(Address)members.elementAt(i+1);		break;	    }	}	return retval;    }    public void up(Event evt) {	Message       msg;	FdHeaderShun  hdr=null;	Address       sender;	Object        tmphdr;	int           num_pings=0;	switch(evt.getType()) {	case Event.SET_LOCAL_ADDRESS:	    local_addr=(Address)evt.getArg();	    break;	case Event.MSG:	    msg=(Message)evt.getArg();	    tmphdr=msg.peekHeader();	    if(tmphdr == null || !(tmphdr instanceof FdHeaderShun)) {		if(ping_dest != null && (sender=msg.getSrc()) != null) {		    if(ping_dest.equals(sender)) {			ack_received=true;			num_tries=0;			skip_heartbeat=true;		    }		}		break;  // message did not originate from FD_SHUN layer, just pass up	    }	    hdr=(FdHeaderShun)msg.removeHeader();	    switch(hdr.type) {	    case FdHeaderShun.HEARTBEAT:                       // heartbeat request; send heartbeat ack		Address      hb_sender=msg.getSrc();		Message      hb_ack=new Message(msg.getSrc(), null, null);		FdHeaderShun tmp_hdr=new FdHeaderShun(FdHeaderShun.HEARTBEAT_ACK);		// 1.  Send an ack		tmp_hdr.suspected_mbr=local_addr;		hb_ack.addHeader(tmp_hdr);		passDown(new Event(Event.MSG, hb_ack));		// 2. If sender is not a member, send a SUSPECT to sender (after n pings received)		if(hb_sender != null && members != null && !members.contains(hb_sender)) {		    if(invalid_pingers.containsKey(hb_sender)) {			num_pings=((Integer)invalid_pingers.get(hb_sender)).intValue();			if(num_pings >= max_tries) {			    log.error("** FD_SHUN.up(HEARTBEAT): sender " + hb_sender +					       " is not member in " + members + " ! Telling it to leave group");			    hb_ack=new Message(msg.getSrc(), null, null);			    tmp_hdr=new FdHeaderShun(FdHeaderShun.NOT_MEMBER);			    tmp_hdr.from=local_addr;			    tmp_hdr.suspected_mbr=hb_sender;			    hb_ack.addHeader(tmp_hdr);			    passDown(new Event(Event.MSG, hb_ack));			    invalid_pingers.remove(hb_sender);			}			else {			    num_pings++;			    invalid_pingers.put(hb_sender, new Integer(num_pings));			}		    }		    else {			num_pings++;			invalid_pingers.put(hb_sender, new Integer(num_pings));		    }		}		break;                                     // don't pass up !	    case FdHeaderShun.HEARTBEAT_ACK:                   // heartbeat ack		Object suspect=hdr.suspected_mbr;		if(ping_dest != null && ping_dest.equals(suspect)) {		    ack_received=true;		    num_tries=0;		}		else {		    stop();		    ping_dest=(Address)getPingDest(members);		    if(ping_dest != null)			start();		}		break;	    case FdHeaderShun.SUSPECT:		if(hdr.suspected_mbr != null) {		    if(trace)			System.out.println("FD_SHUN.up(SUSPECT): " + hdr);		    passUp(new Event(Event.SUSPECT, hdr.suspected_mbr));		    passDown(new Event(Event.SUSPECT, hdr.suspected_mbr));		}		break;	    case FdHeaderShun.NOT_MEMBER:		System.out.println("** FD_SHUN.up(NOT_MEMBER): I'm being shunned; exiting");		passUp(new Event(Event.EXIT));		break;	    }	    return;	}	passUp(evt);                                        // pass up to the layer above us    }    public void down(Event evt) {	Message msg;	switch(evt.getType()) {	case Event.STOP:	    stop();	    passDown(evt);	    break;	case Event.VIEW_CHANGE:	    synchronized(this) {		stop();		View v=(View)evt.getArg();		members=(v != null) ? v.getMembers() : null;		passDown(evt);		ping_dest=(Address)getPingDest(members);		if(ping_dest != null)		    start();	    }	    break;	default:	    passDown(evt);	    break;	}    }    public void run() {	Message       suspect_msg, hb_req;	FdHeaderShun  hdr;	while(ping_dest != null && pinger != null) {	    ack_received=false;	    if(!skip_heartbeat) {		hb_req=new Message(ping_dest, null, null);		hb_req.addHeader(new FdHeaderShun(FdHeaderShun.HEARTBEAT));  // send heartbeat request		if(trace) System.out.println("FD_SHUN: sending are-you-alive msg to " + ping_dest);		passDown(new Event(Event.MSG, hb_req));	    }	    skip_heartbeat=false;	    Util.sleep(timeout);	    if(pinger == null)		break;	    if(!ack_received && num_tries >= max_tries) {		if(trace)		    System.out.println("FD_SHUN(" + local_addr + "): received no heartbeat ack from " +				       ping_dest + ", suspecting it");		hdr=new FdHeaderShun(FdHeaderShun.SUSPECT);		hdr.suspected_mbr=ping_dest;		hdr.from=local_addr;		suspect_msg=new Message(null, null, null);       // mcast SUSPECT to all members		suspect_msg.addHeader(hdr);		passDown(new Event(Event.MSG, suspect_msg));		members.removeElement(ping_dest);                // try the next neighbor		ping_dest=(Address)getPingDest(members);	    }	    else {		if(trace && !skip_heartbeat)		    System.out.println("FD_SHUN: received heartbeat ack from " + ping_dest);		num_tries++;	    }	}    }    void start() {	if(pinger == null) {	    pinger=new Thread(this, "FD_SHUN.PingerThread");	    pinger.start();	}    }    void stop() {	Thread tmp=null;	if(pinger != null && pinger.isAlive()) {	    tmp=pinger;	    pinger=null;	    tmp.interrupt();	    try {tmp.join(timeout);} catch(Exception ex) {}	    if(tmp.isAlive())		log.error("**** FD_SHUN.stop(): interrupted pinger thread is still alive !");	}	pinger=null;    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?