⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 fd_pid.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
                passDown(evt);                break;        }    }    /* ----------------------------------- Private Methods -------------------------------------- */    /**     * Determines coordinator C. If C is null and we are the first member, return. Else loop: send GET_PIDS message     * to coordinator and wait for GET_PIDS_RSP response. Loop until valid response has been received.     */    void getPidsFromCoordinator() {        Address coord;        int attempts=num_tries;        Message msg;        FdHeader hdr;        Hashtable result;        get_pids_promise.reset();        while(attempts > 0) {            if((coord=determineCoordinator()) != null) {                if(coord.equals(local_addr)) { // we are the first member --> empty cache                    if(log.isInfoEnabled()) log.info("first member; cache is empty");                    return;                }                hdr=new FdHeader(FdHeader.GET_PIDS);                hdr.mbr=local_addr;                msg=new Message(coord, null, null);                msg.putHeader(getName(), hdr);                passDown(new Event(Event.MSG, msg));                result=(Hashtable)get_pids_promise.getResult(get_pids_timeout);                if(result != null) {                    pids.putAll(result); // replace all entries (there should be none !) in pids with the new values                    if(log.isInfoEnabled())                        log.info("got cache from " +                                coord + ": cache is " + pids);                    return;                }                else {                    if(log.isErrorEnabled()) log.error("received null cache; retrying");                }            }            Util.sleep(get_pids_retry_timeout);            --attempts;        }    }    void broadcastSuspectMessage(Address suspected_mbr) {        Message suspect_msg;        FdHeader hdr;        if(log.isInfoEnabled())            log.info("suspecting " + suspected_mbr +                    " (own address is " + local_addr + ')');        hdr=new FdHeader(FdHeader.SUSPECT);        hdr.mbr=suspected_mbr;        suspect_msg=new Message();       // mcast SUSPECT to all members        suspect_msg.putHeader(getName(), hdr);        passDown(new Event(Event.MSG, suspect_msg));    }    void broadcastWhoHasPidMessage(Address mbr) {        Message msg;        FdHeader hdr;        if(local_addr != null && mbr != null)            if(log.isInfoEnabled()) log.info("[" + local_addr + "]: who-has " + mbr);        msg=new Message();  // bcast msg        hdr=new FdHeader(FdHeader.WHO_HAS_PID);        hdr.mbr=mbr;        msg.putHeader(getName(), hdr);        passDown(new Event(Event.MSG, msg));    }    /**     * Sends or broadcasts a I_HAVE_PID response. If 'dst' is null, the reponse will be broadcast, otherwise     * it will be unicast back to the requester     */    void sendIHavePidMessage(Address dst, Address mbr, int pid) {        Message msg=new Message(dst, null, null);        FdHeader hdr=new FdHeader(FdHeader.I_HAVE_PID);        hdr.mbr=mbr;        hdr.pid=pid;        msg.putHeader(getName(), hdr);        passDown(new Event(Event.MSG, msg));    }    /**     * Set ping_dest and ping_pid. If ping_pid is not known, broadcast a WHO_HAS_PID message.     */    Address determinePingDest() {        Address tmp;        if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)            return null;        for(int i=0; i < pingable_mbrs.size(); i++) {            tmp=(Address)pingable_mbrs.elementAt(i);            if(local_addr.equals(tmp)) {                if(i + 1 >= pingable_mbrs.size())                    return (Address)pingable_mbrs.elementAt(0);                else                    return (Address)pingable_mbrs.elementAt(i + 1);            }        }        return null;    }    Address determineCoordinator() {        return members.size() > 0 ? (Address)members.elementAt(0) : null;    }    /**     * Checks whether 2 Addresses are on the same host     */    boolean sameHost(Address one, Address two) {        InetAddress a, b;        String host_a, host_b;        if(one == null || two == null) return false;        if(!(one instanceof IpAddress) || !(two instanceof IpAddress)) {            if(log.isErrorEnabled()) log.error("addresses have to be of type IpAddress to be compared");            return false;        }        a=((IpAddress)one).getIpAddress();        b=((IpAddress)two).getIpAddress();        if(a == null || b == null) return false;        host_a=a.getHostAddress();        host_b=b.getHostAddress();        return host_a.equals(host_b);    }    /* ------------------------------- End of Private Methods ------------------------------------ */    public static class FdHeader extends Header {        static final int SUSPECT=10;        static final int WHO_HAS_PID=11;        static final int I_HAVE_PID=12;        static final int GET_PIDS=13; // sent by joining member to coordinator        static final int GET_PIDS_RSP=14; // sent by coordinator to joining member in response to GET_PIDS        int type=SUSPECT;        Address mbr=null;     // set on SUSPECT (suspected mbr), WHO_HAS_PID (requested mbr), I_HAVE_PID        int pid=0;        // set on I_HAVE_PID        Hashtable pids=null;    // set on GET_PIDS_RSP        public FdHeader() {        } // used for externalization        FdHeader(int type) {            this.type=type;        }        public String toString() {            StringBuffer sb=new StringBuffer();            sb.append(type2String(type));            if(mbr != null)                sb.append(", mbr=" + mbr);            if(pid > 0)                sb.append(", pid=" + pid);            if(pids != null)                sb.append(", pids=" + pids);            return sb.toString();        }        public static String type2String(int type) {            switch(type) {                case SUSPECT:                    return "SUSPECT";                case WHO_HAS_PID:                    return "WHO_HAS_PID";                case I_HAVE_PID:                    return "I_HAVE_PID";                case GET_PIDS:                    return "GET_PIDS";                case GET_PIDS_RSP:                    return "GET_PIDS_RSP";                default:                    return "unknown type (" + type + ')';            }        }        public void writeExternal(ObjectOutput out) throws IOException {            out.writeInt(type);            out.writeObject(mbr);            out.writeInt(pid);            out.writeObject(pids);        }        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {            type=in.readInt();            mbr=(Address)in.readObject();            pid=in.readInt();            pids=(Hashtable)in.readObject();        }    }    /**     * An instance of this class will be added to the TimeScheduler to be scheduled to be run every timeout     * msecs. When there is no ping_dest (e.g. only 1 member in the group), this task will be cancelled in     * TimeScheduler (and re-scheduled if ping_dest becomes available later).     */    private class Monitor implements TimeScheduler.Task {        boolean started=true;        void stop() {            started=false;        }        /* -------------------------------------- TimeScheduler.Task Interface -------------------------------- */        public boolean cancelled() {            return !started;        }        public long nextInterval() {            return timeout;        }        /**         * Periodically probe for the destination process identified by ping_dest/ping_pid. Suspect the ping_dest         * member if /prop/<ping_pid> process does not exist.         */        public void run() {            if(ping_dest == null) {                if(warn) log.warn("ping_dest is null, skipping ping");                return;            }            if(log.isInfoEnabled())                log.info("ping_dest=" + ping_dest + ", ping_pid=" + ping_pid +                        ", cache=" + pids);            // If the PID for ping_dest is not known, send a broadcast to solicit it            if(ping_pid <= 0) {                if(ping_dest != null && pids.containsKey(ping_dest)) {                    ping_pid=((Integer)pids.get(ping_dest)).intValue();                    if(log.isInfoEnabled())                        log.info("found PID for " +                                ping_dest + " in cache (pid=" + ping_pid + ')');                }                else {                    if(log.isErrorEnabled())                        log.error("PID for " + ping_dest + " not known" +                                ", cache is " + pids);                    broadcastWhoHasPidMessage(ping_dest);                    return;                }            }            if(!Util.fileExists("/proc/" + ping_pid)) {                if(log.isInfoEnabled()) log.info("process " + ping_pid + " does not exist");                broadcastSuspectMessage(ping_dest);                pingable_mbrs.removeElement(ping_dest);                ping_dest=determinePingDest();                if(ping_dest == null)                    stop();                ping_pid=0;            }            else {                if(log.isInfoEnabled()) log.info(ping_dest + " is alive");            }        }        /* ---------------------------------- End of TimeScheduler.Task Interface ---------------------------- */    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -