📄 fd_pid.java
字号:
// $Id: FD_PID.java,v 1.8 2005/08/11 12:43:47 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.util.Promise;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Util;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.net.InetAddress;import java.util.Enumeration;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Process-ID based FD protocol. The existence of a process will be tested * via the process ID instead of message based pinging. In order to probe a process' existence, the application (or * some other protocol layer) has to send down a SET_PID event for the member. The addresses of all members will * be associated with their respective PIDs. The PID will be used to probe for the existence of that process.<p> * A cache of Addresses and PIDs is maintained in each member, which is adjusted upon reception of view changes. * The population of the addr:pid cache is as follows:<br> * When a new member joins, it requests the PID cache from the coordinator. Then it broadcasts its own addr:pid * association, so all members can update their cache. When a member P is to be pinged by Q, and Q doesn't have * P'd PID, Q will broadcast a WHO_HAS_PID message, to which all members who have that entry will respond. The * latter case should actually never happen because all members should always have consistent caches. However, * it is left in the code as a second line of defense.<p> * Note that * <em>1. The SET_PID has to be sent down after connecting to a channel !</em><p> * <em>2. Note that if a process is shunned and subsequently reconnects, the SET_PID event has to be resent !</em><p> * <em>3. This protocol only works for groups whose members are on the same host </em>. 'Host' actually means the * same IP address (e.g. for multi-homed systems). 
*/
public class FD_PID extends Protocol {
    Address ping_dest=null;                        // address of the member we monitor
    int ping_pid=0;                                // PID of the member we monitor
    Address local_addr=null;                       // our own address
    int local_pid=0;                               // PID of this process
    long timeout=3000;                             // msecs to wait for an are-you-alive msg
    long get_pids_timeout=3000;                    // msecs to wait for the PID cache from the coordinator
    final long get_pids_retry_timeout=500;         // msecs to wait until we retry fetching the cache from the coord
    int num_tries=3;                               // attempts the coord is solicited for PID cache until we give up
    final Vector members=new Vector();             // list of group members (updated on VIEW_CHANGE)
    final Hashtable pids=new Hashtable();          // keys=Addresses, vals=Integer (PIDs)
    boolean own_pid_sent=false;                    // has own PID been broadcast yet ?
    final Vector pingable_mbrs=new Vector();       // mbrs from which we select ping_dest. possible subset of 'members'
    final Promise get_pids_promise=new Promise();  // used for rendezvous on GET_PIDS and GET_PIDS_RSP
    boolean got_cache_from_coord=false;            // was cache already fetched ?
TimeScheduler timer=null;                      // timer for recurring task of liveness pinging
    Monitor monitor=null;                          // object that performs the actual monitoring


    /** Returns the protocol name used as the header key ("FD_PID"). */
    public String getName() {
        return "FD_PID";
    }


    /**
     * Parses the protocol's configuration: <code>timeout</code>, <code>get_pids_timeout</code> and
     * <code>num_tries</code>. Recognized keys are removed from <code>props</code>; if any keys remain,
     * an error is logged and <code>false</code> is returned.
     */
    public boolean setProperties(Properties props) {
        String str;

        super.setProperties(props);
        str=props.getProperty("timeout");
        if(str != null) {
            timeout=Long.parseLong(str);
            props.remove("timeout");
        }

        str=props.getProperty("get_pids_timeout");
        if(str != null) {
            get_pids_timeout=Long.parseLong(str);
            props.remove("get_pids_timeout");
        }

        str=props.getProperty("num_tries");
        if(str != null) {
            num_tries=Integer.parseInt(str);
            props.remove("num_tries");
        }

        if(props.size() > 0) {
            log.error("FD_PID.setProperties(): the following properties are not recognized: " + props);
            return false;
        }
        return true;
    }


    /**
     * Starts the liveness-monitoring task on the protocol stack's shared timer.
     * Bails out (with a warning) when the stack or its timer is unavailable.
     * A monitor that was stopped (started == false) is discarded and recreated.
     */
    public void start() throws Exception {
        if(stack != null && stack.timer != null)
            timer=stack.timer;
        else {
            if(warn) log.warn("TimeScheduler in protocol stack is null (or protocol stack is null)");
            return;
        }
        // drop a stale monitor that is no longer running so a fresh one gets scheduled below
        if(monitor != null && monitor.started == false) {
            monitor=null;
        }
        if(monitor == null) {
            monitor=new Monitor();
            timer.add(monitor, true); // fixed-rate scheduling
        }
    }


    /** Stops and discards the monitoring task (no-op if none is running). */
    public void stop() {
        if(monitor != null) {
            monitor.stop();
            monitor=null;
        }
    }


    /**
     * Handles events coming up the stack. SET_LOCAL_ADDRESS is recorded and passed up; messages carrying
     * an FD_PID header are consumed here (SUSPECT, WHO_HAS_PID, I_HAVE_PID, GET_PIDS, GET_PIDS_RSP);
     * everything else is passed up unchanged.
     */
    public void up(Event evt) {
        Message msg;
        FdHeader hdr=null;
        Object tmphdr;

        switch(evt.getType()) {

            case Event.SET_LOCAL_ADDRESS:
                local_addr=(Address)evt.getArg();
                break;  // falls out of the switch so the event is still passed up below

            case Event.MSG:
                msg=(Message)evt.getArg();
                tmphdr=msg.getHeader(getName());
                if(tmphdr == null || !(tmphdr instanceof FdHeader))
                    break;  // message did not originate from FD_PID layer, just pass up

                hdr=(FdHeader)msg.removeHeader(getName());

                switch(hdr.type) {

                    case FdHeader.SUSPECT:
                        // propagate the suspicion both up and down the stack
                        if(hdr.mbr != null) {
                            if(log.isInfoEnabled()) log.info("[SUSPECT] hdr: " + hdr);
                            passUp(new Event(Event.SUSPECT, hdr.mbr));
                            passDown(new Event(Event.SUSPECT, hdr.mbr));
                        }
                        break;

                    // If I have the PID for the address 'hdr.mbr', return it. Otherwise look it up in my cache and return it
                    case FdHeader.WHO_HAS_PID:
                        if(local_addr != null && local_addr.equals(msg.getSrc()))
                            return; // don't reply to WHO_HAS bcasts sent by me !

                        if(hdr.mbr == null) {
                            if(log.isErrorEnabled()) log.error("[WHO_HAS_PID] hdr.mbr is null");
                            return;
                        }

                        // 1. Try my own address, maybe it's me whose PID is wanted
                        if(local_addr != null && local_addr.equals(hdr.mbr) && local_pid > 0) {
                            sendIHavePidMessage(msg.getSrc(), hdr.mbr, local_pid);  // unicast message to msg.getSrc()
                            return;
                        }

                        // 2. If I don't have it, maybe it is in the cache
                        if(pids.containsKey(hdr.mbr))
                            sendIHavePidMessage(msg.getSrc(), hdr.mbr, ((Integer)pids.get(hdr.mbr)).intValue());  // ucast msg
                        break;

                    // Update the cache with the add:pid entry (if on the same host)
                    case FdHeader.I_HAVE_PID:
                        if(log.isInfoEnabled()) log.info("i-have pid: " + hdr.mbr + " --> " + hdr.pid);
                        if(hdr.mbr == null || hdr.pid <= 0) {
                            if(log.isErrorEnabled()) log.error("[I_HAVE_PID] hdr.mbr is null or hdr.pid == 0");
                            return;
                        }
                        // PIDs are only meaningful on the local host, so reject entries from other hosts
                        if(!sameHost(local_addr, hdr.mbr)) {
                            if(log.isErrorEnabled()) log.error(hdr.mbr + " is not on the same host as I (" + local_addr + ", discarding I_HAVE_PID event");
                            return;
                        }

                        // if(!pids.containsKey(hdr.mbr))
                        pids.put(hdr.mbr, new Integer(hdr.pid)); // update the cache

                        if(log.isInfoEnabled()) log.info("[" + local_addr + "]: the cache is " + pids);
                        // if we were waiting for the PID of our current ping target, start monitoring now
                        if(ping_pid <= 0 && ping_dest != null && pids.containsKey(ping_dest)) {
                            ping_pid=((Integer)pids.get(ping_dest)).intValue();
                            try {
                                start();
                            }
                            catch(Exception ex) {
                                if(warn) log.warn("exception when calling start(): " + ex);
                            }
                        }
                        break;

                    // Return the cache to the sender of this message
                    case FdHeader.GET_PIDS:
                        if(hdr.mbr == null) {
                            if(log.isErrorEnabled()) log.error("[GET_PIDS]: hdr.mbr == null");
                            return;
                        }
                        hdr=new FdHeader(FdHeader.GET_PIDS_RSP);
                        hdr.pids=(Hashtable)pids.clone();  // snapshot, so the reply is unaffected by later cache updates
                        msg=new Message(hdr.mbr, null, null);
                        msg.putHeader(getName(), hdr);
                        passDown(new Event(Event.MSG, msg));
                        break;

                    case FdHeader.GET_PIDS_RSP:
                        if(hdr.pids == null) {
                            if(log.isErrorEnabled()) log.error("[GET_PIDS_RSP]: cache is null");
                            return;
                        }
                        // wake up the thread blocked in getPidsFromCoordinator() waiting for this reply
                        get_pids_promise.setResult(hdr.pids);
                        break;
                }
                return;  // FD_PID messages are consumed here, never passed up
        }
        passUp(evt); // pass up to the layer above us
    }


    /**
     * Handles events coming down the stack: SET_PID records this process' own PID; VIEW_CHANGE updates
     * the membership, fetches/broadcasts PID cache entries, prunes stale cache entries and (re)selects
     * the member to monitor.
     */
    public void down(Event evt) {
        Integer pid;
        Address mbr, tmp_ping_dest;
        View v;

        switch(evt.getType()) {

            case Event.SET_PID:
                // 1. Set the PID for local_addr
                pid=(Integer)evt.getArg();
                if(pid == null) {
                    if(log.isErrorEnabled()) log.error("SET_PID did not contain a pid !");
                    return;
                }
                local_pid=pid.intValue();
                if(log.isInfoEnabled()) log.info("local_pid=" + local_pid);
                break;

            case Event.VIEW_CHANGE:
                synchronized(this) {
                    v=(View)evt.getArg();
                    members.removeAllElements();
                    members.addAll(v.getMembers());
                    pingable_mbrs.removeAllElements();
                    pingable_mbrs.addAll(members);
                    passDown(evt);

                    // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
                    if(!got_cache_from_coord) {
                        getPidsFromCoordinator();
                        got_cache_from_coord=true;
                    }

                    // 2. Broadcast my own addr:pid to all members so they can update their cache
                    if(!own_pid_sent) {
                        if(local_pid > 0) {
                            sendIHavePidMessage(null, // send to all members
                                                local_addr, local_pid);
                            own_pid_sent=true;
                        }
                        else
                            if(warn) log.warn("[VIEW_CHANGE]: local_pid == 0");
                    }

                    // 3. Remove all entries in 'pids' which are not in the new membership
                    if(members != null) {
                        for(Enumeration e=pids.keys(); e.hasMoreElements();) {
                            mbr=(Address)e.nextElement();
                            if(!members.contains(mbr))
                                pids.remove(mbr);
                        }
                    }

                    // (re)select the neighbor to monitor; restart monitoring if there is one
                    tmp_ping_dest=determinePingDest();
                    ping_pid=0;  // will be re-resolved from the cache (or via WHO_HAS_PID) for the new target
                    if(tmp_ping_dest == null) {
                        stop();
                        ping_dest=null;
                    }
                    else {
                        ping_dest=tmp_ping_dest;
                        try {
                            start();
                        }
                        catch(Exception ex) {
                            if(warn) log.warn("exception when calling start(): " + ex);
                        }
                    }
                }
                break;

            default:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -