fd.java

来自「JGroups源码」· Java 代码 · 共 686 行 · 第 1/2 页

JAVA
686
字号
            // Sender is already tracked: shun it once it has pinged max_tries times,
            // otherwise just bump its ping count.
            if(invalid_pingers.containsKey(hb_sender)) {
                num_pings=((Integer)invalid_pingers.get(hb_sender)).intValue();
                if(num_pings >= max_tries) {
                    if(log.isDebugEnabled())
                        log.debug(hb_sender + " is not in " + members + " ! Shunning it");
                    shun_msg=new Message(hb_sender, null, null);
                    shun_msg.putHeader(name, new FdHeader(FdHeader.NOT_MEMBER));
                    passDown(new Event(Event.MSG, shun_msg));
                    invalid_pingers.remove(hb_sender);
                }
                else {
                    num_pings++;
                    invalid_pingers.put(hb_sender, new Integer(num_pings));
                }
            }
            else {
                // Sender not tracked yet: start counting its pings.
                num_pings++;
                invalid_pingers.put(hb_sender, new Integer(num_pings));
            }
        }
    }


    /**
     * Header attached to the messages exchanged by this protocol. It carries the message type and,
     * for SUSPECT messages, the list of suspected members together with the address of the member
     * that reported them.
     */
    public static class FdHeader extends Header implements Streamable {
        public static final byte HEARTBEAT=0;
        public static final byte HEARTBEAT_ACK=1;
        public static final byte SUSPECT=2;
        public static final byte NOT_MEMBER=3;  // received as response by pinged mbr when we are not a member

        byte    type=HEARTBEAT;
        Vector  mbrs=null;
        Address from=null;  // member who detected that suspected_mbr has failed


        public FdHeader() {
        } // used for externalization

        /**
         * Creates a header of the given type without members or reporter.
         * @param type one of the type constants declared in this class
         */
        public FdHeader(byte type) {
            this.type=type;
        }

        /**
         * Creates a header of the given type.
         * @param type one of the type constants declared in this class
         * @param mbrs the members carried by the header (may be null)
         * @param from the member reporting them (may be null)
         */
        public FdHeader(byte type, Vector mbrs, Address from) {
            this(type);
            this.mbrs=mbrs;
            this.from=from;
        }


        /** Returns a printable form of the header; members and reporter are only shown for SUSPECT. */
        public String toString() {
            switch(type) {
                case HEARTBEAT:
                    return "[FD: heartbeat]";
                case HEARTBEAT_ACK:
                    return "[FD: heartbeat ack]";
                case SUSPECT:
                    return "[FD: SUSPECT (suspected_mbrs=" + mbrs + ", from=" + from + ")]";
                case NOT_MEMBER:
                    return "[FD: NOT_MEMBER]";
                default:
                    return "[FD: unknown type (" + type + ")]";
            }
        }

        /**
         * Writes the type byte, a presence flag for the member list, the member count followed by
         * each member (only when the list is non-null), and finally the reporter address.
         */
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeByte(type);
            if(mbrs == null)
                out.writeBoolean(false);
            else {
                out.writeBoolean(true);
                out.writeInt(mbrs.size());
                for(Iterator it=mbrs.iterator(); it.hasNext();) {
                    Address addr=(Address)it.next();
                    Marshaller.write(addr, out);
                }
            }
            Marshaller.write(from, out);
        }

        /** Reads the fields in the order in which {@link #writeExternal(ObjectOutput)} wrote them. */
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            type=in.readByte();
            boolean mbrs_not_null=in.readBoolean();
            if(mbrs_not_null) {
                int len=in.readInt();
                mbrs=new Vector(11);
                for(int i=0; i < len; i++) {
                    Address addr=(Address)Marshaller.read(in);
                    mbrs.add(addr);
                }
            }
            from=(Address)Marshaller.read(in);
        }

        /** Returns one byte for the type plus whatever Util.size() reports for mbrs and from. */
        public long size() {
            int retval=Global.BYTE_SIZE; // type
            retval+=Util.size(mbrs);
            retval+=Util.size(from);
            return retval;
        }

        /** Streams the type byte, then the members and the reporter via the Util address helpers. */
        public void writeTo(DataOutputStream out) throws IOException {
            out.writeByte(type);
            Util.writeAddresses(mbrs, out);
            Util.writeAddress(from, out);
        }

        /** Reads the fields in the order in which {@link #writeTo(DataOutputStream)} wrote them. */
        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
            type=in.readByte();
            mbrs=(Vector)Util.readAddresses(in, Vector.class);
            from=Util.readAddress(in);
        }

    }


    /**
     * Periodic task (interval = timeout) that sends a heartbeat request to ping_dest on every run.
     * When no ack has been recorded for more than timeout + 500 ms the run counts as a missed
     * heartbeat; once num_tries has reached max_tries, ping_dest is handed to bcast_task as a suspect.
     */
    protected class Monitor implements TimeScheduler.Task {
        // NOTE(review): stop() and cancelled() are presumably invoked from different threads; the
        // callers of stop() are not visible here - confirm whether this flag needs to be volatile.
        boolean started=true;

        /** Marks the task as cancelled; cancelled() returns true from now on. */
        public void stop() {
            started=false;
        }


        public boolean cancelled() {
            return !started;
        }


        public long nextInterval() {
            return timeout;
        }


        /** Sends one heartbeat request and evaluates how long ago the last ack was recorded. */
        public void run() {
            Message hb_req;
            long not_heard_from; // time in msecs we haven't heard from ping_dest

            if(ping_dest == null) {
                if(warn)
                    log.warn("ping_dest is null: members=" + members + ", pingable_mbrs=" +
                            pingable_mbrs + ", local_addr=" + local_addr);
                return;
            }


            // 1. send heartbeat request
            hb_req=new Message(ping_dest, null, null);
            hb_req.putHeader(name, new FdHeader(FdHeader.HEARTBEAT));  // send heartbeat request
            if(log.isDebugEnabled())
                log.debug("sending are-you-alive msg to " + ping_dest + " (own address=" + local_addr + ')');
            passDown(new Event(Event.MSG, hb_req));
            num_heartbeats++;

            // 2. If the time of the last heartbeat is > timeout and max_tries heartbeat messages have not been
            //    received, then broadcast a SUSPECT message. Will be handled by coordinator, which may install
            //    a new view
            not_heard_from=System.currentTimeMillis() - last_ack;
            // quick & dirty fix: increase timeout by 500msecs to allow for latency (bela June 27 2003)
            if(not_heard_from > timeout + 500) { // no heartbeat ack for more than timeout msecs
                if(num_tries >= max_tries) {
                    if(log.isDebugEnabled())
                        log.debug("[" + local_addr + "]: received no heartbeat ack from " + ping_dest +
                                " for " + (num_tries +1) + " times (" + ((num_tries+1) * timeout) +
                                " milliseconds), suspecting it");
                    // broadcast a SUSPECT message to all members - loop until
                    // unsuspect or view change is received
                    bcast_task.addSuspectedMember(ping_dest);
                    num_tries=0;
                    if(stats) {
                        num_suspect_events++;
                        suspect_history.add(ping_dest);
                    }
                }
                else {
                    if(log.isDebugEnabled())
                        log.debug("heartbeat missing from " + ping_dest + " (number=" + num_tries + ')');
                    num_tries++;
                }
            }
        }


        public String toString() {
            return Boolean.toString(started);
        }

    }


    /**
     * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
     * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
     * sure they are retransmitted until a view has been received which doesn't contain the suspected members
     * any longer. Then the task terminates.
     * <p>
     * Locking: methods that need both monitors take suspected_mbrs first and bcast_mutex second.
     */
    protected final class Broadcaster {
        final Vector suspected_mbrs=new Vector(7);
        BroadcastTask task=null;
        private final Object bcast_mutex=new Object();


        /** Returns the live list of suspected members (not a copy). */
        Vector getSuspectedMembers() {
            return suspected_mbrs;
        }

        /**
         * Starts a new task, or - if already running - adds the argument to the running task.
         * @param suspect the member to be added to the (new or running) task
         */
        private void startBroadcastTask(Address suspect) {
            // Take the snapshot BEFORE acquiring bcast_mutex: Vector.clone() locks suspected_mbrs, and
            // removeSuspectedMember(), removeAll() and adjustSuspectedMembers() lock suspected_mbrs first
            // and bcast_mutex second (via stopBroadcastTask()). Cloning under bcast_mutex would acquire
            // the two monitors in the opposite order and could deadlock.
            final Vector snapshot=(Vector)suspected_mbrs.clone();
            synchronized(bcast_mutex) {
                if(task == null || task.cancelled()) {
                    task=new BroadcastTask(snapshot);
                    task.addSuspectedMember(suspect);
                    task.run();      // run immediately the first time
                    timer.add(task); // then every timeout milliseconds, until cancelled
                    if(trace)
                        log.trace("BroadcastTask started");
                }
                else {
                    task.addSuspectedMember(suspect);
                }
            }
        }

        /** Stops and discards the current task, if there is one. */
        private void stopBroadcastTask() {
            synchronized(bcast_mutex) {
                if(task != null) {
                    task.stop();
                    task=null;
                }
            }
        }

        /** Adds a suspected member. Starts the task if not yet running */
        protected void addSuspectedMember(Address mbr) {
            if(mbr == null) return;
            if(!members.contains(mbr)) return;
            boolean added=false;
            synchronized(suspected_mbrs) {
                if(!suspected_mbrs.contains(mbr)) {
                    suspected_mbrs.addElement(mbr);
                    added=true;
                }
            }
            // the task is only touched after the suspected_mbrs lock has been released
            if(added)
                startBroadcastTask(mbr);
        }

        /** Removes a member from the suspect list and stops the task when the list becomes empty. */
        void removeSuspectedMember(Address suspected_mbr) {
            if(suspected_mbr == null) return;
            if(log.isDebugEnabled()) log.debug("member is " + suspected_mbr);
            synchronized(suspected_mbrs) {
                suspected_mbrs.removeElement(suspected_mbr);
                if(suspected_mbrs.size() == 0)
                    stopBroadcastTask();
            }
        }

        /** Clears the suspect list and stops the task. */
        void removeAll() {
            synchronized(suspected_mbrs) {
                suspected_mbrs.removeAllElements();
                stopBroadcastTask();
            }
        }

        /** Removes all elements from suspected_mbrs that are <em>not</em> in the new membership */
        void adjustSuspectedMembers(List new_mbrship) {
            if(new_mbrship == null || new_mbrship.size() == 0) return;
            synchronized(suspected_mbrs) {
                // only pay for building the before/after message when it will actually be logged
                final boolean debug=log.isDebugEnabled();
                StringBuffer sb=null;
                if(debug) {
                    sb=new StringBuffer();
                    sb.append("suspected_mbrs: ").append(suspected_mbrs);
                }
                suspected_mbrs.retainAll(new_mbrship);
                if(suspected_mbrs.size() == 0)
                    stopBroadcastTask();
                if(debug) {
                    sb.append(", after adjustment: ").append(suspected_mbrs);
                    log.debug(sb.toString());
                }
            }
        }
    }


    /**
     * Timer task that multicasts a SUSPECT message containing its private list of suspected members
     * on every run, and cancels itself when that list is empty.
     */
    protected final class BroadcastTask implements TimeScheduler.Task {
        // Written by stop() - which run() calls while holding only the suspected_members lock - and
        // read through cancelled() by Broadcaster.startBroadcastTask() under bcast_mutex. There is no
        // common lock between the two, hence volatile.
        volatile boolean cancelled=false;
        private final Vector suspected_members=new Vector();


        /** Creates a task whose initial suspects are a copy of the given list. */
        BroadcastTask(Vector suspected_members) {
            this.suspected_members.addAll(suspected_members);
        }

        /** Cancels the task and clears its suspect list. */
        public void stop() {
            cancelled=true;
            suspected_members.clear();
            if(trace)
                log.trace("BroadcastTask stopped");
        }

        public boolean cancelled() {
            return cancelled;
        }

        public long nextInterval() {
            return FD.this.timeout;
        }

        /** Sends one SUSPECT message for the current suspects, or stops the task if there are none. */
        public void run() {
            Message suspect_msg;
            FD.FdHeader hdr;

            synchronized(suspected_members) {
                if(suspected_members.size() == 0) {
                    stop();
                    if(log.isDebugEnabled()) log.debug("task done (no suspected members)");
                    return;
                }

                hdr=new FdHeader(FdHeader.SUSPECT);
                hdr.mbrs=(Vector)suspected_members.clone();
                hdr.from=local_addr;
            }
            suspect_msg=new Message();       // mcast SUSPECT to all members
            suspect_msg.putHeader(name, hdr);
            // log the snapshot that is actually sent instead of re-reading the live list outside the lock
            if(log.isDebugEnabled())
                log.debug("broadcasting SUSPECT message [suspected_mbrs=" + hdr.mbrs + "] to group");
            passDown(new Event(Event.MSG, suspect_msg));
            if(log.isDebugEnabled()) log.debug("task done");
        }

        /** Adds the given member to this task's list unless it is null or already present. */
        public void addSuspectedMember(Address suspect) {
            if(suspect != null && !suspected_members.contains(suspect)) {
                suspected_members.add(suspect);
            }
        }
    }

}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?