⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fd_pid.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
// $Id: FD_PID.java,v 1.8 2005/08/11 12:43:47 belaban Exp $package org.jgroups.protocols;import org.jgroups.*;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.util.Promise;import org.jgroups.util.TimeScheduler;import org.jgroups.util.Util;import java.io.IOException;import java.io.ObjectInput;import java.io.ObjectOutput;import java.net.InetAddress;import java.util.Enumeration;import java.util.Hashtable;import java.util.Properties;import java.util.Vector;/** * Process-ID based FD protocol. The existence of a process will be tested * via the process ID instead of message based pinging. In order to probe a process' existence, the application (or * some other protocol layer) has to send down a SET_PID event for the member. The addresses of all members will * be associated with their respective PIDs. The PID will be used to probe for the existence of that process.<p> * A cache of Addresses and PIDs is maintained in each member, which is adjusted upon reception of view changes. * The population of the addr:pid cache is as follows:<br> * When a new member joins, it requests the PID cache from the coordinator. Then it broadcasts its own addr:pid * association, so all members can update their cache. When a member P is to be pinged by Q, and Q doesn't have * P'd PID, Q will broadcast a WHO_HAS_PID message, to which all members who have that entry will respond. The * latter case should actually never happen because all members should always have consistent caches. However, * it is left in the code as a second line of defense.<p> * Note that * <em>1. The SET_PID has to be sent down after connecting to a channel !</em><p> * <em>2. Note that if a process is shunned and subsequently reconnects, the SET_PID event has to be resent !</em><p> * <em>3. This protocol only works for groups whose members are on the same host </em>. 'Host' actually means the * same IP address (e.g. for multi-homed systems). */public class FD_PID extends Protocol {    Address ping_dest=null;                 // address of the member we monitor    int ping_pid=0;                     // PID of the member we monitor    Address local_addr=null;                // our own address    int local_pid=0;                    // PID of this process    long timeout=3000;                   // msecs to wait for an are-you-alive msg    long get_pids_timeout=3000;          // msecs to wait for the PID cache from the coordinator    final long get_pids_retry_timeout=500;     // msecs to wait until we retry fetching the cache from the coord    int num_tries=3;                    // attempts the coord is solicited for PID cache until we give up    final Vector members=new Vector();           // list of group members (updated on VIEW_CHANGE)    final Hashtable pids=new Hashtable();           // keys=Addresses, vals=Integer (PIDs)    boolean own_pid_sent=false;             // has own PID been broadcast yet ?    final Vector pingable_mbrs=new Vector();     // mbrs from which we select ping_dest. possible subset of 'members'    final Promise get_pids_promise=new Promise(); // used for rendezvous on GET_PIDS and GET_PIDS_RSP    boolean got_cache_from_coord=false;     // was cache already fetched ?    TimeScheduler timer=null;                     // timer for recurring task of liveness pinging    Monitor monitor=null;                   // object that performs the actual monitoring    public String getName() {        return "FD_PID";    }    public boolean setProperties(Properties props) {        String str;        super.setProperties(props);        str=props.getProperty("timeout");        if(str != null) {            timeout=Long.parseLong(str);            props.remove("timeout");        }        str=props.getProperty("get_pids_timeout");        if(str != null) {            get_pids_timeout=Long.parseLong(str);            props.remove("get_pids_timeout");        }        str=props.getProperty("num_tries");        if(str != null) {            num_tries=Integer.parseInt(str);            props.remove("num_tries");        }        if(props.size() > 0) {            log.error("FD_PID.setProperties(): the following properties are not recognized: " + props);            return false;        }        return true;    }    public void start() throws Exception {        if(stack != null && stack.timer != null)            timer=stack.timer;        else {            if(warn) log.warn("TimeScheduler in protocol stack is null (or protocol stack is null)");            return;        }        if(monitor != null && monitor.started == false) {            monitor=null;        }        if(monitor == null) {            monitor=new Monitor();            timer.add(monitor, true);  // fixed-rate scheduling        }    }    public void stop() {        if(monitor != null) {            monitor.stop();            monitor=null;        }    }    public void up(Event evt) {        Message msg;        FdHeader hdr=null;        Object tmphdr;        switch(evt.getType()) {            case Event.SET_LOCAL_ADDRESS:                local_addr=(Address)evt.getArg();                break;            case Event.MSG:                msg=(Message)evt.getArg();                tmphdr=msg.getHeader(getName());                if(tmphdr == null || !(tmphdr instanceof FdHeader))                    break;  // message did not originate from FD_PID layer, just pass up                hdr=(FdHeader)msg.removeHeader(getName());                switch(hdr.type) {                    case FdHeader.SUSPECT:                        if(hdr.mbr != null) {                            if(log.isInfoEnabled()) log.info("[SUSPECT] hdr: " + hdr);                            passUp(new Event(Event.SUSPECT, hdr.mbr));                            passDown(new Event(Event.SUSPECT, hdr.mbr));                        }                        break;                        // If I have the PID for the address 'hdr.mbr', return it. Otherwise look it up in my cache and return it                    case FdHeader.WHO_HAS_PID:                        if(local_addr != null && local_addr.equals(msg.getSrc()))                            return; // don't reply to WHO_HAS bcasts sent by me !                        if(hdr.mbr == null) {                            if(log.isErrorEnabled()) log.error("[WHO_HAS_PID] hdr.mbr is null");                            return;                        }                        // 1. Try my own address, maybe it's me whose PID is wanted                        if(local_addr != null && local_addr.equals(hdr.mbr) && local_pid > 0) {                            sendIHavePidMessage(msg.getSrc(), hdr.mbr, local_pid);  // unicast message to msg.getSrc()                            return;                        }                        // 2. If I don't have it, maybe it is in the cache                        if(pids.containsKey(hdr.mbr))                            sendIHavePidMessage(msg.getSrc(), hdr.mbr, ((Integer)pids.get(hdr.mbr)).intValue());  // ucast msg                        break;                        // Update the cache with the add:pid entry (if on the same host)                    case FdHeader.I_HAVE_PID:                        if(log.isInfoEnabled()) log.info("i-have pid: " + hdr.mbr + " --> " + hdr.pid);                        if(hdr.mbr == null || hdr.pid <= 0) {                            if(log.isErrorEnabled()) log.error("[I_HAVE_PID] hdr.mbr is null or hdr.pid == 0");                            return;                        }                        if(!sameHost(local_addr, hdr.mbr)) {                            if(log.isErrorEnabled())                                log.error(hdr.mbr + " is not on the same host as I (" +                                        local_addr + ", discarding I_HAVE_PID event");                            return;                        }                        // if(!pids.containsKey(hdr.mbr))                        pids.put(hdr.mbr, new Integer(hdr.pid)); // update the cache                        if(log.isInfoEnabled()) log.info("[" + local_addr + "]: the cache is " + pids);                        if(ping_pid <= 0 && ping_dest != null && pids.containsKey(ping_dest)) {                            ping_pid=((Integer)pids.get(ping_dest)).intValue();                            try {                                start();                            }                            catch(Exception ex) {                                if(warn) log.warn("exception when calling start(): " + ex);                            }                        }                        break;                        // Return the cache to the sender of this message                    case FdHeader.GET_PIDS:                        if(hdr.mbr == null) {                            if(log.isErrorEnabled()) log.error("[GET_PIDS]: hdr.mbr == null");                            return;                        }                        hdr=new FdHeader(FdHeader.GET_PIDS_RSP);                        hdr.pids=(Hashtable)pids.clone();                        msg=new Message(hdr.mbr, null, null);                        msg.putHeader(getName(), hdr);                        passDown(new Event(Event.MSG, msg));                        break;                    case FdHeader.GET_PIDS_RSP:                        if(hdr.pids == null) {                            if(log.isErrorEnabled()) log.error("[GET_PIDS_RSP]: cache is null");                            return;                        }                        get_pids_promise.setResult(hdr.pids);                        break;                }                return;        }        passUp(evt);                                        // pass up to the layer above us    }    public void down(Event evt) {        Integer pid;        Address mbr, tmp_ping_dest;        View v;        switch(evt.getType()) {            case Event.SET_PID:                // 1. Set the PID for local_addr                pid=(Integer)evt.getArg();                if(pid == null) {                    if(log.isErrorEnabled()) log.error("SET_PID did not contain a pid !");                    return;                }                local_pid=pid.intValue();                if(log.isInfoEnabled()) log.info("local_pid=" + local_pid);                break;            case Event.VIEW_CHANGE:                synchronized(this) {                    v=(View)evt.getArg();                    members.removeAllElements();                    members.addAll(v.getMembers());                    pingable_mbrs.removeAllElements();                    pingable_mbrs.addAll(members);                    passDown(evt);                    // 1. Get the addr:pid cache from the coordinator (only if not already fetched)                    if(!got_cache_from_coord) {                        getPidsFromCoordinator();                        got_cache_from_coord=true;                    }                    // 2. Broadcast my own addr:pid to all members so they can update their cache                    if(!own_pid_sent) {                        if(local_pid > 0) {                            sendIHavePidMessage(null, // send to all members                                    local_addr,                                    local_pid);                            own_pid_sent=true;                        }                        else                            if(warn) log.warn("[VIEW_CHANGE]: local_pid == 0");                    }                    // 3. Remove all entries in 'pids' which are not in the new membership                    if(members != null) {                        for(Enumeration e=pids.keys(); e.hasMoreElements();) {                            mbr=(Address)e.nextElement();                            if(!members.contains(mbr))                                pids.remove(mbr);                        }                    }                    tmp_ping_dest=determinePingDest();                    ping_pid=0;                    if(tmp_ping_dest == null) {                        stop();                        ping_dest=null;                    }                    else {                        ping_dest=tmp_ping_dest;                        try {                            start();                        }                        catch(Exception ex) {                            if(warn) log.warn("exception when calling start(): " + ex);                        }                    }                }                break;            default:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -